Geminiで数万件の推論を一度にさせようとすると、レート制限がネックになる。
対応策としてはバッチモードの利用があるが、バッチモードで構造化出力を使わずにFileAPIを使う例や、バッチモードでFileAPIを使わずに構造化出力を使う例は公式にあるものの、バッチモード+FileAPI+構造化出力を一度に使うサンプルコードが見当たらなかったのでメモ。
構造化出力のためのスキーマを定義
# 構造化出力のためのスキーマを定義
recipe_schema = {
"type": "array",
"items": {
"type": "object",
"properties": {
"recipe_name": {"type": "string"},
"ingredients": {
"type": "array",
"items": {"type": "string"}
}
},
"required": ["recipe_name", "ingredients"]
}
}
FileAPIでアップロードするファイルの作成
# JSONLファイルの内容を作成
requests_data = [
{
"contents": [{
"parts": [{"text": "List a few popular cookie recipes, and include the amounts of ingredients."}],
"role": "user"
}],
"generationConfig": {
"responseMimeType": "application/json",
"responseJsonSchema": recipe_schema
}
},
{
"contents": [{
"parts": [{"text": "List a few popular gluten free cookie recipes, and include the amounts of ingredients."}],
"role": "user"
}],
"generationConfig": {
"responseMimeType": "application/json",
"responseJsonSchema": recipe_schema
}
}
]
# JSONLファイルを作成
jsonl_filename = "batch-requests.jsonl"
with open(jsonl_filename, "w") as f:
for i, req in enumerate(requests_data):
generate_content_request = {
"key": f"request-{i+1}",
"request": req
}
f.write(json.dumps(generate_content_request) + "\n")
File APIを使用してファイルをアップロード
uploaded_file = client.files.upload(
file=jsonl_filename,
config={
"display_name": "structured-output-batch-requests",
"mime_type": "application/jsonl"
}
)
print(f"Uploaded file: {uploaded_file.name}")
アップロードしたファイルを使用してバッチジョブを作成
file_batch_job = client.batches.create(
model="models/gemini-2.0-flash",
src=uploaded_file.name,
config={
"display_name": "structured-output-job-1"
}
)
# ジョブの完了を待機
job_name = file_batch_job.name
print(f"Polling status for job: {job_name}")
結果が生成されるまでポーリング
while True:
batch_job = client.batches.get(name=job_name)
if batch_job.state.name in ('JOB_STATE_SUCCEEDED', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED', 'JOB_STATE_EXPIRED'):
break
print(f"Job not finished. Current state: {batch_job.state.name}. Waiting 30 seconds...")
time.sleep(30)
print(f"Job finished with state: {batch_job.state.name}")
結果を取得して表示
if batch_job.state.name == 'JOB_STATE_SUCCEEDED':
if batch_job.dest and batch_job.dest.file_name:
# 結果がファイルに保存されている場合
result_file_name = batch_job.dest.file_name
print(f"Results are in file: {result_file_name}")
print("Downloading result file content...")
# 結果ファイルをダウンロード
file_content = client.files.download(file=result_file_name)
# JSONLファイルの各行を処理
lines = file_content.decode('utf-8').strip().split('\n')
for i, line in enumerate(lines):
print(f"\n--- Response {i+1} ---")
try:
response_data = json.loads(line)
# レスポンスの構造に応じて処理
if 'response' in response_data:
if hasattr(response_data['response'], 'text'):
print(response_data['response']['text'])
else:
# GenerateContentResponseの場合
if 'candidates' in response_data['response']:
for candidate in response_data['response']['candidates']:
if 'content' in candidate and 'parts' in candidate['content']:
for part in candidate['content']['parts']:
if 'text' in part:
print(part['text'])
elif 'error' in response_data:
print(f"Error: {response_data['error']}")
except json.JSONDecodeError as e:
print(f"Error parsing response line: {e}")
print(f"Raw line: {line}")
else:
print("No file result found.")
else:
print(f"Job did not succeed. Final state: {batch_job.state.name}")
if hasattr(batch_job, 'error') and batch_job.error:
print(f"Error: {batch_job.error}")