LoginSignup
1
2

More than 3 years have passed since last update.

Apache Beam でウィンドウ分割後に連番インデックスを割り当てる StatefulDoFn

Last updated at Posted at 2020-12-05

Apache Beam で、ステートフル (StatefulDoFn) を使用すると、各イベントの処理時に共通して使える状態を持たせることができる。

0. テストデータ

次のような8個のイベントデータを処理していくことにする。

events = TestStream().add_elements([
    TimestampedValue(0, 1600000000),
    TimestampedValue(1, 1600000001),
    TimestampedValue(1, 1600000002),
    TimestampedValue(2, 1600000003),
    TimestampedValue(3, 1600000004),
    TimestampedValue(5, 1600000005),
    TimestampedValue(8, 1600000006),
    TimestampedValue(13, 1600000007),
])

1. IndexAssigningStatefulDoFn を用いて各イベントに連番でインデックスを割り当てる

IndexAssigningStatefulDoFn は以下のリソースを参考にして次にように作成した。

この DoFn は、INDEX_STATE というインデックスを保持しておくステートを持っていて、イベントを1つ処理するごとに現在のインデックスを割り当て、インデックスを1インクリメントする。

class IndexAssigningStatefulDoFn(beam.DoFn):
    INDEX_STATE = CombiningValueStateSpec("index", sum)

    def process(self, element, state=beam.DoFn.StateParam(INDEX_STATE)):
        unused_key, value = element
        current_index = state.read()
        yield (current_index, value)
        state.add(1)

次のようなコードで、0. で作ったテストデータに対して IndexAssigningStatefulDoFn を適用 (ParDo) させるようなパイプラインを実行すると、各イベントに連番でインデックスを付けることができる。

パイプラインコード

_ = (p
    | events
    | beam.Map(lambda x: (None, x))
    | beam.ParDo(IndexAssigningStatefulDoFn())
    | beam.Map(print))

出力

(0, 0)
(1, 1)
(2, 1)
(3, 2)
(4, 3)
(5, 5)
(6, 8)
(7, 13)

※ タプルの1個目がインデックスで、2個目がイベントデータ本体

2. 【問題】ウィンドウへ分割後のデータに対するインデックス割り当てがうまくいかない

元のデータを Fixed window (サイズは2) へ分割後に各ウィンドウごとのイベントデータをリストにまとめて、各リストに対して連番でインデックスを割り振ろうとするとうまくいかない。

パイプラインコード

_ = (p
    | events
    | beam.WindowInto(beam.window.FixedWindows(size=2))
    | beam.CombineGlobally(beam.combiners.ToListCombineFn()).without_defaults()
    | beam.Map(lambda x: (None, x))
    | beam.ParDo(IndexAssigningStatefulDoFn())
    | beam.Map(print))

出力

(0, [0, 1])
(0, [1, 2])
(0, [3, 5])
(0, [8, 13])

全部のイベントデータにインデックス 0 がついてしまう。

3. 【解決法】ウィンドウ分割後に再度グローバルウィンドウに入れる必要がある

Fixed windows へ分割後のデータを再度 Global window へ入れると解決する。

パイプラインコード

_ = (p
    | events
    | beam.WindowInto(beam.window.FixedWindows(size=2))
    | beam.CombineGlobally(beam.combiners.ToListCombineFn()).without_defaults()
    | "Global Windows" >> beam.WindowInto(beam.window.GlobalWindows())  # ここが差分
    | beam.Map(lambda x: (None, x))
    | beam.ParDo(IndexAssigningStatefulDoFn())
    | beam.Map(print))

出力

(0, [0, 1])
(1, [1, 2])
(2, [3, 5])
(3, [8, 13])

ちゃんと連番でインデックスが振られる。

Appendix

フルのソースコードはこちら:
https://gist.github.com/kotarot/1da48904557b2e2cbd5a159fa1acf9fc

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2