Distributed computing (Apache Hadoop, Spark, ...) Advent Calendar 2016 15日目の記事です. Apache FlinkのStreaming APIにおけるWindow周りの機能を紹介してみます. https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html に書いてあるような内容です.
ベースにしているFlinkのバージョンは1.1です. 例などはScalaで書きますが、Javaでも大体同じような感じです.
はじめに
ストリーム処理では、データは終わること無く永遠に流れてきます. 従って、そこに対して計算を行うためには、何かしらの手段で計算対象のデータを区切ることが必要になってきます. この、永続的に流れるデータを区切るための考え方がWindowです. 例えば、"過去1分間"とか、"過去の10個のイベント"とかになります.
さらに、実際に処理を行う上ではWindowそのものだけではなく、そのWindowの中身はいつ計算されるのか、どのように計算されるのか、といった要素も必要になってきます. 本記事では、それらを記述するためのFlinkのAPIを紹介します.
なお、"いつ"という要素については実時間(Processing Time)なのか、イベントの時刻(Event Time)なのか、という話もあるのですが、それについては既にAdvent Calendarの別の日の記事になっている(http://qiita.com/sitotkfm/items/812766a4a4cf5e6a8501) のでそちらも読んでいただけると良いと思います. FlinkでのEventTimeの使い方は本記事の最後に記載します.
Window関連APIの概要
Apache FlinkでWindowを利用した処理を書く場合、以下のような書き方をします.
val input: DataStream[T] = ...
input
.keyBy(<key selector>)
.window(<window assigner>)
.trigger(<trigger>)
.<windowed transformation>(<window function>)
それぞれの要素の役割は以下の通りです.
-
keyBy(<key selector>)
- ストリームを何かしらのキーで分割します. キーは、集計時のGROUP BYに相当します. 例えばPVだったらサイトのIDとかですね
- 基本的に各Windowはキーごとに生成されます. (そうでない処理も書けますが、分散処理はできません)
-
window(<window assigner>)
- 入力のデータが、どこのwindowに属するかを決めます
-
trigger(<trigger>)
- どのタイミングでWindowの評価を行い結果を出力するか、またWindowの内容を削除するか、を決めます
- 明示的に書かなくても良いケースもあります
-
<windowed transformation>(<window function>)
- Window内のデータに対する処理を書く部分です
Window Assigner
Window Assignerは、入力データがどのwindowに属するかを決めます. Flinkでサポートされているwindow assignerは以下の4つがあります. 画像は公式ドキュメントへのリンクです.
Tumbling Windows
例えば10:00〜11:00, 11:00〜12:00, ...のように固定の長さを持ち、お互いに重なり合わない形のwindowです. Windowのサイズは時間またはイベント数で決まります. イベント数の場合は、例えばサイズが10であれば、1〜10番目のイベントが属するWindow, 11〜20番目のWindow, という形になります.
Tumbling Windowを使う場合は、以下のように書きます.
// 時間ベースのWindow
stream.timeWindow(Time.hours(1))
// または
stream.TumblingProcessingTimeWindow.of(Time.hours(10))
// イベント数ベースのWindow
stream.countWindow(10)
Sliding Windows
Tumbling Windowのように一定のサイズを持ちますが、それとは別にスライド幅、というパラメータを持ち、その幅ずつスライドしていくwindowです. 従ってwindow同士が重なり合います.
例えば時刻ベースで、windowサイズ1時間, スライド幅10分であれば、10:00〜11:00, 10:10〜11:10, 10:20〜11:20,...のように、各windowが重なり合いながら、スライド幅ずつずれていきます. 時刻ベースまたはイベント数ベースのwindowを使用することができます.
// 時間ベースのWindow. 第二引数にスライド幅を取る.
stream.timeWindow(Time.hours(10), Time.minutes(10))
// または
stream.TumblingProcessingTimeWindow.of(Time.seconds(10))
// イベント数ベースのwindow.
stream.countWindow(10, 2)
Session Windows
固定幅ではなく、イベントが飛んでくる限り終了しないwindowです. 一定時間イベントが無いと、windowが終了します. Webアプリケーションのセッションのようなイメージのものです. どれくらいの時間イベントが無い場合にwindowを切るか、というパラメータ(session gap)を持ちます.
stream.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
Global Windows
上記のどれとも異なり、ずっと終了しない、無限のサイズを持つwindowです. 時系列のどこで来たイベントであっても、同じキーであれば同じwindowに所属します.
stream.window(GlobalWindows.create())
終了しない、ということはWindowを評価し、結果を出力するタイミングがありません. 従って、通常はtriggerを指定し、その中でwindow評価のタイミングを記述します.
Window Function
Window Assignerにより、計算対象のデータが属するwindowが決まりました. 通常は、これらに対して何かしらの計算をして、結果を導き出す、Aggregationをすることになります. その計算ロジックを決めるのがWindow Functionです. SQLで言うとselect文の中身、 sum, countなどの部分になります. 基本的には、何かしらの関数を記述する形になります.
例えば、(単語, 数) なタプルを集計するケースを考えます.
reduceを使って書くとこうなります. 2つの値をまとめて一つにする処理を書きます.
.reduce{ (a, b) => (b._2, a._1 + b._1)}
foldだとこんな感じです. 初期値を設定し、そこに一つの値が来た時に、次の値をどのように計算するか、という形で処理を書きます. reduceよりももう少し書ける処理の幅が広くなります.
.fold("",0){ (r,i) => (i._1, r._1 + i._1 )}
一番自由度が高いのがapplyです. reduceやfoldと違い、そのwindowの対象になっているキーや、window自身の情報にもアクセスすることができます. 例えば、windowの開始/終了タイムスタンプを取得することもできます. また、Collector.collectを複数回呼ぶことで、一つのaggregation関数の中で複数の値を出力することもできます.
.apply(
{(key, window, records: Iterable[(String, Int)], collector: Collector[(String, Int)]) =>
val result = records.reduceLeft{ (a, b)=> (a._1, a._2 + b._2)}
collector.collect((result._1, result._2))
})
ただし、reduce, foldの場合はwindowにイベントが入ってくる都度集計結果が更新される、というインクリメンタルな処理になりますが、applyの場合は全てのイベントをWindow内に蓄積し、Windowを評価するタイミングで関数が実行される、という動きになります.
apply的にメタ情報へアクセスしつつ、reduce/foldのようにインクリメンタルに処理をする、ということもできますが、ここでは割愛します. 興味があれば冒頭にリンクしたドキュメントを見てみてください.
Trigger
さて、各イベントが所属するwindowが決まり、処理方法も決まったとして、出力が生成されるタイミングは何で決まるのでしょうか? それを決めるのがTriggerです. とは言え、Window Assignerを決めた時点で期待するTriggerの挙動が決まる場合も多いでしょう. 例えば、大抵の場合はWindowの終了時に出力が生成されることを期待すると思います. よって、各Window AssignerにはデフォルトのTriggerが決まっており、明示的に指定しなかった場合はそれが使われます.
カスタムのTriggerを書きたいユースケースとしては、例えば以下のようなものがあります
- Time windowは1日だけど、アップデートされた結果を1分ごとに出力したい
- Time windowでもCount windowでもなく、特定のイベントが来たタイミングをwindowの終了としたい
TriggerのAPIは以下のようになっています. イベントが到着したタイミングや、windowが終了したタイミングでそれぞれのメソッドが呼ばれ、各メソッドはCONTINUE(何もしない), FIRE(その時点でのWindowの内容を評価し、出力する), PURGE(評価せずにwindowの内容を削除する), FIRE_AND_PURGE(評価し、windowを削除する)のいずれかを返します.
// イベントが到着するごとに呼ばれる
abstract TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx)
// EventTimeを使っていて、指定した時間になった時に呼ばれる
abstract TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx)
// ProcessingTimeを使っていて、指定した時間になった時に呼ばれる
abstract TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx)
これにより、出力を生成するタイミング、windowからデータをクリアするタイミングを自由に記述することができます.
実際に私が経験したユースケースとしては、特定のイベントが来るまでwindowを継続したい、かつ一定時間ごとに結果を出力したい、という要件がありました. その時は、上記のGlobal windowとカスタムTriggerを組み合わせて対応しました.
EventTimeを使う
EventTimeを使う場合の書き方にも少しだけ触れておきます. EventTimeというのは各イベントの発生時刻です. デフォルトであるProcessing Timeの場合はイベント到着時のシステムの実時間を使用してWindowなどの処理を行うのに対し、EventTimeの場合はイベントの時間を使います. そして、EventTimeを使う場合はWatermarkという概念も入ってきます. これは、ストリーム処理エンジン内の時間が今どこまで進んでいるか、というのを表すものです. EventTimeを使用することで、例えばログのリプレイを行うケース等でも正確な処理を行うことができます.
FlinkでEventTimeを使うために必要な要素は3つです.
- EventTimeを使うよ、という宣言
- 飛んできたイベントに対して、EventTimeを割り当てる方法の定義
- Watermarkの定義
1.は以下のようにジョブの冒頭で以下のように宣言します. デフォルトはProcessingTimeです.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
2と3についてはストリーム処理を記述する中で、以下のようにassignTimestampsAndWatermarks
メソッドを使ってEventTimeの割り当て方、およびWatermarkをの決め方を定義したクラスを指定します. (もしくは、ストリームにデータを取り込む処理の中で、同様のことを行うケースもあります)
stream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())
MyTimestampAndWatermarks
は、以下2つのメソッドを実装します.
// 各イベントに対してEventTimeを割り当てるメソッド
def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long
// エンジンが、現在のWatermarkを取得する際に呼ばれるメソッド. デフォルトだと200ms間隔で呼ばれる
def override def getCurrentWatermark(): Watermark
実際には幾つか事前定義されているクラスがあったり、他のバリエーションがあったりしますが、そのあたりは公式ドキュメントを見て頂ければと思います.
また、EventTimeを使う場合は、window assigner等一部のクラスをEventTime用のものに変更する必要があります. 例えばTumblingProcessingTimeWindow
→TumblingEventTimeWindow
のようになります.
まとめ
本記事では、Apache FlinkのWindow周りのAPIを紹介しました. ストリーム処理を記述する上でのプログラミングモデルは、Dataflow Model の論文で議論されていますが、その中で提唱されている手法の多くがFlinkでは実装されています.
また、Statefulな処理用のAPIもあるので、必要になりそうな要素は大体今の段階でそろっているのかな、と思っています.