今一度ストリーミング処理を考えてみようということで
Designing Data-Intensive Applications: The Big Ideas Behind Reliable, Scalable, and Maintainable Systems
を参照して整理しました。
event stream
- stream processingではrecordのことをeventと言ったりします
- eventはタイムスタンプを含む場合が多い
- eventの作成者はproducer, publisher, senderなどと呼ばれる
- eventの使用者はconsumers, subscribers, recipentsなどと呼ばれる
- あるeventは1人の作成者によって作られ、複数の使用者によって使用される
- 良くないevent streamの例
- producerは一般的なデータベースにeventを挿入
- consumersは定期的にポーリングして挿入されたデータを取得する
- 一般的なデータベースは差分データだけ取得するのに有効な仕組みではない。
アプリケーションは前回どこまでデータを取得したのか覚えてないといけない。
インデックスがない、効かない場合はポーリングのたびにフルスキャンが走る
messaging system
- eventが作られたらconsumersに通知がされる仕組みが良い。このような仕組みをmessaging systemと呼ぶ。
- eventを含むデータをmessageと呼んだりします
- Unix pipeやTCPコネクションは通知の仕組みを有するが、これらは1対1の通信であり、1対多の通信は不可能
- messaging systemでは以下のことを気をつけないといけない
- producerの方がcomsumersより早いスピードでeventを生成するときはmessageを破棄するか、バッファするか、バックプレッシャーするかしないといけない
- 何かがクラッシュした時にメッセージロストを許容できるのか、それともロストは許されないのかでアーキテクチャは変わる
producerとconsumersが直接messagingする方式
- 中間ノードは用意せずproducerが直接それぞれのconsumersと通信をする方式
- UDPのマルチキャストが良く使われている
- UDPだけだとunreliableなのでアプリケーションプロトコルでパケットロスを救う仕組みが必要
- ZeroMQはこの方式を採用している
- この方式は以下の点でdisadvantageがある
- パケットロスを救う仕組みを作りこむのが大変
- consumerが死んだ場合にproducerにリトライの仕組みを入れないといけない
- producerが死んだ場合はconsumersへ未送信のmessageをロストさせない仕組みを作りこむのが大変
メッセージキューを使う方式
- メッセージキュー(メッセージブローカーとも呼ぶ)はmessage streamを扱うのに最適化されたデータストア
- producersはメッセージキューにmessageを書き込み、consumersはメッセージキューからmessageを読む
- この方式にするとメッセージキューをdurableにしておけば耐障害性を担保できる
- この方式ではproducersとconsumersは非同期で処理が行われることになる
メッセージキューとデータベースの違い
- いくつかのメッセージキューは2phase commitを使ったりなどデータベースと同じ技術を使っていたりもするが違いは色々ある
- メッセージキューはconsumersにmessageが伝達されたらそのmessageは削除される。データベースはexplicitに削除しない限りデータは保存される
- データベースはセカンダリインデックスの付与や多様な検索が可能だがメッセージキューはパターンマッチングでmessageをfilterできるくらい
- データベースの検索はpoint-in-timeのsnapshotを見せる。メッセージキューは追加されたmessageを見せる
複数のconsumer
- 複数のconsumerが同じtopicを読むときに代表的なパターン
- Load balancing
複数のconsumerにmessageを振り分ける方式。messageの量が多くてconsumerの処理が追いつかないときに負荷分散目的で使う - Fan-out
同じmessageをそれぞれのconsumerに割り当てる方式。同じmessageを使うが処理は別々の時に使ったりする
- Load balancing
consumerがクラッシュした時
- メッセージロストを防ぐためにconsumersは明示的にmessageを受け取ったことをメッセージキューに伝える。メッセージキューに伝えることをacknowledgmentsという
- メッセージキューは全てのconsumersからacknowledgmentsを受け取っていないときはmessageを削除しない
- Load balancing方式であれば別のconsumerに再送する。ただし再送が行われる場合にはproducerがmessageを送った順序とずれる場合がある。(再送の間に後続のmessageがすでに送られている可能性もあるため)
- 順序がずれるとまずい場合はLoad balancing方式は使ってはダメ
メッセージキューの実装
-
AMQP/JMS-style
- メッセージキューはconsumerからacknowlegmentsを受け取った後、messageを削除する方式
- 古いメッセージを読む必要のない場合などに利用される
-
Log-based
- append-onlyでdiskにmessageを書いていく方式
- consumerはlogの最後の位置まできたらnotificationが来るまで待つ
- より高いスループットを実現するためにはlogをパーティショニングする
- 異なるパーティションは異なるノードに置くことも可能
- それぞれのパーティションでは順序保証のためにそれぞれのmessageにシーケンスナンバーを割り振る
- 異なるパーティション間ではシーケンスナンバーは共有しないので、パーティション間の順序保証はない
- Apache KafkaやAmazon Kinesis Streamsはこのような実装をしている
- ロードバランシングするときは各consumerに担当するパーティションを割り当てるのが通常だがパーティション数以上にconsumerを増やせないデメリットはある
- メッセージキューではデータベースレプリケーションと同じようにmessageにlog sequence numberを割り振って、consumerとの接続が切れ、再接続する時にデータロストなく読める仕組みがある
- append-onlyでdiskにmessageを書いていく方式
ディスク管理
- logはある一定の単位(セグメントと呼ぶ)で分割されディスクに保存される。古いセグメントは定期的に削除またはアーカイブストレージへ移動させる
- 非常に遅いconsumerがいる場合、セグメントを基本的に削除しないがディスクが一杯になりそうであれば削除を行う。これはcircular buffer, ring bufferと呼ばれる技術で
実装を行う
遅いconsumer対策
- consumerがどれだけ遅れているのかを監視しておき、遅れが顕著な場合はアラートをあげる。それを受けてオペレータが手動でconsumerの遅れ対策を行う。
データベースでのストリーム
- 現在は様々な種類のデータストレージが使われており(データベース、DWH、メッセージキューなど)それらを連携してシステムを作ることが多い。その時に必要になるのがデータ連携。一般的にデータ連携ではETLが使われる
- ETLでは時としてデータベースからデータを取得する時にデータのフルコピーを取ることがある。しかしこれは更新データ以外も含むため明らかに非効率。
- 回避する手段としてdual writesという方法が使われることがある。これはアプリケーションが2つのシステムに書き込む方法。ただ、この方法はrace conditionが発生する可能性があることに注意しないといけない。また片側のシステムに書き込みは成功したが、もう一つには失敗した場合の制御などをアプリケーション側で行うのは非常に厳しい。
Change Data Capture(CDC)
- これらの問題を解決する方法としてChange Data Captureが出てきた。これはデータベースのトランザクションログをデータ連携する技術。
- トランザクションログは今まで仕様が公開されていなかったが、APIを使ってトランザクションログを取得できるようにしたのがChange Data Capture。
- 更新データを取得する方法としてはトリガーを定義してデータベースへの全ての更新操作を特定のテーブル(changelogテーブルと言ったりする)に反映するやり方もあるが、これはクエリを2倍実行することになるのでパフォーマンスオーバーヘッドが大きい
- 初期の構築としてはデータベースのスナップショットをとり、コピー先のシステムに反映させる。スナップショットはログのオフセットが記録されているので、CDCでデータを反映させる時にどのログから反映させれば良いかわかる。
- あるプライマリキーを持つレコードが複数回更新された時に最新のログだけを反映すればデータの整合性は保たれるため、最新のログだけを持つようにcompactionが行われる
Event Sourcing
- Event Sourcingはeventという形で更新操作を追加していくシステムデザインアーキテクチャ
- アプリケーション側で行うので、更新操作を追加していくという点ではCDCと同じだが実現するレイヤが異なる
Processing Streams
- ストリーム処理では何ができるか。バッチとの違いはストリーム処理は終わりがないこと。そのため、ソートなど終わりがないデータに対してはできない
- CEP(Complex Event Processing)では複雑なパターンマッチングをすることができる
- 特定のタイムスパン(windowと呼ぶ)で集計を行う
- 時刻はノードのローカル時刻を使うことが多い
- しかし障害などでイベントの発生時刻とそのイベントが処理されるノードのローカル時刻が大きく異なる場面が出ることがある
- イベントの発生時刻でwindowすることもあるが、それだと全てのイベントがいつ到着するかは保証することができない
- そのため特定の時間だけ待った後にwindowを完了させる。完了した後にイベントがきた場合は以下の2つのどちらかを選択する
- イベントを捨てる
- 更新されたデータにイベントを付加する
- windowのタイプ
- Tumbling Window
固定の時間のwindowで特定のイベントは必ず1つのwindowに入る。例えば5分間隔で09:00 - 09:04, 09:05 - 09:09, 09:10 - 09:14のようにwindow処理が行われる - Hopping Window
固定の時間のwindowだがTumbling Windowとは違いwindowがオーバーラップする。例えば5分のwindowで1分間隔の場合、09:00 - 09:04, 09:01 - 09:05, 09:02 - 09:06のようにwindow処理が行われる - Sliding Window
イベントの発生を契機にイベントの発生時刻からX分前の間をwindowとして処理する。Apache beamでいうSliding WindowはここでのHopping Windowのことです - Session Window
ユーザセッションごとにwindowを定義する。そのため、windowの時間間隔は事前には決まっておらず、ユーザセッションの長さに依存する
- Tumbling Window
stream joins
- stream-stream joins(window join)
- window同士をjoinする方式
- 例えばCTRを見たい時に使う。検索のログと検索結果から何をクリックしたかのログをjoinする。
- stream-table join
- イベントと固定のテーブルをjoinする方式
- 例えばイベントに含まれているユーザIDを使ってユーザのマスタ表からユーザ名などを取得する
- ストリームエンジンからデータベースに接続するやり方もあるが、これだとネットワーク通信によるオーバーヘッド、データベースの負荷上昇が起こる可能性がある。そのため、場合によってはこのテーブルをストリームエンジン内のメモリにハッシュ表として持たせるなどした方が良い
- table-table join
- 固定のテーブル同士をjoinする方式
- 何かのイベントが発生した時にキャッシュの更新などメンテナンス目的で使用したりする
- time-dependence join
- 例えばtax rateテーブル(tax rateが変わるたびに更新される)とイベントをjoinする時にどの時点のtax rateテーブルなのかで結果が変わる。このような時刻に依存するjoinがある
- joinをする時刻によって結果が変わるため非決定的であり好ましくない。解決する手順としてtax rateテーブルをhistoricalにし、各レコードにidentifierを割り当てる。このようにすればどの時点のデータがjoinされたかがわかる。
Fault Tolerance
- 処理が失敗してもexactly onceを保証するにはどうしたら良いか。バッチ処理に関しては失敗した場合はリトライすれば良いだけなので簡単。ストリーム処理に関してはいくつかの方法がある。
- microbatching
バッチ処理を細かいタイミング(1秒間隔など)で実行しストリーム処理に見せる方式。この方式であればバッチ処理と同じようにリトライの概念を適用できる。
microbatchingは必然的にtumbling windowになる
spark streamingはmicrobatchingである - checkpointing
あるタイミングで状態を保存するチェックポイントをストレージに保存する方式。バリア命令をメッセージストリームに流し、そのバリア命令をトリガにしてチェックポイントを行う。
Apache Flinkはcheckpointingを行う - atomic commit
downstreamに状態を渡す時にatomic commit(two-phase commitとか)を行い状態を確実に確定させる方式
Google Cloud Dataflow, VoltDBはatomic commitを行う
- microbatching
- 冪等性
- インクリメント処理は冪等ではない。冪等にするためにApache Kafkaではmessageのオフセットをもつ。ストリーム処理の中でオフセットを確認しすでに処理したmessageなのかどうかを判断して冪等性を維持する。
- 復旧
- Apache FlinkではスナップショットをHDFSなどの共有ストレージに保存する。node failureが起きた時に別のnodeがスナップショットを読み込み処理を再開する。
- Samza, Kafka streamsは状態をKafkaのtopicに保存する
- VoltDBは状態を別のノードにレプリケートする