Batch Prediction
Batch prediction allows you to send large numbers of multimodal requests to Gemini asynchronously. Instead of getting immediate responses, results are written to Cloud Storage or BigQuery when processing completes.
Why Batch Prediction?
Cost Effective: 50% lower cost compared to online predictions
High Volume: Process thousands of requests in a single job
No Rate Limits: Bypass per-minute quota restrictions
When to Use Batch Prediction
✅ Good Use Cases:
Processing large datasets (1000+ items)
Offline analysis and evaluation
Bulk content classification or summarization
Dataset labeling and annotation
Periodic batch jobs (nightly, weekly)
Cost-sensitive workloads
❌ Not Suitable For:
Real-time applications
Interactive user experiences
Low-latency requirements
Small request volumes (fewer than 100 items)
Supported Models
Batch prediction is available for:
gemini-3.1-pro-preview
gemini-3-flash-preview
gemini-2.5-pro
gemini-2.5-flash
gemini-2.0-flash
Quick Start
Installation
pip install --upgrade google-genai google-cloud-storage google-cloud-bigquery
Setup
import os
from google import genai
from google.genai.types import CreateBatchJobConfig

PROJECT_ID = "your-project-id"
LOCATION = "global"  # or "us-central1"

client = genai.Client(
    vertexai=True,
    project=PROJECT_ID,
    location=LOCATION,
)
Cloud Storage Workflow
Step 1: Prepare Input File
Create a JSONL file with one request per line:
batch_requests.jsonl:
{"request": {"contents": [{"role": "user", "parts": [{"text": "Summarize this: AI is transforming industries."}]}], "generationConfig": {"temperature": 0.4}}}
{"request": {"contents": [{"role": "user", "parts": [{"text": "What is machine learning?"}]}], "generationConfig": {"temperature": 0.2}}}
{"request": {"contents": [{"role": "user", "parts": [{"text": "Explain neural networks."}]}], "generationConfig": {"temperature": 0.3}}}
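For more than a handful of prompts, the input file is easier to generate than to write by hand. The following is a minimal sketch, assuming text-only prompts and the request envelope shown above; `build_request` and `to_jsonl` are hypothetical helper names, not part of the SDK.

```python
import json


def build_request(prompt, temperature=0.4):
    """Wrap a text prompt in the batch request envelope (hypothetical helper)."""
    return {
        "request": {
            "contents": [{"role": "user", "parts": [{"text": prompt}]}],
            "generationConfig": {"temperature": temperature},
        }
    }


def to_jsonl(prompts, temperature=0.4):
    """Serialize prompts as JSON Lines: one compact JSON object per line."""
    lines = [
        json.dumps(build_request(p, temperature), ensure_ascii=False)
        for p in prompts
    ]
    return "\n".join(lines) + "\n"


prompts = [
    "Summarize this: AI is transforming industries.",
    "What is machine learning?",
    "Explain neural networks.",
]
jsonl_text = to_jsonl(prompts)
print(f"Built {len(jsonl_text.splitlines())} request lines")
```

Writing `jsonl_text` to `batch_requests.jsonl` with UTF-8 encoding yields a file in the same shape as the hand-written one. `json.dumps` never emits raw newlines, so each request stays on a single line even when a prompt contains line breaks.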
Step 2: Upload to Cloud Storage
# Create bucket
gsutil mb -l us-central1 gs://your-bucket-name
# Upload input file
gsutil cp batch_requests.jsonl gs://your-bucket-name/input/
Step 3: Submit Batch Job
batch_job = client.batches.create(
    model="gemini-2.5-flash",
    src="gs://your-bucket-name/input/batch_requests.jsonl",
    config=CreateBatchJobConfig(
        dest="gs://your-bucket-name/output/",
    ),
)
print(f"Job created: {batch_job.name}")
print(f"State: {batch_job.state}")
Step 4: Monitor Job Status
import time

# Poll until the job leaves the pending, queued, and running states
while batch_job.state in ["JOB_STATE_PENDING", "JOB_STATE_RUNNING", "JOB_STATE_QUEUED"]:
    time.sleep(10)
    batch_job = client.batches.get(name=batch_job.name)
    print(f"Status: {batch_job.state}")

if batch_job.state == "JOB_STATE_SUCCEEDED":
    print("\n✓ Job completed successfully!")
    print(f"Output: {batch_job.dest.gcs_uri}")
else:
    # Covers failed, cancelled, and paused jobs
    print(f"\n✗ Job ended in state {batch_job.state}: {batch_job.error}")
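Batch jobs can run for a long time, so a fixed 10-second poll may issue many unnecessary requests. The sketch below shows one possible variation with exponential backoff and a timeout. `wait_for_job` is a hypothetical helper, the delay values are arbitrary, and the state names are only the ones used in the loop above.

```python
import time

# States treated as "still in progress", taken from the polling loop above
ACTIVE_STATES = ("JOB_STATE_PENDING", "JOB_STATE_QUEUED", "JOB_STATE_RUNNING")


def wait_for_job(get_state, initial_delay=10, max_delay=300,
                 timeout=24 * 3600, sleep=time.sleep):
    """Poll get_state() with exponential backoff until the job is inactive.

    get_state is a zero-argument callable returning the current job state.
    Returns the final state; raises TimeoutError once `timeout` seconds
    of sleeping have accumulated while the job is still active.
    """
    delay = initial_delay
    waited = 0
    state = get_state()
    while state in ACTIVE_STATES:
        if waited >= timeout:
            raise TimeoutError(f"Job still {state} after {waited}s")
        sleep(delay)
        waited += delay
        delay = min(delay * 2, max_delay)  # double the wait, up to a cap
        state = get_state()
    return state
```

With the client from the setup section, a call might look like `wait_for_job(lambda: client.batches.get(name=batch_job.name).state)`. Passing the state lookup as a callable keeps the helper independent of the SDK and easy to exercise with fake states.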
Step 5: Retrieve Results
import pandas as pd
import fsspec  # the "gcs" filesystem also requires the gcsfs package

# Read results from Cloud Storage
fs = fsspec.filesystem("gcs")
output_uri = batch_job.dest.gcs_uri.rstrip("/")  # avoid a double slash in the glob
file_paths = fs.glob(f"{output_uri}/*/predictions.jsonl")

if file_paths:
    df = pd.read_json(f"gs://{file_paths[0]}", lines=True)
    # Extract responses: flatten the candidates and join them back by row index
    df = df.join(pd.json_normalize(df["response"], "candidates"))
    # View results
    for idx, row in df.iterrows():
        request_text = row["request"]["contents"][0]["parts"][0]["text"]
        response_text = row["content"]["parts"][0]["text"]
        print(f"\nRequest: {request_text}")
        print(f"Response: {response_text[:200]}...")
Multimodal Batch Requests
Images
{"request": {"contents": [{"role": "user", "parts": [{"text": "Describe this image."}, {"file_data": {"file_uri": "gs://samples/image1.jpg", "mime_type": "image/jpeg"}}]}], "generationConfig": {"temperature": 0.4}}}
{"request": {"contents": [{"role": "user", "parts": [{"text": "What objects are visible?"}, {"file_data": {"file_uri": "gs://samples/image2.jpg", "mime_type": "image/jpeg"}}]}], "generationConfig": {"temperature": 0.4}}}
Videos
{"request": {"contents": [{"role": "user", "parts": [{"text": "Summarize this video."}, {"file_data": {"file_uri": "gs://samples/video.mp4", "mime_type": "video/mp4"}}]}], "generationConfig": {"temperature": 0.3}}}
PDFs
{"request": {"contents": [{"role": "user", "parts": [{"text": "Extract key findings."}, {"file_data": {"file_uri": "gs://samples/paper.pdf", "mime_type": "application/pdf"}}]}], "generationConfig": {"temperature": 0.2}}}
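The image, video, and PDF requests above differ only in the prompt, the file URI, and the MIME type, so they can be produced by one function. This is an illustrative sketch: `build_media_request` is a hypothetical helper, and its extension map covers only the three file types shown in this section.

```python
import json
import posixpath

# Extension-to-MIME map limited to the file types used in the examples above
MIME_TYPES = {
    ".jpg": "image/jpeg",
    ".jpeg": "image/jpeg",
    ".mp4": "video/mp4",
    ".pdf": "application/pdf",
}


def build_media_request(prompt, file_uri, temperature=0.4):
    """Build one batch request pairing a text prompt with a Cloud Storage file."""
    ext = posixpath.splitext(file_uri)[1].lower()
    if ext not in MIME_TYPES:
        raise ValueError(f"No MIME type configured for extension {ext!r}")
    return {
        "request": {
            "contents": [{
                "role": "user",
                "parts": [
                    {"text": prompt},
                    {"file_data": {"file_uri": file_uri, "mime_type": MIME_TYPES[ext]}},
                ],
            }],
            "generationConfig": {"temperature": temperature},
        }
    }


line = json.dumps(build_media_request("Summarize this video.", "gs://samples/video.mp4", 0.3))
print(line)
```

Raising on an unknown extension, rather than guessing a MIME type, surfaces a misconfigured file before the job is submitted.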
BigQuery Workflow
Step 1: Create Input Table
CREATE OR REPLACE TABLE `project.dataset.batch_input` AS
SELECT
  STRUCT(
    [STRUCT(
      'user' AS role,
      [STRUCT('What is AI?' AS text)] AS parts
    )] AS contents,
    STRUCT(0.4 AS temperature) AS generationConfig
  ) AS request
UNION ALL
SELECT
  STRUCT(
    [STRUCT(
      'user' AS role,
      [STRUCT('Explain machine learning.' AS text)] AS parts
    )] AS contents,
    STRUCT(0.3 AS temperature) AS generationConfig
  ) AS request;
Step 2: Submit Batch Job
from google.genai.types import BigQueryDestination, BigQuerySource

batch_job = client.batches.create(
    model="gemini-2.5-flash",
    src=BigQuerySource(
        input_uri=f"bq://{PROJECT_ID}.dataset.batch_input",
    ),
    config=CreateBatchJobConfig(
        dest=BigQueryDestination(
            output_uri=f"bq://{PROJECT_ID}.dataset.batch_output",
        ),
    ),
)
print(f"Job ID: {batch_job.name}")
Step 3: Query Results
SELECT
  request.contents[0].parts[0].text AS input_text,
  response.candidates[0].content.parts[0].text AS output_text,
  response.usageMetadata.totalTokenCount AS total_tokens
FROM `project.dataset.batch_output`
WHERE status = ''  -- an empty status marks a successful request
LIMIT 10;
System Instructions
{
  "request": {
    "contents": [
      {
        "role": "user",
        "parts": [{"text": "Translate 'hello' to Spanish."}]
      }
    ],
    "systemInstruction": {
      "parts": [{"text": "You are a professional translator."}]
    },
    "generationConfig": {
      "temperature": 0.2
    }
  }
}
Safety Settings
{
  "request": {
    "contents": [{"role": "user", "parts": [{"text": "Your prompt"}]}],
    "safetySettings": [
      {
        "category": "HARM_CATEGORY_HATE_SPEECH",
        "threshold": "BLOCK_LOW_AND_ABOVE"
      }
    ],
    "generationConfig": {"temperature": 0.4}
  }
}
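The two examples above are pretty-printed for readability; in the input file each request still has to occupy a single line. As a rough sketch, the optional fields can be attached in code and then serialized compactly. `add_request_options` is a hypothetical helper, and the category and threshold strings are the ones from the example above.

```python
import copy
import json


def add_request_options(request_line, system_instruction=None, safety_settings=None):
    """Return a copy of a batch request with optional fields attached.

    safety_settings maps a harm category name to a blocking threshold name.
    """
    line = copy.deepcopy(request_line)  # leave the caller's dict untouched
    body = line["request"]
    if system_instruction:
        body["systemInstruction"] = {"parts": [{"text": system_instruction}]}
    if safety_settings:
        body["safetySettings"] = [
            {"category": category, "threshold": threshold}
            for category, threshold in safety_settings.items()
        ]
    return line


base = {
    "request": {
        "contents": [{"role": "user", "parts": [{"text": "Translate 'hello' to Spanish."}]}],
        "generationConfig": {"temperature": 0.2},
    }
}
configured = add_request_options(
    base,
    system_instruction="You are a professional translator.",
    safety_settings={"HARM_CATEGORY_HATE_SPEECH": "BLOCK_LOW_AND_ABOVE"},
)
print(json.dumps(configured))  # one line, ready to append to the JSONL file
```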
Per-Request Generation Config
A batch job runs against a single model, but each request line can carry its own generation config:
{"request": {"contents": [...], "generationConfig": {"temperature": 0.2, "maxOutputTokens": 100}}}
{"request": {"contents": [...], "generationConfig": {"temperature": 0.8, "maxOutputTokens": 500}}}
{"request": {"contents": [...], "generationConfig": {"temperature": 0.1, "topP": 0.9}}}
List and Manage Jobs
List All Jobs
for job in client.batches.list():
    print(f"Job: {job.name}")
    print(f"  Created: {job.create_time}")
    print(f"  State: {job.state}")
    print(f"  Model: {job.model}")
    print()
Get Job Details
job = client.batches.get(name="projects/.../locations/.../batchPredictionJobs/...")
print(f"State: {job.state}")
print(f"Progress: {job.completion_stats}")
print(f"Input: {job.src}")
print(f"Output: {job.dest}")
print(f"Error: {job.error}")
Cancel a Job
client.batches.cancel(name=batch_job.name)
print("Job cancelled")
Response Structure
Batch prediction output JSONL format:
{
  "status": "",
  "processed_time": "2024-03-09T10:30:00.000Z",
  "request": {
    "contents": [{"role": "user", "parts": [{"text": "What is AI?"}]}],
    "generationConfig": {"temperature": 0.4}
  },
  "response": {
    "candidates": [{
      "content": {
        "role": "model",
        "parts": [{"text": "AI stands for Artificial Intelligence..."}]
      },
      "finishReason": "STOP",
      "avgLogprobs": -0.123
    }],
    "usageMetadata": {
      "promptTokenCount": 12,
      "candidatesTokenCount": 150,
      "totalTokenCount": 162
    },
    "modelVersion": "gemini-2.5-flash@001"
  }
}
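Indexing straight into `candidates[0]["content"]["parts"][0]["text"]` raises an exception on any row that lacks a response or candidates. The sketch below reads the structure shown above defensively. `extract_text` is a hypothetical helper, and returning `None` for unusable rows is a design choice of this example, not SDK behavior.

```python
def extract_text(record):
    """Return the text of the first candidate in an output record, or None.

    None is returned when the record carries an error status, has no
    candidates, or the first candidate contains no text parts.
    """
    if record.get("status"):
        return None
    candidates = (record.get("response") or {}).get("candidates") or []
    if not candidates:
        return None
    parts = (candidates[0].get("content") or {}).get("parts") or []
    text = "".join(part.get("text", "") for part in parts)
    return text or None


record = {
    "status": "",
    "request": {"contents": [{"role": "user", "parts": [{"text": "What is AI?"}]}]},
    "response": {
        "candidates": [{
            "content": {
                "role": "model",
                "parts": [{"text": "AI stands for Artificial Intelligence..."}],
            },
            "finishReason": "STOP",
        }]
    },
}
print(extract_text(record))
```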
Error Handling
Request-Level Errors
Check the status field of each output row; an empty status means the request succeeded:
for idx, row in df.iterrows():
    if row["status"]:
        print(f"Error in request {idx}: {row['status']}")
    else:
        print(f"Request {idx}: Success")
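Rows with a non-empty status can be gathered into a new input file and resubmitted as a follow-up job. The following is one way this might look, assuming each output record echoes the original request under its `request` key as in the response structure above; `split_results` is a hypothetical helper.

```python
import json


def split_results(records):
    """Separate successful records from failed ones.

    Returns (succeeded, retry_lines): the successful records unchanged, and
    one JSONL input line per failed record, rebuilt from its echoed request.
    """
    succeeded, retry_lines = [], []
    for record in records:
        if record.get("status"):
            retry_lines.append(json.dumps({"request": record["request"]}))
        else:
            succeeded.append(record)
    return succeeded, retry_lines


records = [
    {"status": "", "request": {"contents": [{"role": "user", "parts": [{"text": "ok"}]}]}},
    {"status": "example error text", "request": {"contents": [{"role": "user", "parts": [{"text": "retry me"}]}]}},
]
succeeded, retry_lines = split_results(records)
print(f"{len(succeeded)} succeeded, {len(retry_lines)} to retry")
```

With the DataFrame from the retrieval step, `df.to_dict("records")` provides the input. Whether a retry is worthwhile depends on the error: a request rejected as malformed will fail again unchanged.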
Job-Level Errors
if batch_job.state == "JOB_STATE_FAILED":
    print(f"Job failed: {batch_job.error.message}")
    print(f"Error code: {batch_job.error.code}")
elif batch_job.state == "JOB_STATE_CANCELLED":
    print("Job was cancelled")
elif batch_job.state == "JOB_STATE_PAUSED":
    print("Job is paused")
Cost Optimization
Calculate Costs
def calculate_batch_cost(df):
    """Calculate approximate batch prediction cost."""
    total_input_tokens = 0
    total_output_tokens = 0
    for _, row in df.iterrows():
        response = row["response"]
        # Failed rows may have no response (None or NaN), so only read dicts
        if isinstance(response, dict):
            usage = response.get("usageMetadata", {})
            total_input_tokens += usage.get("promptTokenCount", 0)
            total_output_tokens += usage.get("candidatesTokenCount", 0)
    # Batch pricing (50% discount); placeholder rates, replace them with the
    # current batch prices for your model
    INPUT_RATE = 0.0005  # USD per 1K tokens
    OUTPUT_RATE = 0.0015  # USD per 1K tokens
    input_cost = (total_input_tokens / 1000) * INPUT_RATE
    output_cost = (total_output_tokens / 1000) * OUTPUT_RATE
    return {
        "input_tokens": total_input_tokens,
        "output_tokens": total_output_tokens,
        "input_cost": input_cost,
        "output_cost": output_cost,
        "total_cost": input_cost + output_cost,
    }

costs = calculate_batch_cost(df)
print(f"Total cost: ${costs['total_cost']:.4f}")
Best Practices
Batch Size: Optimal batch size is 100-10,000 requests per file
File Location: Keep input files in us-central1 for best performance
Monitoring: Monitor job progress via console or API polling
Retries: Implement retry logic for failed individual requests
Input File Requirements
Format: JSONL (JSON Lines) with one request per line
Size: Up to 10,000 requests per file
Location: Must be in us-central1 region
Naming: Use wildcard patterns like gs://bucket/*.jsonl for multiple files
Permissions: Service account needs storage.objects.get access
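When a dataset exceeds the per-file request limit stated above, the request lines can be split across several files that share a prefix, which a single wildcard path then matches. A minimal sketch, assuming the 10,000-request figure from this page; `chunked` and the file naming scheme are illustrative choices.

```python
from itertools import islice


def chunked(lines, max_per_file=10_000):
    """Yield consecutive lists of at most max_per_file request lines."""
    iterator = iter(lines)
    while True:
        chunk = list(islice(iterator, max_per_file))
        if not chunk:
            return
        yield chunk


# Stand-in request lines; real ones would be the JSON request objects
lines = [f'{{"request": {{"id": {i}}}}}' for i in range(25)]
for index, chunk in enumerate(chunked(lines, max_per_file=10)):
    # Zero-padded indices keep the files sorted and easy to match with *.jsonl
    file_name = f"batch_requests-{index:05d}.jsonl"
    print(file_name, len(chunk))
```

Because `chunked` consumes an iterator lazily, it also works on a generator of request lines without holding the whole dataset in memory.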
Output Considerations
Results maintain input order
Failed requests included with error status
Output files written to timestamped subdirectories
Use BigQuery for easier querying of large result sets
Processing Results at Scale
Parallel Processing
import concurrent.futures

def process_result(row):
    """Process a single result row."""
    if row["status"]:
        return {"error": row["status"]}
    response = row["response"]["candidates"][0]["content"]["parts"][0]["text"]
    return {"success": response}

with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    results = list(executor.map(process_result, df.to_dict("records")))
Export to Database
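from google.cloud import bigquery

bq_client = bigquery.Client(project=PROJECT_ID)

# Write results to BigQuery, replacing the table if it already exists
# (load_table_from_dataframe requires the pyarrow package)
table_id = f"{PROJECT_ID}.dataset.results"
job_config = bigquery.LoadJobConfig(write_disposition="WRITE_TRUNCATE")
bq_client.load_table_from_dataframe(df, table_id, job_config=job_config).result()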
Next Steps
Context Caching: Cache repeated content in batch jobs
Multimodal: Process images and videos in batch
Function Calling: Use function calling in batch requests
Grounding: Ground batch predictions in data sources
Resources