LoginSignup
15
7

More than 5 years have passed since last update.

Distributed computing (Apache Hadoop, Spark, ...) Advent Calendar 2016 15日目の記事です. Apache FlinkのStreaming APIにおけるWindow周りの機能を紹介してみます. https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/windows.html に書いてあるような内容です.

ベースにしているFlinkのバージョンは1.1です. 例などはScalaで書きますが、Javaでも大体同じような感じです.

はじめに

ストリーム処理では、データは終わること無く永遠に流れてきます. 従って、そこに対して計算を行うためには、何かしらの手段で計算対象のデータを区切ることが必要になってきます. この、永続的に流れるデータを区切るための考え方がWindowです. 例えば、"過去1分間"とか、"過去の10個のイベント"とかになります.

さらに、実際に処理を行う上ではWindowそのものだけではなく、そのWindowの中身はいつ計算されるのか、どのように計算されるのか、といった要素も必要になってきます. 本記事では、それらを記述するためのFlinkのAPIを紹介します.
なお、"いつ"という要素については実時間(Processing Time)なのか、イベントの時刻(Event Time)なのか、という話もあるのですが、それについては既にAdvent Calendarの別の日の記事になっている(http://qiita.com/sitotkfm/items/812766a4a4cf5e6a8501) のでそちらも読んでいただけると良いと思います. FlinkでのEventTimeの使い方は本記事の最後に記載します.

Window関連APIの概要

Apache FlinkでWindowを利用した処理を書く場合、以下のような書き方をします.

val input: DataStream[T] = ...

input
    .keyBy(<key selector>) 
    .window(<window assigner>)
    .trigger(<trigger>)
    .<windowed transformation>(<window function>)

それぞれの要素の役割は以下の通りです.

  • keyBy(<key selector>)
    • ストリームを何かしらのキーで分割します. キーは、集計時のGROUP BYに相当します. 例えばPVだったらサイトのIDとかですね
    • 基本的に各Windowはキーごとに生成されます. (そうでない処理も書けますが、分散処理はできません)
  • window(<window assigner>)
    • 入力のデータが、どこのwindowに属するかを決めます
  • trigger(<trigger>)
    • どのタイミングでWindowの評価を行い結果を出力するか、またWindowの内容を削除するか、を決めます
    • 明示的に書かなくても良いケースもあります
  • <windowed transformation>(<window function>)
    • Window内のデータに対する処理を書く部分です

Window Assigner

Window Assignerは、入力データがどのwindowに属するかを決めます. Flinkでサポートされているwindow assignerは以下の4つがあります. 画像は公式ドキュメントへのリンクです.

Tumbling Windows

例えば10:00〜11:00, 11:00〜12:00, ...のように固定の長さを持ち、お互いに重なり合わない形のwindowです. Windowのサイズは時間またはイベント数で決まります. イベント数の場合は、例えばサイズが10であれば、1〜10番目のイベントが属するWindow, 11〜20番目のWindow, という形になります.

Tumbling Window

Tumbling Windowを使う場合は、以下のように書きます.

// 時間ベースのWindow
stream.timeWindow(Time.hours(1))
// または
stream.TumblingProcessingTimeWindow.of(Time.hours(10))

// イベント数ベースのWindow
stream.countWindow(10)

Sliding Windows

Tumbling Windowのように一定のサイズを持ちますが、それとは別にスライド幅、というパラメータを持ち、その幅ずつスライドしていくwindowです. 従ってwindow同士が重なり合います.
例えば時刻ベースで、windowサイズ1時間, スライド幅10分であれば、10:00〜11:00, 10:10〜11:10, 10:20〜11:20,...のように、各windowが重なり合いながら、スライド幅ずつずれていきます. 時刻ベースまたはイベント数ベースのwindowを使用することができます.

Sliding Window

// 時間ベースのWindow. 第二引数にスライド幅を取る.
stream.timeWindow(Time.hours(10), Time.minutes(10))
// または
stream.TumblingProcessingTimeWindow.of(Time.seconds(10))

// イベント数ベースのwindow.
stream.countWindow(10, 2)

Session Windows

固定幅ではなく、イベントが飛んでくる限り終了しないwindowです. 一定時間イベントが無いと、windowが終了します. Webアプリケーションのセッションのようなイメージのものです. どれくらいの時間イベントが無い場合にwindowを切るか、というパラメータ(session gap)を持ちます.

stream.window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))

Session Window

Global Windows

上記のどれとも異なり、ずっと終了しない、無限のサイズを持つwindowです. 時系列のどこで来たイベントであっても、同じキーであれば同じwindowに所属します.

stream.window(GlobalWindows.create())

終了しない、ということはWindowを評価し、結果を出力するタイミングがありません. 従って、通常はtriggerを指定し、その中でwindow評価のタイミングを記述します.

Window Function

Window Assignerにより、計算対象のデータが属するwindowが決まりました. 通常は、これらに対して何かしらの計算をして、結果を導き出す、Aggregationをすることになります. その計算ロジックを決めるのがWindow Functionです. SQLで言うとselect文の中身、 sum, countなどの部分になります. 基本的には、何かしらの関数を記述する形になります.

例えば、(単語, 数) なタプルを集計するケースを考えます.

reduceを使って書くとこうなります. 2つの値をまとめて一つにする処理を書きます.

.reduce{ (a, b) => (b._2, a._1 + b._1)} 

foldだとこんな感じです. 初期値を設定し、そこに一つの値が来た時に、次の値をどのように計算するか、という形で処理を書きます. reduceよりももう少し書ける処理の幅が広くなります.

.fold("",0){ (r,i) => (i._1, r._1 + i._1 )}

一番自由度が高いのがapplyです. reduceやfoldと違い、そのwindowの対象になっているキーや、window自身の情報にもアクセスすることができます. 例えば、windowの開始/終了タイムスタンプを取得することもできます. また、Collector.collectを複数回呼ぶことで、一つのaggregation関数の中で複数の値を出力することもできます.

.apply(
  {(key, window, records: Iterable[(String, Int)], collector: Collector[(String, Int)]) =>
    val result = records.reduceLeft{ (a, b)=> (a._1, a._2 + b._2)}
    collector.collect((result._1, result._2))
  })

ただし、reduce, foldの場合はwindowにイベントが入ってくる都度集計結果が更新される、というインクリメンタルな処理になりますが、applyの場合は全てのイベントをWindow内に蓄積し、Windowを評価するタイミングで関数が実行される、という動きになります.
apply的にメタ情報へアクセスしつつ、reduce/foldのようにインクリメンタルに処理をする、ということもできますが、ここでは割愛します. 興味があれば冒頭にリンクしたドキュメントを見てみてください.

Trigger

さて、各イベントが所属するwindowが決まり、処理方法も決まったとして、出力が生成されるタイミングは何で決まるのでしょうか? それを決めるのがTriggerです. とは言え、Window Assignerを決めた時点で期待するTriggerの挙動が決まる場合も多いでしょう. 例えば、大抵の場合はWindowの終了時に出力が生成されることを期待すると思います. よって、各Window AssignerにはデフォルトのTriggerが決まっており、明示的に指定しなかった場合はそれが使われます.

カスタムのTriggerを書きたいユースケースとしては、例えば以下のようなものがあります
* Time windowは1日だけど、アップデートされた結果を1分ごとに出力したい
* Time windowでもCount windowでもなく、特定のイベントが来たタイミングをwindowの終了としたい

TriggerのAPIは以下のようになっています. イベントが到着したタイミングや、windowが終了したタイミングでそれぞれのメソッドが呼ばれ、各メソッドはCONTINUE(何もしない), FIRE(その時点でのWindowの内容を評価し、出力する), PURGE(評価せずにwindowの内容を削除する), FIRE_AND_PURGE(評価し、windowを削除する)のいずれかを返します.

// イベントが到着するごとに呼ばれる
abstract TriggerResult  onElement(T element, long timestamp, W window, Trigger.TriggerContext ctx)

// EventTimeを使っていて、指定した時間になった時に呼ばれる
abstract TriggerResult  onEventTime(long time, W window, Trigger.TriggerContext ctx)

// ProcessingTimeを使っていて、指定した時間になった時に呼ばれる
abstract TriggerResult  onProcessingTime(long time, W window, Trigger.TriggerContext ctx)

これにより、出力を生成するタイミング、windowからデータをクリアするタイミングを自由に記述することができます.
実際に私が経験したユースケースとしては、特定のイベントが来るまでwindowを継続したい、かつ一定時間ごとに結果を出力したい、という要件がありました. その時は、上記のGlobal windowとカスタムTriggerを組み合わせて対応しました.

EventTimeを使う

EventTimeを使う場合の書き方にも少しだけ触れておきます. EventTimeというのは各イベントの発生時刻です. デフォルトであるProcessing Timeの場合はイベント到着時のシステムの実時間を使用してWindowなどの処理を行うのに対し、EventTimeの場合はイベントの時間を使います. そして、EventTimeを使う場合はWatermarkという概念も入ってきます. これは、ストリーム処理エンジン内の時間が今どこまで進んでいるか、というのを表すものです. EventTimeを使用することで、例えばログのリプレイを行うケース等でも正確な処理を行うことができます.

FlinkでEventTimeを使うために必要な要素は3つです.

  1. EventTimeを使うよ、という宣言
  2. 飛んできたイベントに対して、EventTimeを割り当てる方法の定義
  3. Watermarkの定義

1.は以下のようにジョブの冒頭で以下のように宣言します. デフォルトはProcessingTimeです.

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

2と3についてはストリーム処理を記述する中で、以下のようにassignTimestampsAndWatermarksメソッドを使ってEventTimeの割り当て方、およびWatermarkをの決め方を定義したクラスを指定します. (もしくは、ストリームにデータを取り込む処理の中で、同様のことを行うケースもあります)

stream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks())

MyTimestampAndWatermarksは、以下2つのメソッドを実装します.

// 各イベントに対してEventTimeを割り当てるメソッド
def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long 

// エンジンが、現在のWatermarkを取得する際に呼ばれるメソッド. デフォルトだと200ms間隔で呼ばれる
def override def getCurrentWatermark(): Watermark

実際には幾つか事前定義されているクラスがあったり、他のバリエーションがあったりしますが、そのあたりは公式ドキュメントを見て頂ければと思います.

また、EventTimeを使う場合は、window assigner等一部のクラスをEventTime用のものに変更する必要があります. 例えばTumblingProcessingTimeWindowTumblingEventTimeWindow のようになります.

まとめ

本記事では、Apache FlinkのWindow周りのAPIを紹介しました. ストリーム処理を記述する上でのプログラミングモデルは、Dataflow Model の論文で議論されていますが、その中で提唱されている手法の多くがFlinkでは実装されています.
また、Statefulな処理用のAPIもあるので、必要になりそうな要素は大体今の段階でそろっているのかな、と思っています.

15
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
7