[Apache Beam] (https://beam.apache.org/) で、ステートフル (StatefulDoFn) を使用すると、各イベントの処理時に共通して使える状態を持たせることができる。
0. テストデータ
次のような8個のイベントデータを処理していくことにする。
events = TestStream().add_elements([
TimestampedValue(0, 1600000000),
TimestampedValue(1, 1600000001),
TimestampedValue(1, 1600000002),
TimestampedValue(2, 1600000003),
TimestampedValue(3, 1600000004),
TimestampedValue(5, 1600000005),
TimestampedValue(8, 1600000006),
TimestampedValue(13, 1600000007),
])
1. IndexAssigningStatefulDoFn
を用いて各イベントに連番でインデックスを割り当てる
IndexAssigningStatefulDoFn
は以下のリソースを参考にして次にように作成した。
- Beamのステートフル処理を紹介している公式ブログ記事: Stateful processing with Apache Beam
- Beamのテストコード: beam/sdks/python/apache_beam/transforms/userstate_test.py
この DoFn は、INDEX_STATE
というインデックスを保持しておくステートを持っていて、イベントを1つ処理するごとに現在のインデックスを割り当て、インデックスを1インクリメントする。
class IndexAssigningStatefulDoFn(beam.DoFn):
INDEX_STATE = CombiningValueStateSpec("index", sum)
def process(self, element, state=beam.DoFn.StateParam(INDEX_STATE)):
unused_key, value = element
current_index = state.read()
yield (current_index, value)
state.add(1)
次のようなコードで、0. で作ったテストデータに対して IndexAssigningStatefulDoFn
を適用 (ParDo) させるようなパイプラインを実行すると、各イベントに連番でインデックスを付けることができる。
パイプラインコード
_ = (p
| events
| beam.Map(lambda x: (None, x))
| beam.ParDo(IndexAssigningStatefulDoFn())
| beam.Map(print))
出力
(0, 0)
(1, 1)
(2, 1)
(3, 2)
(4, 3)
(5, 5)
(6, 8)
(7, 13)
※ タプルの1個目がインデックスで、2個目がイベントデータ本体
2. 【問題】ウィンドウへ分割後のデータに対するインデックス割り当てがうまくいかない
元のデータを Fixed window (サイズは2) へ分割後に各ウィンドウごとのイベントデータをリストにまとめて、各リストに対して連番でインデックスを割り振ろうとするとうまくいかない。
パイプラインコード
_ = (p
| events
| beam.WindowInto(beam.window.FixedWindows(size=2))
| beam.CombineGlobally(beam.combiners.ToListCombineFn()).without_defaults()
| beam.Map(lambda x: (None, x))
| beam.ParDo(IndexAssigningStatefulDoFn())
| beam.Map(print))
出力
(0, [0, 1])
(0, [1, 2])
(0, [3, 5])
(0, [8, 13])
全部のイベントデータにインデックス 0 がついてしまう。
3. 【解決法】ウィンドウ分割後に再度グローバルウィンドウに入れる必要がある
Fixed windows へ分割後のデータを再度 Global window へ入れると解決する。
パイプラインコード
_ = (p
| events
| beam.WindowInto(beam.window.FixedWindows(size=2))
| beam.CombineGlobally(beam.combiners.ToListCombineFn()).without_defaults()
| "Global Windows" >> beam.WindowInto(beam.window.GlobalWindows()) # ここが差分
| beam.Map(lambda x: (None, x))
| beam.ParDo(IndexAssigningStatefulDoFn())
| beam.Map(print))
出力
(0, [0, 1])
(1, [1, 2])
(2, [3, 5])
(3, [8, 13])
ちゃんと連番でインデックスが振られる。
Appendix
フルのソースコードはこちら:
https://gist.github.com/kotarot/1da48904557b2e2cbd5a159fa1acf9fc