本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
注意
2018年の記事です。
Databricksランタイム4.0で利用できます
いくつかの理由から、Apache Spark 2.0で構造化ストリーミングは自身の高レベルAPIからマイクロバッチ処理を分離しました。第一に、APIによるエクスペリエンスをよりシンプルなものにしました。APIはマイクロバッチを考慮する必要がありませんでした。第二に、開発者はストリームを無限のテーブルとして取り扱い、静的なテーブルを操作するのと同様にクエリーを発行することができました。
このメリットを活用するために、Databricksレイクハウスプラットフォームの一部として利用できるDatabricks Runtime 4.0のApache Spark 2.3で連続モードと呼ばれる新たなミリ秒レベルの低レーテンシーモードのストリーミングに取り組みました。
この記事では、連続処理モードの使用法、メリット、ミリ秒の低レーテンシー要件を持つ連続ストリーミングアプリケーションの記述するために、開発者がどのように活用するのかを説明します。動機づけとなるようなシナリオからスタートしましょう。
低レーテンシーのシナリオ
不正のあるクレジットカードトランザクションを特定するためにリアルタイムパイプラインを構築したいものとします。理想的には、容疑者がクレジットカードをスワイプしたらすぐに、不正なトランザクションを特定し、拒否したいと考えます。しかし、お客様を怒らせることになるので、適切なトランザクションを遅延させたくはありません。これは、我々のパイプラインのエンドツーエンドのレーテンシーに対して厳密な上限値を設定することになります。取引には他の遅延要素もあるため、パイプラインはそれぞれのトランザクションを10-20msで処理しなくてはなりません。
構造化ストリーミングでこのパイプラインを構築してみましょう。不正なトランザクションを特定することができるユーザー定義関数isPaymentFlagged
があるものとします。レーテンシーを最小化するために、遅延のないように可能な限り高速にそれぞれのマイクロバッチをスタートさせるようにSparkに指示する0秒の処理時間トリガーを使用します。ハイレベルでは、クエリーは以下のようなものになります。
payments \
.filter("isPaymentFlagged(paymentId)") \
.writeStream \
{...}
.trigger(processingTime = "0 seconds") \
.start()
このサンプルノートブックをダウンロードし、ご自身のDatabricksワークスペース(Databricksコミュニティエディションを使うこともできます)にインポートすることで完全なコードを参照することができます。エンドツーエンドのレーテンシーがどのようになるのかを見てみましょう。
レコードはSparkを通じて100ms以上かかって流れています!多くのストリーミングパイプラインではこれは問題ありませんが、このユースケースでは不十分です。連続処理モード(Continuous Processing mode)は助けになるのでしょうか?
payments \
.filter("isPaymentFlagged(paymentId)") \
.writeStream \
{...}
.trigger(continuous = "5 seconds") \
.start
これで、1ms以下のレーテンシーとなり、2桁もの改善を成し遂げターゲットのレーテンシーを下回りました!マイクロバッチ処理のレーテンシーがこれほど高く、連続処理が助けになったのかを理解するために、構造化ストリーミングエンジンの詳細を見ていく必要があります。
マイクロバッチ処理
構造化ストリーミングはデフォルトでマイクロバッチ実行モデルを採用します。これは、Sparkストリーミングエンジンは定期的にストリーミングのソースをチェックし、最後のバッチが終了した後に新たに到着したデータに対してバッチクエリーを実行することを意味します。ハイレベルでは以下のようになります。
このアーキテクチャでは、クエリーを再起動するために使われることがある先行書き込みログにレコードのオフセットを保存することで、ドライバーが進捗のチェックポイントを作成します。決定論的再実行とエンドツーエンドのセマンティクスを得るために、マイクロバッチがスタートする前に、次マイクロバッチで処理されるレンジオフセットがログに保存されることに注意してください。このため、ソースで利用可能なレコードはオフセットが記録される前に現在のマイクロバッチの完了と次のマイクロバッチが処理をするのを待つ場合があります。レコードレベルでは、タイムラインは以下のようになります。
結果として、ソースでイベントが利用できるようになる時刻と、シンクに出力が書き込まれるまでの間には、ベストでも100ミリ秒のレーテンシーが発生することになります。
当初我々は、パフォーマンスがすでに最適化されたSpark SQLにおける既存のバッチ処理エンジンを容易に活用できるように、このマイクロバッチエンジンで構造化ストリーミングを開発しました(コードジェネレーションやプロジェクトTungstenに関する過去の記事をご覧ください)。これによって、100msというレーテンシーで高いスループットを実現することができました。過去数年を通じて、数千の開発者と共に数百の異なるユースケースに取り組むことで、ETLやリアルタイムモニタリングのような多くの実践的なワークロードにおいては秒レベルのレーテンシーは十分なものであることを知りました。しかし、いくつかのワークロードにおいては、さらに低いレーテンシーによって利益を享受することができ、そして、これが連続処理モードの開発のモチベーションとなりました。この動作原理を見ていきましょう。
連続処理
連続処理モードでは、定期的なタスクを起動するのではなく、Sparkは連続的にデータの読み込み、処理、書き込みを行う長時間稼働する一連のタスクを起動します。ハイレベルでは、環境とレコードレベルのタイムラインは以下のようになります(上のマイクロバッチ実行の図と対比してみてください)。
ソースでイベントが利用できるようになるとすぐに処理されシンクに書き込まれるので、エンドツーエンドのレーテンシーは数ミリ秒となります。
さらにクエリーの進捗は、よく知られるChandy-Lamportアルゴリズムのを用いてチェックポイントが作成されます。すべてのタスクの入力データストリームに特殊なマーカーレコードが挿入されます。我々はこれらを「エポックマーカー(epoch marker)」と呼び、これらの間のギャップを「エポック」と呼びます。マーカーがタスクに遭遇すると、タスクは最後に処理されたオフセットを非同期的にドライバーに報告します。ドライバーがシンクに書き込むすべてのタスクからオフセットを受け取ると、上述した先行書き込みログに書き込みます。チェックポイント作成は完全に非同期処理となるので、タスクは邪魔されずに処理を継続でき、一貫性のあるミリ秒レベルのレーテンシーを提供することができます。
Apache Spark 2.3.0での実験的リリース
Apache Spark 2.3.0で、この連続処理モードは実験的機能となっており、構造化ストリーミングソースのサブセットとデータフレーム/データセット/SQLオペレーションがこのモードでサポートされています。特に、以下の条件を満たすクエリーでオプションとして連続トリガーを設定することができます。
- Kafkaのようにサポートされているソースからの読み込み、Kafka、メモリー、コンソールのようなサポートされているシンクへの書き込み(メモリーやコンソールはデバッグに適しています)。
- mapのようなオペレーション(例: select、where、map、flatMap、filterのような選択、プロジェクション)のみを含む。
- 集計関数、
current_timestamp()
やcurrent_date()
のような現在時刻ベースの関数以外のSQL関数を含む。
詳細に関しては以下を参照ください。
- 現在の実装と制限の詳細については、Structured Streaming programming guide
- ミリ秒レーテンシーでのモデル予測をデモするSpark Summit Keynote Demo
まとめ
Apache Spark 2.3のリリースによって、開発者はレーテンシーの要件に応じて連続モード、マイクロバッチモードのいずれかのストリーミングモードを選択できるようになりました。デフォルトの構造化ストリーミングモード(マイクロバッチ)は多くのリアルタイムストリーミングアプリケーションで許容できるレーテンシーを提供しますが、ミリ秒規模のレーテンシー要件がある場合には、この連続モードを選択することができます。
DatabricksでこのContinuous Processing mode notebookをインポートして確認してみてください。