この記事をご覧の方に
この記事は下記記事の部分集合です。
徐々に出していくためにこの記事が存在していますが、すでに完全なものが存在していますので、これから読む方は下記をどうぞ。
前回の要約、【要約】The world beyond batch: Streaming 102 その1の続きです。
前回と同じく、一気に読んで訳したものですので、相応に粗く、用語の統一も多分ずれがあり、流れがわかればいい内容となっていますので、その前提で。
ただ、コメントは歓迎します。ここにまとめた私自身も理解できていない点が多々あると思いますので。
以後の内容はオライリーの記事のライセンスより、CC BY-NC-SA 1.0になります。
The world beyond batch: Streaming 102
Streaming 102
ここまではWindow化されたパイプラインをバッチ処理エンジン上で実行することについて見てきた。
だが、理想的にはこれらの結果はより低レイテンシで得たいし、無限のデータに対して親和性高く扱いたいだろう。
バッチ処理からストリーム処理エンジンに移行するのは正しい一歩だが、バッチ処理エンジンは各Windowのデータが終了したことを前提に処理を行っていた。
ストリーム処理に移行することによってそれがなくなるため、無限のデータを処理する際にWatermarksという概念が必要になる。
- When watermarks
Watermarksは「When in processing time are results materialized?」という質問に対して、半分の回答となる。
WatermarksはEventTime領域において、入力の完了度合いを示す一時的な概念となる。
言い換えれば、これはEventTimeに関連したシステムの処理進捗度合いを示す。
Streaming 101にあった図(Watermarkを追記)を振り返る。
これは主な実システムの分散データ処理システムにおいて汎用的に生じるEventTimeとProcesingTimeのずれを示したもの。
Figure 5. Event time progress, skew, and watermarks.
上記の中の部分にひかれている赤い線が実際にWatermarkを示したものとなる。
WatermarkはEventTimeに対するProcessingTimeの進捗を示す。
概念的に、Watermarkは関数として考えることができる。つまり、 $F(P) -> E$という形でProcessingTimeを指定するとEventTimeを返す。
(更に明確に言うと、関数の入力は実際の上流から流れてくるデータそのものであり、Watermarkはその値をどこまで観察して処理したかということになる。
ただ、ここでは単純化のためEventTimeに対するProcessingTimeの進捗として進める。)
EventTimeの視点で見ると、Eはシステムはそれまで入力して処理したデータの中で、これ以上EventTimeの小さいデータは全て処理したはずだと考える値となる。
この値はあくまでシステムがそう考えているだけで、実際の値は不明となる。
Watermarkの種別として、完璧な場合(Perfect Watermark)は完全な保証、ヒューリスティックな値(Heuristic Watermark)の場合は推測となる。
- Perfect Watermark
- 入力データに対して完全な情報を取得できる場合にのみ、構築可能。
- このようなケースにおいては、データは遅れることはなく、早く到着するか、または時間通り到着する。
- Heuristic Watermark
- ほとんどの分散システムにおいて、入力データに対する完全な情報を取得するのは現実的ではない。
- そういったケースにおける次善策はHeuristic Watermarkの提供となる。
- その場合、入力データに対する情報(パーティション、内部ソート、ファイル数など)を基に出来るだけ正確な値を求める形になる。
- 多くのケースにおいては、このようなWatermarkはかなり正確な予測が可能。
- しかしながら、こういったWatermarkは時には誤ることもあり、データの遅れを招く。
- そのため、この後こういった遅れたデータに対応するための概念としてTriggersを扱う。
Watermarkについての議論は興味深く、困難なものとなる。
このWatermarkの調整についての議論については今後の記事をお待ちいただきたい。
この記事においては、Watermarkの役目と欠点について概要を掴むために、ストリーム処理上でいつ出力をするか決定するためにWatermarkを用いるListing2のWindowingパイプラインの2つの事例を示す。
Figure 6. Windowed summation on a streaming engine with perfect (left) and heuristic (right) watermarks.
上記の両ケースにおいて、WindowはWatermarkがWindowの終了時刻に到達したタイミングで出力している。
この2つの事例の大きな違いは、Heuristic Watermarkの方は「9」のデータについてWatermarkの算出に誤っており、このデータの存在によって両ケースのWatermarkの値は大きく変わってしまうこと。
この事例より、Watermarkは完全性に対する概念の他に、2つの欠点があることがわかる。
その2点について示す。
- Too slow
- 両種別のWatermarkにおいて、現状処理されていないデータが存在することにより、遅れが発生する。この遅れはそのまま出力の遅れに直結する。
- このことは左の図で9が遅れて到着することにより、対象のWindow出力が遅れることからわかる。既に、それ以後のWindowでデータがそろっているにも関わらず。
- 2つ目のWindowについては最初のデータが到着してから7分近く経過してから結果が出力されることになってしまう。
- 対して、この事例におけるHeuristic Watermarkはそこまで影響を受けることはない。だが、影響を受けないわけではない。
- ここでの重要なポイントは下記。
- Watermarkが完全性に対して有用な性質を提供し続ける限り、遅れという視点では理想的な結果にはならない。仮に必要なメトリクスを出力するダッシュボードが存在し、時間か日でWindow分割された事例を考えてみよう。
- この場合、今のWindowの値を見る際に、対象の時間帯が全て終了してから初めて結果が見れるというのは使いにくいはずだ。
- これがこのようなシステムを伝統的なバッチシステムで処理する際の課題の一つであり、その代わりに、該当の時間帯のデータを入力データが到着する度に更新し、最終的に完全な値になるというのはより優れたシステムとなるだろう。
- Too fast
- Heuristic Watermarkが実際にあるべき状態より先に進んでしまった場合、遅れデータが発生する可能性がある。
- これは実際に右の事例を見ていただければわかるだろう。WatermarkがはじめのWindowのデータが全て到着する前に進んでしまっているため、本来出力される14でなく、5が出力されてしまっている。
- この問題点は深刻なものであり、実際のところ、Heuristic Watermarkを決定すると、時々誤りは発生する。その結果、正確性を重視する場合は注意が必要になる。
Streaming 101において、無限のデータストリームに対する堅牢なOut-of-order処理を構築するためには完全性の概念が問題になることを記述した。
これらの2つの欠点、早すぎることと遅すぎることの事象がこの根本問題となる。
完全性を重視する場合、完全性と低レイテンシを容易に実現することは出来ない。
この課題に対応するためにTriggerが存在する。
- When The wonderful thing about triggers, is triggers are wonderful things!
TriggersはWhen in processing time are results materialized? に対する回答の後半となる。
Triggersを用いた場合、Windowの出力タイミングはそのデータを処理したタイミングに出力すべき、となる。
各Windowに対応する出力は半透明の矩形として記述される。
Triggerを実行する際のシグナルの例は下記の通り。
-
Watermark progress (i.e., event time progress
- これは暗黙的な事例であれば、Figure 6で述べられているが、WatermarkがWindowの最後まで来た段階で出力すべきというもの。
- 別の使用方法としては、Windowの有効期間を超過したらGCを起動しようというもので、これも後で振り返る。
-
Processing time progress
- これはProcessingtimeは常に一定ペースで進捗するため、定期的な実行には適している。
-
Element counts
- これはWindowで一定の有限個数のデータを処理したタイミングで実行するのに有効
-
Punctuations
- またはデータに依存したTriggerとなる。これは特定のデータ(ファイルや行の区切り)を検出した場合に出力を行うということ。
これらのシンプルなTriggerに追加して、より複雑な複合Trigger実行ロジックを作成することも可能。
複合Triggerの例は下記の通り。
-
Repetitions
- 特にProcessingTimeTrigerと組み合わせることで定期的な更新をすることに適する。
-
Conjunctions(AND)
- 子Triggerがすべて実行したタイミングで実行される。
- 例えば、WatermarkがWindow終了を過ぎ、かつ区切りのレコードを受信した場合など
-
Disjunctions(OR)
- 子Triggerのいずれかが実行したタイミングで実行される。
- 例えば、WatermarkがWindow終了を過ぎるか、または区切りのレコードを受信した場合など
-
Sequences
- あらかじめ定義された子Triggerが一定個所まで実行されたタイミングで実行される。
Triggersの概念をもう少し明確化してみよう。
これはFigure6、Listing2に対して、デフォルトの暗黙的なTriggerを追加したもの。
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(AtWatermark()))
.apply(Sum.integersPerKey());
Listing 3. Explicit default trigger
Triggersが実際のところ何を提供しなければならないか、を考えると、Too slowなケースとToo fastのケースにおいて発生する問題にどう対処できるかが浮かぶ。
両ケースにおいて、本質的に提供したいのは、Windowの終了時刻を過ぎる前、または後にWindowに対して何かしらの、一定ルールに従ったソートと、更新手段を提供すること。
(さらに、その更新時にウィンドウの終端を通過した旨の通知を受け取りたい。)
そのため、我々は繰り返しのTriggerに対して何かしらのソートが欲しくなる。
ただ、そこで生じる疑問として、「何を繰り返すのか?」がある。
Too slowなケースへの対処(早期に投機的な結果出力を提供)において、我々はWindow出力のために何かしらの出力基準を設定する必要がある。(出力時、データが実際は完全でないことが明確だとしても。)
例えば、Processing Timeが一定時間進んだタイミングで出力する方式は実際のWindowに対して観察されたデータ量に依存することがないため、比較的適切な対処となる。
最悪なケースであっても、定期的なTrigger実行による定常化された出力を得ることができる。
Too fastなケースへの対処(Heuristic Watermark使用時に遅れデータの反映を提供)の場合、まずはHeuristic Watermarkに相応の正確性があるとする。
この場合、遅れデータがそう頻繁に発生することはないと思われるため、すぐに出力結果を修正してしまうというアプローチでいいと思われる。
結果、遅れデータを受信したタイミングで、出力を更新しても想定した頻度であれば特に性能的に大きな影響は出さないとなるだろう。
ただ、これはあくまで例であり、実際のシステムに応じて適切に調整は必要。
その上で、実際にシステムを構築する際にはこれまで出てきた複数のTriggersである、early、on-time、lateを組み合わせる必要が出てくる。
Cloud Dataflowではこの組みあわせをSequence
TriggerとOrFinally
Triggerによって実現可能。
これは親Triggerに子Triggerを設定し、子Trigger発火時に親Triggerも終了するもの。
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(Sequence(
Repeat(AtPeriod(Duration.standardMinutes(1)))
.OrFinally(AtWatermark()),
Repeat(AtCount(1))))
.apply(Sum.integersPerKey());
Listing 4. Manually specified early and late firings
しかし、上記はいまいち長ったらしい。
加えて、このrepeated-early、on-time、repated-late発火パターンは非常に共通的なものとなる。
そのため、Cloud Dataflowでは追加API(意味論としては同じ)を設け、これらのパターンをより明確に、シンプルに記述可能としている。
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1))))
.apply(Sum.integersPerKey());
Listing 5. Early and late firings via the early/late API
このListing 4かListing 5をストリーム処理基盤で実行した場合、下記のように動作する。
Figure 7. Windowed summation on a streaming engine with early and late firings.
上記を見ると、Figure 6のものに比べて2つの明確な改善がみられる。
Watermarks too slowなケースが2個目のWindowで発生する場合:[12:02, 12:04)
この方式を取ることによって、早期の更新を1分毎に実行することが可能。
この違いはPerfect watermarkのケースで大きく表れ、元々7分近く遅延していたものが4分の時点で早期出力される。
また、Heuristic watermarkのケースでも改善が現れる。
両バージョンで、時間経過に伴って7>14>22と出力の更新が行われ、毎回のデータ更新から遅延が小さく更新され続ける。
Heuristic watermarks too fastなケースが1個目のWindowで発生する場合:[12:00, 12:02)
今回のケースにおいて9のデータが遅れデータとして到着した場合、出力結果が14に更新される。
この新たなTrigger群を導入したことによって生じる興味深い副作用として、出力パターンが両ケースで効果的に正規化される。
Figure 6の時点だと両ケースの出力に大きな違いがあるが、Figure 7になると両ケースでかなり共通のものとなる。
ただ、この事例において未だ残っている大きな違いは、Windowの生存期間。
Perfect watermarkのケースでは、Watermarkが過ぎた時点でWindowに対してデータが到着することはなく、内部状態をその時点で破棄することができる。
対して、Heuristic watermarksのケースでは遅れデータが到着した時のためにWindowの内部状態を保持し続ける必要がある。
しかし、現状構築できるシステムにおいて、Windowをどれだけの期間保持すればいいかを決める有効な手段は存在しない。
そのために、allowed lateness(許容遅れ値)という概念が必要となる。
- When : allowed lateness (i.e., garbage collection)
先ほどの問い(How do refinements of results relate?)に戻る前に、実用的な長時間動作するout-of-orderなストリーム処理システムにおけるGarbage collectionについて考えてみよう。
Figure 7のHeuristic watermarksのケースでは各Windowに対する内部状態値が例の間ずっと保持されている。
これはいつ到着するかわからない遅れデータが到着した際に適切に対応するために必要なものとなる。
だが、この状態値を実行時間が完了するまで無期限に保持し続けるのは、無限のデータソースを扱うと考える以上、現実的ではない。
最終的にはディスクやメモリから内部状態を削除する必要がある。
結果として、実世界においてout-of-orderな処理システムは何かしらの形でWindowの生存期間を区切る必要が出てくる。
一つの明確な方式として、システムに対して許容遅れ値を規定する方式がある。
これは許容される遅れ期間の間に到着したデータは処理するが、その期間を過ぎたデータについては単純に破棄するというもの。
一度個々のデータに対してどれだけの遅れを許容するかを決めれば、Windowの生存期間と、どれだけの期間維持する必要があるかが明確にできる。
もちろん、そうすることによってシステムのリソースの無駄を防ぐことが可能となる。
Allowed latenessとWatermarkの連携は微妙にわかりにくいため、例を用いて説明する。
Listing 5/Figure 7のHeuristic watermarksのケースに、許容遅れ値を1分として設定するケースを考えてみよう。
(なお、この1分とするのは図中でわかりやすくするためであり、現実のシステムにおいてはもっと大きな値にする必要が出てくるだろう。)
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.withAllowedLateness(Duration.standardMinutes(1)))
.apply(Sum.integersPerKey());
Listing 6. Early and late firings with allowed lateness
このパイプラインを実行すると、Figure 8のように動作する。
許容遅れ値の概念を加えたことによる変化は下記のようになる。
Figure 8. Windowed summation on a streaming engine with early and late firings and allowed lateness
現状のProcessingTimeを示す白い太線に対して、EventTimeに対する許容遅れ値を生存期間内のWindowに対応させて表示している。
Watermarkが遅れ許容値を過ぎると、該当のWindowはクローズされる。これは該当Windowの状態を破棄することを示す。
その遅れ許容値を白い点線での時刻範囲として示している。
遅れ許容値をWatermarkと対比するために、短い点線で時間地点を示している。
この図でのみ、1個目のWindowに追加の遅れデータである6を追加している。
6は遅れデータではあるものの、遅れ許容値の範囲に収まっている。そのため、1個目のWindowの出力値は6が到着した段階で11に更新される。
ただ、9は遅れ許容値を超過しているため、単純に破棄される。
更に、遅れ許容値について2つ補足する。
当然のことではあるが、もしPerfect Watermarkが使用可能な場合、遅れデータを扱う必要はないため、遅れ許容値は0秒が最適となる。
これについてはFigure 7を参照してほしい。
Heuristic watermarks環境下での遅れ許容値の必要性について、注目すべき一つの例外として、全時間を通して有限の個数に対する統計を取得するケースがある。
(例えば、サイト訪問者のブラウザ別の総数)
その場合、生存期間内のWindowの数は有限個となる。
その数が管理可能なレベルな値であれば、Windowの生存期間を遅れ許容値を用いて区切る必要はなくなる。
では、4つ目、最後の質問に移ろう。
- How : accumulation
(※ここでのペインとは、あるWindowに対する結果算出のスナップショットのようなものを示す。図中では半透明の枠。)
TriggerがあるWindowに対してペインを更新して用いるために使用される場合、最後の問いに直面することになる。
How do refinements of results relate?
これまで見てきた例においては、ペインは遅れデータが到着する度に、前のペインを基に更新されてきた。
しかしながら、実際には遅れデータが到着した時にとり得る累積計算方式としては代表的なものとして下記の3つがある。
-
Discarding
- ペインが具体化した時に保持した状態は破棄される。これはつまり、連続したペインは前のペインの状態とは無関係に決定されるということ。
- このモードは下流のシステムが自前で何かしらの累積計算の機構を保持している場合に有用。例えば、ペインが前回との差分を送信することで、外部システムはその値を合計して最終結果とするなど。
-
Accumulating
- Figure7において、ペインが具体化した場合にはその状態はそのまま保持され、追加の入力があった場合、既存の状態に対して累算する形を取っていた。
- これは、連続したペインは前のペインを基に更新されるということ。
- このモードは新しい結果で古い結果を上書きすればいいケースにおいて有用。例えば、BigTableのようなKVSに結果を出力するケースなど。
-
Accumulating & retracting
- これは累算のモードに似ているが、新たなペインを生成する時には前回の値を基にした相殺するための差分値(後退値)を出力するというもの。
- 後退(前の値と今回の値を基にした結果)の方式で基本的な考え方として、「以前は結果をXとしたが、それは間違っていた。Xの代わりにYで更新するよ。」となる。
- この方式は下記の2つのケースのような場合に有効となる。
- 下流のシステムが違う軸でデータを再グルーピングする場合、新しい値が古い値とは違うキーに紐づいて更新される場合がある。その場合、後退値があれば対応可能。このケースにおいては、新しい値は単純に古い値を上書きするという形にはならず、古いグループから古い値を取り除き、新しいグループに移す必要がある。
- 動的Window(セッションなど)を使用する場合、新しい値は単に前の値を更新するだけでなく、前のWindowの値を統合するケースが発生する。このケースにおいては、単に古い値を新しい値で置き換えるだけでは対応が困難で、特定の古いWindowを新しいWindowと再統合するためにこういった対処が必要となる。
複数のモードを比較することで、各モードの違いを明確にする。
Figure 7の2つ目のWindow([12:02, 12:04))を考えてみよう。
下記の表は、3つの累算モードでどのような結果になるかを示したもの。
尚、Figure 7の動作中においてはAccumlatingのモードを使用している。
Table 1. Comparing accumulation modes using the second window from Figure 7.
-
Discarding
- 各ペインは到着した値を現在の値として使用している。そのため、最終的な出力値は合計値とは異なる。
- しかし、このペインの出力値を全て合計した場合、実際の合計値と同じ22となる。
- そのため、下流のシステムが自前で合計やソートを行う機構を持つ場合、この方式が有用となる。
-
Accumulating
- 各ペインはそれまでに到着した値の総計地となる。そのため、最終的な出力値が合計値となる。
- もし各ペインの出力値を合計していった場合、1ペイン目と2ペイン目出力時の値が重複カウントされ、結果は51となってしまう。
-
Accumulating & retracting
- 各ペインは出力値と、前のペインの値を基にした後退値が併せて出力される。
- そのため、最終的な値と、あとは各ペインの出力値を合計した場合、両方正しい結果となる。
実際にDiscardingモードとする場合、コードを下記のようにする。
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.discardingFiredPanes())
.apply(Sum.integersPerKey());
Listing 7. Discarding mode version of early/late firings
同様の入力データを基にHeuristic watermarksのDiscardingモードで実行した場合、結果はFigure 9のようになる。
Figure 9. Discarding mode version of early/late firings on a streaming engine.
図の構成自体はFigure 7と同じだが、ペインを出力した段階で内部値がクリアされる。
そのため、ペインの出力値は前のペインの出力値とは独立した値となる。
Accumulating & retractingモードで実行したい場合、同様に下記のように修正すればいい。
(ただ、この後退値モードは現状Google Cloud Dataflowでは開発中であり、API名称なども変わる可能性がある。)
PCollection<KV<String, Integer>> scores = input
.apply(Window.into(FixedWindows.of(Duration.standardMinutes(2)))
.triggering(
AtWatermark()
.withEarlyFirings(AtPeriod(Duration.standardMinutes(1)))
.withLateFirings(AtCount(1)))
.accumulatingAndRetractingFiredPanes())
.apply(Sum.integersPerKey());
Listing 8. Accumulating & retracting mode version of early/late firings
同様の入力データを基にHeuristic watermarksのAccumulating & retractingモードで実行した場合、結果はFigure 10のようになる。
Figure 10. Accumulating & retracting mode version of early/late firings on a streaming engine
図の構成自体は同じだが、出力値の形式が合計値と後退値の2つの出力を取るようになっている。
これらの3つのモードを並べて比較するとFigure 11のようになる。
Figure 11. Side-by-side comparison of accumulation modes: discarding (left), accumulating (center), and accumulating & retracting (right).
図からわかるように、図の左側のモードから、より保存領域や計算コストがかかるようになっている。
(discarding, accumulating, accumulating & retractingの順)
そのため、どのモードを選択するかについて考えると、正確性、遅延、コストという新たなトレードオフの軸が発生する。
中間まとめ
このように、挙げられた4つの問いについて答えてきた。
- What results are calculated? > Answered via transformations.
- Where in event time are results calculated? > Answered via windowing.
- When in processing time are results materialized? > Answered via watermarks and triggers.
- How do refinements of results relate? > Answered via accumulation modes.
しかし、現状1つの形式のWindow方式:EventTimeベースの固定長Windowについてしか見ていない。
Streaming 101であったように、Window方式は複数存在する。
この後、ProcessingTimeベースの固定長Window、Sessionについて見ていこう。
著者: Tyler Akidau
本投稿: kimutansk
記事自体が長いので、再度これで区切ります。
続きは次の投稿で。