概要
Apache Beamで提供されているBatchElementsは、下流の処理(APIコールやデータベースへの書き込みなど)にかかる「固定コスト」を、要素をまとめて処理することで効率化(アモルタイズ)するための便利なトランスフォームです。
このクラスの主な使い方と、各パラメータの役割を分かりやすく解説します。
1. 基本的な使い方
最もシンプルな使い方は、PCollectionに対して適用するだけです。デフォルトでは、要素数(count)に基づいてバッチが作成されます。
import apache_beam as beam
from apache_beam.transforms.util import BatchElements
with beam.Pipeline() as p:
(
p
| 'CreateData' >> beam.Create(range(1000))
| 'Batch' >> BatchElements(min_batch_size=10, max_batch_size=100)
| 'ProcessBatch' >> beam.Map(lambda batch: print(f"Processing batch of size: {len(batch)}"))
)
2. パラメータの仕組みと調整方法
BatchElementsの賢い点は、「下流の処理時間を計測して、最適なバッチサイズを自動で調整する」ところにあります。
コスト計算の考え方
このトランスフォームは、処理時間が以下の式に従うと仮定しています。
-
target_batch_overhead:
全体の処理時間のうち、固定コスト(オーバーヘッド)が占める割合の目標値です。デフォルトの0.05は、「オーバーヘッドを全体の5%以内に抑えたい(=95%は実処理に使いたい)」という意味になります。 -
target_batch_duration_secs:
1つのバッチ処理にかける時間の目標値(秒)です。リアルタイム性が重要な場合は、この値を小さく設定します。
サイズの制限
-
min_batch_size/max_batch_size:
自動調整されるバッチサイズの動く範囲を決めます。 -
ヒント: バッチサイズを固定したい場合は、この2つを同じ値(例:
min_batch_size=100, max_batch_size=100)に設定します。
3. 応用:要素の「大きさ」でバッチを組む
単なる「個数」ではなく、データのバイト数や特定の重みでバッチを組みたい場合は、element_size_fn を使います。
# 例:文字列の長さに基づいてバッチサイズ(合計文字数)を調整する
| 'BatchByLength' >> BatchElements(
min_batch_size=100, # 合計100文字以上
max_batch_size=5000, # 合計5000文字以下
element_size_fn=lambda x: len(x)
)
4. 注意点
-
ウィンドウ内での動作:
BatchElementsは、ウィンドウごとにバッチを作成します。ウィンドウを跨いで要素がまとめられることはありません。 -
動的な調整: 処理が始まると、最初は小さなバッチから開始し、下流の
DoFnなどの実行時間をプロファイリングしながら、徐々に最適なサイズへと自動で変化させていきます。