Posted at

Apache FlinkのWindow周りAPIを紹介

More than 1 year has passed since last update.

Distributed computing (Apache Hadoop, Spark, ...) Advent Calendar 2016 15日目の記事です. Apache FlinkのStreaming APIにおけるWindow周りの機能を紹介してみます. https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html に書いてあるような内容です.

ベースにしているFlinkのバージョンは1.1です. 例などはScalaで書きますが、Javaでも大体同じような感じです.


はじめに

ストリーム処理では、データは終わること無く永遠に流れてきます. 従って、そこに対して計算を行うためには、何かしらの手段で計算対象のデータを区切ることが必要になってきます. この、永続的に流れるデータを区切るための考え方がWindowです. 例えば、"過去1分間"とか、"過去の10個のイベント"とかになります.

さらに、実際に処理を行う上ではWindowそのものだけではなく、そのWindowの中身はいつ計算されるのか、どのように計算されるのか、といった要素も必要になってきます. 本記事では、それらを記述するためのFlinkのAPIを紹介します.

なお、"いつ"という要素については実時間(Processing Time)なのか、イベントの時刻(Event Time)なのか、という話もあるのですが、それについては既にAdvent Calendarの別の日の記事になっている(http://qiita.com/sitotkfm/items/812766a4a4cf5e6a8501) のでそちらも読んでいただけると良いと思います. FlinkでのEventTimeの使い方は本記事の最後に記載します.


Window関連APIの概要

Apache FlinkでWindowを利用した処理を書く場合、以下のような書き方をします.

val input: DataStream[T] = ...

input
.keyBy(<key selector>)
.window(<window assigner>)
.trigger(<trigger>)
.<windowed transformation>(<window function>)

それぞれの要素の役割は以下の通りです.



  • keyBy(<key selector>)


    • ストリームを何かしらのキーで分割します. キーは、集計時のGROUP BYに相当します. 例えばPVだったらサイトのIDとかですね

    • 基本的に各Windowはキーごとに生成されます. (そうでない処理も書けますが、分散処理はできません)




  • window(<window assigner>)


    • 入力のデータが、どこのwindowに属するかを決めます




  • trigger(<trigger>)


    • どのタイミングでWindowの評価を行い結果を出力するか、またWindowの内容を削除するか、を決めます

    • 明示的に書かなくても良いケースもあります




  • <windowed transformation>(<window function>)


    • Window内のデータに対する処理を書く部分です




Window Assigner

Window Assignerは、入力データがどのwindowに属するかを決めます. Flinkでサポートされているwindow assignerは以下の4つがあります. 画像は公式ドキュメントへのリンクです.


Tumbling Windows

例えば10:00〜11:00, 11:00〜12:00, ...のように固定の長さを持ち、お互いに重なり合わない形のwindowです. Windowのサイズは時間またはイベント数で決まります. イベント数の場合は、例えばサイズが10であれば、1〜10番目のイベントが属するWindow, 11〜20番目のWindow, という形になります.

Tumbling Window

Tumbling Windowを使う場合は、以下のように書きます.

// 時間ベースのWindow

stream.timeWindow(Time.hours(1))
// または
stream.TumblingProcessingTimeWindow.of(Time.hours(10))

// イベント数ベースのWindow
stream.countWindow(10)


Sliding Windows

Tumbling Windowのように一定のサイズを持ちますが、それとは別にスライド幅、というパラメータを持ち、その幅ずつスライドしていくwindowです. 従ってwindow同士が重なり合います.

例えば時刻ベースで、windowサイズ1時間, スライド幅10分であれば、10:00〜11:00, 10:10〜11:10, 10:20〜11:20,...のように、各windowが重なり合いながら、スライド幅ずつずれていきます. 時刻ベースまたはイベント数ベースのwindowを使用することができます.

Sliding Window

// 時間ベースのWindow. 第二引数にスライド幅を取る.

stream.timeWindow(Time.hours(10), Time.minutes(10))
// または
stream.TumblingProcessingTimeWindow.of(Time.seconds(10))

// イベント数ベースのwindow.
stream.countWindow(10, 2)


Session Windows

固定幅ではなく、イベントが飛んでくる限り終了しないwindowです. 一定時間イベントが無いと、windowが終了します. Webアプリケーションのセッションのようなイメージのものです. どれくらいの時間イベントが無い場合にwindowを切るか、というパラメータ(session gap)を持ちます.

stream.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))

Session Window


Global Windows

上記のどれとも異なり、ずっと終了しない、無限のサイズを持つwindowです. 時系列のどこで来たイベントであっても、同じキーであれば同じwindowに所属します.

stream.window(GlobalWindows.create())

終了しない、ということはWindowを評価し、結果を出力するタイミングがありません. 従って、通常はtriggerを指定し、その中でwindow評価のタイミングを記述します.


Window Function

Window Assignerにより、計算対象のデータが属するwindowが決まりました. 通常は、これらに対して何かしらの計算をして、結果を導き出す、Aggregationをすることになります. その計算ロジックを決めるのがWindow Functionです. SQLで言うとselect文の中身、 sum, countなどの部分になります. 基本的には、何かしらの関数を記述する形になります.

例えば、(単語, 数) なタプルを集計するケースを考えます.

reduceを使って書くとこうなります. 2つの値をまとめて一つにする処理を書きます.

.reduce{ (a, b) => (b._2, a._1 + b._1)} 

foldだとこんな感じです. 初期値を設定し、そこに一つの値が来た時に、次の値をどのように計算するか、という形で処理を書きます. reduceよりももう少し書ける処理の幅が広くなります.

.fold("",0){ (r,i) => (i._1, r._1 + i._1 )}

一番自由度が高いのがapplyです. reduceやfoldと違い、そのwindowの対象になっているキーや、window自身の情報にもアクセスすることができます. 例えば、windowの開始/終了タイムスタンプを取得することもできます. また、Collector.collectを複数回呼ぶことで、一つのaggregation関数の中で複数の値を出力することもできます.

.apply(

{(key, window, records: Iterable[(String, Int)], collector: Collector[(String, Int)]) =>
val result = records.reduceLeft{ (a, b)=> (a._1, a._2 + b._2)}
collector.collect((result._1, result._2))
})

ただし、reduce, foldの場合はwindowにイベントが入ってくる都度集計結果が更新される、というインクリメンタルな処理になりますが、applyの場合は全てのイベントをWindow内に蓄積し、Windowを評価するタイミングで関数が実行される、という動きになります.

apply的にメタ情報へアクセスしつつ、reduce/foldのようにインクリメンタルに処理をする、ということもできますが、ここでは割愛します. 興味があれば冒頭にリンクしたドキュメントを見てみてください.


Trigger

さて、各イベントが所属するwindowが決まり、処理方法も決まったとして、出力が生成されるタイミングは何で決まるのでしょうか? それを決めるのがTriggerです. とは言え、Window Assignerを決めた時点で期待するTriggerの挙動が決まる場合も多いでしょう. 例えば、大抵の場合はWindowの終了時に出力が生成されることを期待すると思います. よって、各Window AssignerにはデフォルトのTriggerが決まっており、明示的に指定しなかった場合はそれが使われます.

カスタムのTriggerを書きたいユースケースとしては、例えば以下のようなものがあります

* Time windowは1日だけど、アップデートされた結果を1分ごとに出力したい

* Time windowでもCount windowでもなく、特定のイベントが来たタイミングをwindowの終了としたい

TriggerのAPIは以下のようになっています. イベントが到着したタイミングや、windowが終了したタイミングでそれぞれのメソッドが呼ばれ、各メソッドはCONTINUE(何もしない), FIRE(その時点でのWindowの内容を評価し、出力する), PURGE(評価せずにwindowの内容を削除する), FIRE_AND_PURGE(評価し、windowを削除する)のいずれかを返します.

// イベントが到着するごとに呼ばれる

abstract TriggerResult onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx)

// EventTimeを使っていて、指定した時間になった時に呼ばれる
abstract TriggerResult onEventTime(long time, W window, Trigger.TriggerContext ctx)

// ProcessingTimeを使っていて、指定した時間になった時に呼ばれる
abstract TriggerResult onProcessingTime(long time, W window, Trigger.TriggerContext ctx)

これにより、出力を生成するタイミング、windowからデータをクリアするタイミングを自由に記述することができます.

実際に私が経験したユースケースとしては、特定のイベントが来るまでwindowを継続したい、かつ一定時間ごとに結果を出力したい、という要件がありました. その時は、上記のGlobal windowとカスタムTriggerを組み合わせて対応しました.


EventTimeを使う

EventTimeを使う場合の書き方にも少しだけ触れておきます. EventTimeというのは各イベントの発生時刻です. デフォルトであるProcessing Timeの場合はイベント到着時のシステムの実時間を使用してWindowなどの処理を行うのに対し、EventTimeの場合はイベントの時間を使います. そして、EventTimeを使う場合はWatermarkという概念も入ってきます. これは、ストリーム処理エンジン内の時間が今どこまで進んでいるか、というのを表すものです. EventTimeを使用することで、例えばログのリプレイを行うケース等でも正確な処理を行うことができます.

FlinkでEventTimeを使うために必要な要素は3つです.


  1. EventTimeを使うよ、という宣言

  2. 飛んできたイベントに対して、EventTimeを割り当てる方法の定義

  3. Watermarkの定義

1.は以下のようにジョブの冒頭で以下のように宣言します. デフォルトはProcessingTimeです.

val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2と3についてはストリーム処理を記述する中で、以下のようにassignTimestampsAndWatermarksメソッドを使ってEventTimeの割り当て方、およびWatermarkをの決め方を定義したクラスを指定します. (もしくは、ストリームにデータを取り込む処理の中で、同様のことを行うケースもあります)

stream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

MyTimestampAndWatermarksは、以下2つのメソッドを実装します.

// 各イベントに対してEventTimeを割り当てるメソッド

def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long

// エンジンが、現在のWatermarkを取得する際に呼ばれるメソッド. デフォルトだと200ms間隔で呼ばれる
def override def getCurrentWatermark(): Watermark

実際には幾つか事前定義されているクラスがあったり、他のバリエーションがあったりしますが、そのあたりは公式ドキュメントを見て頂ければと思います.

また、EventTimeを使う場合は、window assigner等一部のクラスをEventTime用のものに変更する必要があります. 例えばTumblingProcessingTimeWindowTumblingEventTimeWindow のようになります.


まとめ

本記事では、Apache FlinkのWindow周りのAPIを紹介しました. ストリーム処理を記述する上でのプログラミングモデルは、Dataflow Model の論文で議論されていますが、その中で提唱されている手法の多くがFlinkでは実装されています.

また、Statefulな処理用のAPIもあるので、必要になりそうな要素は大体今の段階でそろっているのかな、と思っています.