How to Speed Up Streaming Queries With Asynchronous State Checkpointing - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
背景 / モチベーション
ステートフルなストリーミングは、ステークホルダーによる膨大な量のデータに対する高度な要求の高まりを受けて、より一般的になってきています。しかし、ステートフルオペレーションの複雑性が、データのレーテンシーを引き上げるというトレードオフが存在しており、アクションの幅を狭めています。定期的なマイクロバッチのチェックポイント作成から状態の永続化プロセスを分離する非同期ステートチェックポイントは、構造化ストリーミングにおける2つの特質、高いスループットと信頼性を維持しながらも、処理のレーテンシーを最小化します。
詳細に踏み込む前に、我々がこのストリーム処理の機能を開発したのかに関して、いくつかのコンテキストとモチベーションを説明した方が良いでしょう。ストリーミングのパフォーマンスに対する主要なバロメーターに対する業界のコンセンサスは、パイプラインが一つのレコードを処理するのに要する絶対的なレーテンシーです。しかし、全体的なパフォーマンスの評価におけるより詳細な見方を説明したいと思います。単に1レコードのエンドツーエンドのレーテンシーを考慮するのではなく、信頼性のある方法で一定期間におけるスループットとレーテンシーの組み合わせを見ることが重要です。これは、特定のオペレーションのユースケースでは生の最小レーテンシーの絶対値を必要としないということを言っているのではありません。これらは適切かつ重要なものです。しかし、秒間200,000レコードや分間20Mレコードを処理する分析、ETLユースケースに適しているものでしょうか?これは常にユースケースに依存しますが、ストリーミングパイプラインにおいては、速度と同じくらいボリュームとコスト効率性が重要であると信じています。ストリーミングエンジンの実装においては、効率と非常に低いレーテンシーのサポートとの間には根本的なトレードオフが存在するので、我々はお客さまに対して、インクリメンタルなコストがデータレーテンシーの最小化に見合うのかどうかを決定するためのエクササイズを行うことをお勧めしています。
構造化ストリーミングのマイクロバッチの実行モデルは、高いスループット、信頼性とデータのレーテンシーの間のバランスを最適化しようとします。
高いスループット
概念的にストリーミングを考えてみると、ボリュームや速度に関係なく、すべての入力データは無限のものと考えられます。構造化ストリーミングのコンセプトを適用することで、すべてのクエリーは無限のデータフレームを生成すると考えることができます。内部では、境界のないデータフレームとして到着するデータを、Apache Spark™がこれもデータフレームであるより小さいマイクロバッチに分割します。これは2つの理由から重要であると言えます。
- エンジンはこれらのデータフレームのそれぞれに対して、バッチ/アドホッククエリーで利用できるのと同じ最適化技術を適用することができ、効率とスループっっとを最大化します。
- ユーザーに対して、バッチ/アドホッククエリーと同じシンプルなインタフェースと耐障害性を提供します。
信頼性
信頼性の観点においては、構造化ストリーミングはそれぞれのマイクロバッチの後にチェックポイントを書き出し、データソースから処理した内容の進捗、(集計やjoinの)中間状態、データシンクに書き込まれた内容を追跡します。障害発生や再スタートの際には、データが必ず一度のみ処理されることを保証するためにエンジンはこの情報を使用します。構造化ストリーミングは、障害発生後にクエリーが適切に復旧することを保証するために、これらのチェックポイントをいくつかのタイプの頑健性のあるストレージ(クラウドblobストレージなど)に格納します。ステートフルなクエリーにおいては、クエリーが適切な値を用いて再起動できるように、ステートフルなオペレーションに含まれるすべてのキーの状態をチェックポイントに書き出します。
データのレーテンシー
データボリュームの増加に従い、保持するキーの数と状態のサイズが増加し、状態管理をより重要なものにすると同時に、時間を要するものとなりました。ステートフルクエリーのデータレーテンシーをさらに削減するために、特にステートフルオペレーションに含まれるさまざまなキーのステートに用いる、非同期チェックポイントを開発しました。これを通常のチェックポイントプロセスから分離し、バックグラウンドスレッドとすることで、クエリーが次のマイクロバッチに進めるようにし、信頼性を維持しつつも、エンドユーザーがよりクイックにデータを利用できるようにしました。
動作原理
通常は、構造化ストリーミングは同期ステートチェックポイントを使用し、エンジンは次のマイクロバッチに進む前に通常のチェックポイント作成の一部として、ステートフルオペレーションに含まれるすべてのキーの現在の状態を書き出すことを意味します。このアプローチの利点として、ストリーミングクエリーが失敗した際にアプリケーションはストリームをクイックに回復することができ、失敗したマイクロバッチから再処理を行うだけで済むということが挙げられます。高速なリカバリーのトレードオフとして、通常のマイクロバッチ実行の処理時間が増加します。
これを説明するメタファーとして、パン屋でパン生地をこねることを考えてみます。パン屋さんは通常一つのパン生地をこねるのに両方の手を使用し、時間はかかりますが間違いをした時にはその一つの記事をやり直すだけで済みます。あるパン屋さんが一度に2つのパン生地をこねることにしたとします。この場合スループットは増加しますが、間違いがあった時には両方のパン生地をやり直す必要が出てきます。この例では、同期処理は一つの生地を作るのに2つの手を用いており、非同期処理は別々のパン生地をつくるのに2つの手を使っています。
ステートの更新でボトルネックが生じるクエリーにおいては、非同期ステートチェックポイントは、いかなる信頼性も犠牲にすることなしに、データのレー点シーを削減する低コストな方法を提供します。
候補クエリーの特定
非同期ステートチェックポイントは特定のワークロードにのみ有効であることを繰り返させてください。すなわち、全体的なマイクロバッチ実行のレーテンシーにおいて、ステートのチェックポイントのコミットのレーテンシーが支配的であるステートフルなストリーミングです。
良い候補を特定する方法を以下に示します。
- ステートフルなオペレーション: ウィンドウや集計、[flat]mapGroupsWithStateやストリーム・ストリーム間のjoinのようなステートフルオペレーションを含むクエリーです。
- ステートチェックポイントのコミットのレーテンシー: 全体的なマイクロバッチ実行時間におけるコミットのレーテンシーのインパクトを理解するために、ユーザーはStreamingQueryListenerのイベントでメトリクスを調査することができます。ドライバーのlog4jのログにも同じ情報が含まれています。
適切な候補クエリーを特定するためのStreamingQueryListenerイベントの分析方法の例を以下に示します。
Streaming query made progress: {
"id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19",
"runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe",
…
"batchId" : 0,
"durationMs" : {
"addBatch" : 519387,
…
"triggerExecution" : 547730,
…
},
"stateOperators" : [ {
…
"commitTimeMs" : 3186626,
…
"numShufflePartitions" : 64,
…
}]
}
上述の例には数多くの情報が含まれていますが、特定のメトリクスにフォーカスする必要があります。
- バッチ期間(durationMs.triggerExecution)は547秒周辺です。
- すべてのタスクにおける集計ステートの格納のコミット(stateOperators[0].commitTimeMs)に要する時間は、3186秒周辺となっています。
- ステート格納に関連するタスクの数(stateOperators[0].numShufflePartitions)は64であり、ステートオペレーターを含むそれぞれのタスクは、それぞれのバッチに平均50秒(3186秒 / 64タスク)を追加していることを意味します。64タスクが同時実行すると仮定すると、コミットのステップはバッチ期間の約9%(50秒 / 547秒)を占めることになります。同時実行タスクの最大数が64よりも少ない場合、このパーセンテージは増加します。例えば、同時実行タスクが32の場合、トータル実行時間の18%を占めることになります。
非同期ステートチェックポイントの有効化
Databricksランタイム10.4以降のクラスターを起動し、以下のSpark設定を使用します。
spark.conf.set(
"spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
"true"
)
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass", "com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
注意すべき点がいくつかあります。
- 非同期ステートチェックポイントは、RocksDBベースのステートストアのみをサポートします。
- 非同期ステートチェックポイントの格納に関係する障害は、事前定義されたリトライの後にクエリーの失敗を引き起こします。この挙動は、Sparkがクエリーの実行前に失敗したタスクを複数回リトライする能力を持っている(タスクの一部として実行される)同期チェックポイント作成とは異なります。
ファイルとメッセージバスのソース両方における内製と顧客のワークロードの組み合わせに対するテストを通じて、数百万のエントリーを持つ大規模なステートサイズを持つストリームに対して、マイクロバッチ期間が平均25%まで改善できることを発見しました。個人的な経験によるものですが、ピークのマイクロバッチ期間(マイクロバッチを処理するためのストリームに要する最長時間)でさらなる改善を目撃しました。
まとめ
非同期ステートチェックポイントは別個に開発した機能はありません。ステートフルなストリーミングクエリーのオペレーションとメンテナンスをシンプルにする一連の新機能の次に来るものです。我々はストリーミング機能に膨大な投資をしており、お客様がより多くのデータを容易、かつ、よりクイックにエンドユーザーにデリバリーできるようにすることに非常にフォーカスしています。続報をお待ちください!