Window とは
Dataflow 、つまり Apache Beam では、データの論理的な塊である PCollection は2種類の扱いがあり、Bounded と Unbounded と言われます。
Bounded は、データ量が定まっているデータのことをいい、これを処理する場合は基本的にバッチジョブになります。
Unbounded は、データ量が定まらず定常的に発生するようなデータのことをいい、これを処理する場合はストリーム処理になります。
ストリーム処理の場合、例えば、GroupByKey などの集計処理をする際にどういったまとまりでデータを集計対象にするかが重要になります。一方、バッチは、限られたデータなのでまずは全てを対象にしますね。
そのまとまりを Window といい、Window の種類によってストリームデータの区切り方が異なります。
Window の種類
Window には、以下の3種類がある。()カッコ内は、Apache Beam での名称。
- タンブリングウィンドウ (固定ウィンドウ)
- ホッピングウィンドウ (スライディングウィンドウ)
- セッションウィンドウ
タンブリングウィンドウ
タンブリングウィンドウは、Unbounded のデータストリーム(つまり、PCollection)を重なりなく固定幅時間で分割します。(イメージは、Google Cloud のサイトから引用)
つまり、15秒ごとの集計とか1分ごとの集計みたいなことができます。
ホッピングウィンドウ
ホッピングウィンドウは、タンブリングウィンドウと同様にデータストリームを固定幅で分割しますが、重なりを許容します。したがって、ウィンドウの幅と開始時間を選択します。
以下のイメージは、ウィンドウ幅を60秒にして、開始を30秒にしています。なので、前のウィンドウと30秒分のデータが被ります。(イメージは、Google Cloud のサイトから引用)
何故こんなのがあるかというと、タンブリングウィンドウの場合、データの数値の増減が激しい場合、それがそのまま集計結果に現れます。もちろんそれで良い場合もありますが、分析が突発的なデータの変動にひきづられないようにしたい場合、こういった重なりを用いてマイルドにします。
移動平均みたいな分析と同じような考え方ですね。
セッションウィンドウ
セッションウィンドウは、前出の2つと違い、ウィンドウの幅を固定せず、一定の連続したデータの発生範囲で分割します。データが発生してから、指定した時間間隔以上にデータが途切れなければ一つのウィンドウとみなします。言い換えると、指定した時間以上にデータの発生が止まるとウィンドウを一旦分割して集計対象にします。また、これをキーごとに分割します。
次のイメージでは、左上の「Minimum Gap Duration」におさまらない時間データが発生しなかったら別のウィンドウに分割されます。逆に、右上の赤字のようにそれ以下のデータの発生断絶はウィンドウの分割対象にはなりません。
Webのユーザのサイト遷移やマウスの動きなど、一定の操作単位で分析が必要な場合にこのようなウィンドウが有効になります。
ウォーターマーク
データパイプライン上では、データが発生した時間 イベント時間 に必ずしも各ステップで処理は 処理時間 で処理されない。処理時間は処理される際のシステム時間を指します。あと、順序通りデータが処理される保証もありません。つまり、イベント時間の若いデータが先に処理される可能性だってあります。
そういう前提において、ストリーミングデータをWindowに分割して処理する場合に、ウィンドウ内のデータが全て届くためにはいつまで待てば良いかには答えがないことになります。なぜならどれくらい遅れてデータが届くかわからないからです。
そこでこのウォーターマークという考え方が登場します。ウォーターマークは、とあるWindow内の全てのデータがいつパイプラインに到着すると考えるか、というシステム概念です。一度、ウォーターマークが一度Windowの最後の時刻を過ぎたら、それ以降に届いたデータは、たとえそのウィンドウに含まれるべきタイムスタンプを持っていても遅延データとして扱われることになります。
とあるWindow内の全てのデータがいつパイプラインに到着すると考えるか
とありますが、ここで言う全ては結果的には全てにならないかもしれないことを示唆しています。本当に全てのデータの到着を待つととんでもない時間待つ可能性が出てきて、いつまで経っても集計結果などが出せないからです。
例えば、00:00:00
から 00:05:00
のWindowが
トリガー
トリガーは、ウィンドウ単位で集計された結果を出力するタイミングを決める機能です。Apache Beam SDK で設定できますが、Dataflow SQLでは使えません。
トリガーには、以下の種類があります。
- イベント時間トリガー: イベントタイムを指定して、トリガーにする。デフォルトはこれ。
- 処理時間トリガー: 処理時間でトリガーを設定する。
- データ駆動トリガー: データの特性をトリガーにする。現時点では、データの数をトリガーにすることのみできる。
- 複合トリガー: 上の組み合わせ
これをどう設定するかは、以下のバランスを考慮して決める必要がある。
- 完全性: 全てのデータが揃うことがどれくらい必要か
- レイテンシ: どれくらいデータを待てるのか。例えば、全てのデータを待てるのか。あるいは、データが到着したらすぐ欲しいのか。
- コスト: レイテンシを下げるためにどれくらいコストをかけることができるか
例えば、時間に厳密に更新を要求するシステムの場合、時間ベースのトリガーでN秒ごとに出力する設定を使い、データの完全性より即時性を優先している。正確な時間よりも完全性を求める場合は、デフォルトトリガーを選択して、ウィンドウが最後まできたら出力すれば良い。
また、シングルグローバルウィンドウを使った無制限データに対してもトリガーを設定できる。これは、パイプラインに定期的な更新を無制限データ上で求める際に有用になります。例えば、その時点での移動平均をN秒ごとやN個ごとに出したい場合など。
Emelemt の timestamp
参考
https://cloud.google.com/dataflow/docs/concepts/streaming-pipelines?hl=ja
https://beam.apache.org/documentation/programming-guide/#windowing