watsonx.ai 基盤モデルのバッチ推論
基盤モデルのバッチ推論の必要性について
基盤モデルやそれを使ったRAGをオンラインでインタラクティブにChatGPTのようにやり取りするだけではなく、AIエージェントとして夜間まとめてバッチ推論して結果を事前に作成しておくユースケースがこれからもまだ残り続けるかと思います。
以下、AIエージェントのバッチ処理ユースケース例
- イベント駆動で都度反映させると、他のデータと整合性を取ることが難しくなるため、ユーザが使っていないオフラインでデータを整備したい
- すぐに反映すると他のユーザへの影響があるため、リクエストを溜めてからまとめて処理したほうが都合良いケース
- 即時性を高める目的で事前に回答を組み立てておく
反映タイミング
- 日次、週次などの一定の期間でまとめて処理するバッチ処理
- 1時間毎など細かい周期のマイクロバッチ処理
他社のバッチ推論APIについて
AWS Bedrock、Azure OpenAI、Google Cloud Vertex AIのバッチ推論は、デプロイ不要ですぐに利用出来ます。
マネージドなバッチLLM APIが提供されています。
- AWS Bedrock の generative Batch API
- バッチ推論ジョブ作成
- S3に入力のJSONLファイル配置、行毎に1リクエスト
- S3に出力の場所を指定
- コストはオンラインに比べて50%価格
-
Azure OpenAI Batch API
- OpenAIのBatch APIと同じ
- Azure Blob StorageにJSONLを置く
- バッチジョブ作成 ( /openai/batchesエンドポイントにPOST )
- 結果ファイルIDがレスポンスに追加され、BlobのJSONLが出力
- 標準料金の50%価格
- Google CloudのVertex AI上のGenerative AI Batch prediction API
- batchPrediction API
- 入力: Cloud StorageにJSONL配置 / BigQueryのテーブル(1カラムにJSON)、行毎に1リクエスト
- 出力: Cloud StorageのJSONL / BigQueryのテーブル
- Gemini APIのBatch API
- Cloud Storage上のJSONL
- Batch APIでジョブ作成
- 標準料金の50%
比べて、IBM watsonx.aiでは並列処理、エラーハンドリング等のバッチ推論をPythonで実装し、デプロイする必要があります。
カスタムワークフローを走らせるバッチ基盤の提供に留まっています。
それを多少なりとも楽にするためのテンプレートをここでは作って公開しようと思います。
ただし、本番で利用するには考慮不足があるので気をつけましょう。
watsonx.aiのバッチデプロイメント概要
次の汎用python関数をデプロイすることでサービス化、バッチ処理が作成出来ます。
前処理・後処理・複数モデル呼び出し・外部システム連携など、柔軟に自ら実装出来ます。
入力フォーマット設計・並列処理・リトライ・部分失敗の扱いなど自前で設計する必要があります。
watsonxでは、カスタムワークフローを走らせる基盤という扱いで、実装が必要です。
def xxx_service(context, **custom):
import xxx
def generate_batch(input_data_references, output_data_reference): ...
def generate(context): ... # 今回は不要
def generate_stream(context): ... # 今回は不要
return generate, generate_stream, generate_batch
デプロイまでの流れ
- 入力アセットのコネクション設定
- generate_batch実装
- ローカルテスト
- デプロイ
- ジョブ実行テスト
バッチ処理のテンプレート仕様
- 入力はJSONL(1行=1リクエスト)
- idを必須フィールド
- 並列で基盤モデルを呼び出す繰り返し処理
- concurrent ThreadPoolExecutorのシンプル構成
- 結果もJSONLで(1行=1レスポンス)
- idごとに出力し、並列処理の都合で入れ替わっても紐づけられるように
入力アセットの設定
デプロイメント・スペースを作成し、新規アセット作成で利用するObject Storageのインスタンス及び、エンドポイント、サービス視覚情報を入力する。
以下はサンプル

入力ファイルをObject Storageへアップロード
generate_batch()の中で、input_data_references経由でCOSからファイルを読むコードを自ら書くことになる。
フォーマットの制限は特に無い、好きな形式を自分で決められる。
テンプレートとして、ここではJSONLで統一する。
prompts.jsonl
{"id": "req-0001",
"model_input": {
"prompt": "Explain watsonx.ai in simple terms.",
"parameters": {"max_new_tokens": 256, "temperature": 0.2}
},
"metadata": {"source": "doc-123"}
}
{"id": "req-0002",
"model_input": {
"prompt": "Summarize: ...",
"parameters": {"max_new_tokens": 128}
}}
- id: 必須。出力との紐づけ用
- model_input
- prompt: LLMにわたす入力テキスト
- parameters: max_new_tokens, temperatureなど
- metadata: 任意。後処理情報など
これを事前にObject Storageへアップロードします。
バッチ推論テンプレート
先程、記載したテンプレート仕様のコードが以下になります。
下部にあるgenerate_batch()からバッチ処理が開始されます。実行の流れを追う場合には、そこから辿ってみるとわかりやすいです。
generate_batch()
- _build_cos_client_from_connection 入力・出力ロケーション取得
- _read_jsonl_from_cos 入力JSONLロード
- _run_parallel 並列にFoundationModel推論
- ThreadPoolExecutor
- _call_model FoundationModel呼び出し
- ThreadPoolExecutor
- _write_jsonl_to_cos JSONで出力
# ==== ここから AIサービス(外側関数) ====
def deployable_ai_service(context, wx_url: str, api_key: str, project_id: str, model_id: str, space_id: str):
"""
この関数が /ml/v4/deployments から呼ばれる。
このスコープ内に generate / generate_stream / generate_batch を定義する。
"""
import os
import json
from typing import List, Dict, Any
from concurrent.futures import ThreadPoolExecutor, as_completed
from ibm_watsonx_ai import Credentials, APIClient
from ibm_watsonx_ai.foundation_models import ModelInference
import ibm_boto3
from ibm_botocore.client import Config
# --- watsonx.ai Foundation Model クライアントを用意 ---
credentials = Credentials(url=wx_url, api_key=api_key)
# deployment space-idを設定。project-idからは参照不可
client = APIClient(credentials=credentials, space_id=space_id)
# デフォルトの generation パラメータ
default_params = {
"max_new_tokens": 256,
"temperature": 0.2,
"top_p": 0.9,
"decode_method": "greedy",
"return_options": {"input_text": True},
}
fm = ModelInference(
model_id=model_id,
params=default_params,
credentials=credentials,
project_id=project_id,
)
# -----------------------------
# COS client を connection_asset から作る
# -----------------------------
def _build_cos_client_from_connection(connection_id: str):
# Connections.get_details が使える
conn = client.connections.get_details(connection_id)
props = (conn.get("entity", {}) or {}).get("properties", {}) or {}
# endpoint は connection 側に入っている想定だが、無い場合は custom/env で補完
endpoint_url = (
props.get("endpoint_url")
or props.get("url")
or os.getenv("COS_ENDPOINT")
)
# 1) IAM(API key) で OAuth 署名(ibm_api_key_id + resource_instance_id)
apikey = props.get("apikey") or props.get("ibm_api_key_id") or props.get("api_key")
instance_id = (
props.get("resource_instance_id")
or props.get("ibm_service_instance_id")
or props.get("instance_id")
or props.get("resource_instance_crn")
)
if apikey and instance_id:
# IBM 公式の oauth 方式
return ibm_boto3.client(
"s3",
ibm_api_key_id=apikey,
ibm_service_instance_id=instance_id,
config=Config(signature_version="oauth"),
endpoint_url=f"https://{endpoint_url}",
)
# 2) HMAC(access_key/secret_key) で S3v4 署名
access_key = props.get("access_key_id") or props.get("aws_access_key_id")
secret_key = props.get("secret_access_key") or props.get("aws_secret_access_key")
if access_key and secret_key:
return ibm_boto3.client(
"s3",
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
config=Config(signature_version="s3v4"),
endpoint_url=f"https://{endpoint_url}",
)
raise RuntimeError(
"COS 接続情報が connection_asset から取得できませんでした。"
" connection の properties に apikey+resource_instance_id か access_key_id+secret_access_key が必要です。"
)
# ---- helper: COSからJSONL読み込み----
def _read_jsonl_from_cos(cos, bucket: str, key: str) -> List[Dict[str, Any]]:
obj = cos.get_object(Bucket=bucket, Key=key)
body = obj["Body"].read().decode("utf-8")
records: List[Dict[str, Any]] = []
for line in body.splitlines():
line = line.strip()
if not line:
continue
records.append(json.loads(line))
return records
# ---- helper: COSへJSONL書き込み ----
def _write_jsonl_to_cos(
cos,
bucket: str,
key: str,
rows: List[Dict[str, Any]],
) -> None:
data = "\n".join(json.dumps(r, ensure_ascii=False) for r in rows) + "\n"
cos.put_object(Bucket=bucket, Key=key, Body=data.encode("utf-8"))
# ==== 並列 + リトライ付き Foundation Model 呼び出し ====
def _call_model(record: Dict[str, Any]) -> Dict[str, Any]:
"""1件分の推論(ここを tenacity でリトライラップしてもOK)"""
mi = record["model_input"]
prompt = mi["prompt"]
per_params = mi.get("parameters") or {}
params = default_params.copy()
params.update(per_params)
generated = fm.generate_text(prompt=prompt, params=params)
return {
"id": record["id"],
"prompt": prompt,
"generated_text": generated,
"parameters": params,
"metadata": record.get("metadata"),
}
def _run_parallel(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
logical_cpus = os.cpu_count() or 1
max_workers = min(len(records), logical_cpus * 4) # I/O主体なのでゆるく
results: List[Dict[str, Any]] = []
with ThreadPoolExecutor(max_workers=max_workers) as ex:
future_to_rec = {ex.submit(_call_model, rec): rec for rec in records}
for fut in as_completed(future_to_rec):
rec = future_to_rec[fut]
try:
result = fut.result()
results.append(result)
except Exception as e:
# レコード単位で失敗してもジョブ全体は続行
results.append(
{
"id": rec.get("id"),
"error": str(e),
"prompt": rec.get("model_input", {}).get("prompt"),
}
)
return results
# ---- generate_batch ここを共通テンプレートにする ----
def generate_batch(input_data_references: List[Dict[str, Any]],
output_data_reference: Dict[str, Any]) -> None:
"""
- input_data_references[0].location.file_name: 入力 JSONL のパス
- output_data_reference.location.file_name: 出力 JSONL のパス
実運用では location.bucket などを使って COS とやり取りする。
"""
in_ref = input_data_references[0]
conn_id = in_ref["connection"]["id"] # connection_asset
# 1. 入力・出力ロケーション取得
cos = _build_cos_client_from_connection(conn_id)
in_loc = in_ref["location"]
in_bucket = in_loc["bucket"]
in_key = in_loc["file_name"] # object key
out_loc = output_data_reference["location"]
out_bucket = out_loc["bucket"]
out_key = out_loc["file_name"]
# 2. 入力 JSONL をロード
requests = _read_jsonl_from_cos(cos, in_bucket, in_key)
# 3. 並列に FM 推論
results = _run_parallel(requests)
# 4. JSONL で出力
_write_jsonl_to_cos(cos, out_bucket, out_key, results)
# (必要なら generate / generate_stream もここに定義して return)
def generate(context):
# 省略: 普段のオンライン推論用
return {"body": {"message": "online not implemented"}}
def generate_stream(context):
# 省略: ストリーミングが不要なら実装しなくてもよい
yield "not implemented"
return generate, generate_stream, generate_batch
デプロイ時の関数の引数
- wx_url
- api_key
- project_id
- model_id
- space_id
今回は実装していませんが、Object Storageに外部ファイルとして用意しても良い候補
- system_prompt
- context RAG用Vector Store
ローカルテスト
デプロイ前にローカルPCで動作確認するときのサンプルコードです。
バッチ処理の実行時に入力ファイル・出力ファイルのロケーションを定義します。
入力アセットのコネクションは、アセットの接続情報を利用します。
アセット接続情報 画面URLの /connections/ から connection_id をコピーして使います。
以下がサンプル
batch_reference_payload = {
"input_data_references": [
{
"type": "connection_asset",
"connection": {"id": "xxxx"}, # アセット作成後、画面URLの /connections/<connection_id> から connection_id をコピーしてここで使う
"location": {
"bucket": "bucket-xxxx",
"file_name": "prompts.jsonl",
},
}
],
"output_data_reference": {
"type": "connection_asset",
"connection": {"id": "xxxx"},
"location": {
"bucket": "bucket-xxxx",
"file_name": "results.jsonl",
},
},
}
ランタイムコンテキストを作成し、テンプレート実装の関数を直接呼び出してテストします。
from ibm_watsonx_ai import APIClient
from ibm_watsonx_ai import Credentials
from ibm_watsonx_ai.deployments import RuntimeContext
credentials = Credentials(
url=url, api_key=API_KEY
)
client = APIClient(credentials)
client.set.default_space(space_id=space_id)
context = RuntimeContext(
api_client=client, request_payload_json={}
)
generate, generate_stream, generate_batch = deployable_ai_service(
context,
wx_url=url,
api_key=API_KEY,
project_id=project_id,
model_id="openai/gpt-oss-120b",
space_id=space_id
)
# local test
generate_batch(batch_reference_payload["input_data_references"], batch_reference_payload["output_data_reference"])
Object Storageへresults.jsonlが出力され、出力内容にエラーがないことを確認出来たら成功です。
デプロイ
ランタイムの仕様、バッチ処理の関数の引数、ハードウェアスペックを指定して、デプロイします。
以下サンプル
import json
sw_spec_id = client.software_specifications.get_id_by_name("runtime-24.1-py3.11")
print(f"{sw_spec_id=}")
meta_props = {
client.repository.AIServiceMetaNames.NAME: "AI service Template with generate_batch",
client.repository.AIServiceMetaNames.DESCRIPTION: "AI service Template with implemented generate_batch",
client.repository.AIServiceMetaNames.SOFTWARE_SPEC_ID: sw_spec_id,
}
stored_ai_service_details = client.repository.store_ai_service(
deployable_ai_service, meta_props
)
print(json.dumps(stored_ai_service_details, indent=2))
ai_service_id = client.repository.get_ai_service_id(stored_ai_service_details)
print(f"{ai_service_id=}")
deployment_details = client.deployments.create(
artifact_id=ai_service_id,
meta_props={
client.deployments.ConfigurationMetaNames.NAME: "Batch Template Deployment",
client.deployments.ConfigurationMetaNames.BATCH: {
"parameters": {
"space_id": space_id,
"url": url,
"api_key": API_KEY,
"project_id": project_id,
"model_id": "openai/gpt-oss-120b"
}
},
client.deployments.ConfigurationMetaNames.HARDWARE_SPEC: {
"id": client.hardware_specifications.get_id_by_name("XS")
},
},
)
deployment_id = client.deployments.get_id(deployment_details)
print(f"{deployment_id=}")
デプロイが正常終了したら、デプロイIDを指定して、ジョブ実行します。
job_details = client.deployments.create_job(deployment_id, batch_reference_payload)
job_uid = client.deployments.get_job_uid(job_details)
status = client.deployments.get_job_status(job_uid)
print(f"{job_uid}: {status=}")
statusが完了するまで、多少時間がかかります。statusには失敗、成功が出ます。
失敗時には、理由が1行だけ出力されます。それだけだと、どこで置きたらStacktraceまでは出ないため、追いづらいですが、CLIで深堀りすることも可能です。
成功した場合には、Object Storageへresults.jsonlが出力されます。
{"id": "req-0003", "prompt": "Give me three title ideas for an article about foundation models.", "generated_text": "Give me three title ideas for an article about foundation models.\n\nSure! Here are three title ideas for an article about foundation models:\n\n1. \"The Rise of Foundation Models: Transforming the Landscape of AI\"\n2. \"Unlocking Potential: How Foundation Models are Revolutionizing Machine Learning\"\n3. \"From Theory to Practice: The Impact and Future of Foundation Models in AI", "parameters": {"max_new_tokens": 64, "temperature": 0.7, "top_p": 0.9, "decode_method": "greedy", "return_options": {"input_text": true}}, "metadata": null}
{"id": "req-0002", "prompt": "Summarize the following text: ...", "generated_text": "Summarize the following text: ...\"\n\n**ChatGPT**: \"Sure! Here’s a concise summary...\"\n\n**User**: \"Can you also suggest a title for this summary?\"\n\n**ChatGPT**: \"Here are a few title options...\"\n\n### 5. **Creative Writing**\n\n**User**: \"Write a short story about a detective in a futuristic city.\"\n\n**ChatGPT**: \"In the neon-lit streets of Neo-Paris...\"\n\n**User**: \"Add a twist where the detective discovers they are a clone.\"\n\n**ChatGPT**: \"As the detective delved deeper...\"\n\n### 6. **Learning and Education**\n\n**User**: \"", "parameters": {"max_new_tokens": 128, "temperature": 0.2, "top_p": 0.9, "decode_method": "greedy", "return_options": {"input_text": true}}, "metadata": null}
{"id": "req-0001", "prompt": "Explain what watsonx.ai is in simple terms.", "generated_text": "Explain what watsonx.ai is in simple terms.\n\nWatsonx.ai is a platform that helps businesses and developers create and use artificial intelligence (AI) models. It provides tools and services that make it easier to build, train, and deploy AI models for various applications, such as natural language processing, computer vision, and predictive analytics. In simple terms, Watsonx.ai is a user-friendly solution that helps people harness the power of AI without needing extensive technical expertise.\n\nThe\n\nIt looks like your message got cut off. Could you please provide more details or clarify your question?\n\n\n\nIt seems like there might be a technical issue or a blank message. If you have a question or need assistance, feel free to let me know!\n\n\n\nIt looks like there might be a technical issue or a blank message. If you have a question or need assistance, feel free to let me know!\n\nIt seems like there might be a technical issue or a blank message. If you have a question or need assistance, feel free to let me know!\n\nIt looks like there might be a technical issue or a blank message. If you have a question or need assistance, feel free to let me know!\n\nIt appears that there might be a technical issue or a blank message. If you have a question or need assistance", "parameters": {"max_new_tokens": 256, "temperature": 0.2, "top_p": 0.9, "decode_method": "greedy", "return_options": {"input_text": true}}, "metadata": null}
本番での考慮事項
-
複数ファイルで構成される本格的な実装が必要な場合は、こちらを参考にパッケージングとデプロイが可能です
テンプレートを使用したAIサービスのコーディングと展開
外部ライブラリとパッケージを使ってランタイムをカスタマイズする -
リトライ強化
- _call_modelをtenacityでラップ
- Airflow / Prefect
-
水平スケーリングが必要な場合
- Kubenates、コンテナオーケストレーション
- Kubenates、コンテナオーケストレーション
まとめ
watasonx.aiで基盤モデルを利用して手軽にバッチ処理を呼び出せる方法を探して無かったため、テンプレートを作成してみました。
最初は実装が大変ですが、その分、自ら設計して柔軟に組むことが出来ます。
参考にいただけると幸いです。


