Feature Deep Dive: Watermarking in Apache Spark Structured Streaming - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
キーとなる知見
- Sparkがイベント時間に基づいた処理や、いつウィンドウ処理された集計結果を生成するのか、いつ集計状態を切り捨てるのかを理解する際に、ウォーターマークが役立ちます。
- データのストリームをjoinする際、Sparkはデフォルトでは入力ストリームで確認される最小イベント時間に基づいて状態を削除する単一のグローバルウォーターマークを使用します。
- クラスターのメモリーの負荷やGCによる一時停止を削減するためにRocksDBを活用することができます。
- StreamingQueryProgressやStateOperatorProgressオブジェクトには、お使いのストリームにウォーターマークがどのような影響を及ぼすのかに関する情報が含まれています。
イントロダクション
リアルタイムのパイプラインを構築する際、チームが直面する現実の一つは分散されたデータの取り込みは本質的に順序よく並び替えられていないということです。さらに、ステートフルなストリーミングオペレーションの文脈においては、チームは時間ウィンドウの集計やそのほかのステートフルなオペレーションにおいてイベント時間の進捗を適切に追跡できる必要があります。我々は、これら全てを構造化ストリーミングで解決できます。
例えば、お客様にリースしている採掘マシンに対するプロアクティブなメンテナンスを行う企業を支援するパイプラインの構築にチームが取り組んでいるとしましょう。これらのマシンは常に最高の状態で動作する必要があるので、リアルタイムでこれらを監視します。マシンの問題を理解し特定するためにストリーミングデータに対するステートフルな集計処理を実行する必要があるでしょう。
ここで、予兆メンテナンスに関する意思決定やこれらマシンに関するその他の事柄に関して必要な集計処理を行うために、我々は構造化ストリーミングとウォーターマーキングを活用する必要があります。
ウォーターマーキングとは?
一般的に、リアルタイムのストリーミングデータを取り扱う際、データの取り込み方法やアプリケーション全体がダウンタイムのような問題を体験するかどうかによって、イベント時間と処理時間の間には遅れが生じます。これらの潜在的に変動する遅延によって、このデータの処理に使用するエンジンには、いつ集計ウィンドウをクローズし、集計結果を生成するのかを決定するメカニズムが必要となります。
これらの問題に対応する自然なアプローチは、現実世界の時間に基づいた固定の遅延時間を用いることかも知れませんが、なぜこれがベストなソリューションでは無いのかを以降のサンプルで説明します。
これをビジュアルで説明するために、10:50 AM → 11:20 AMの期間のさまざまなタイミングでデータを受信するシナリオを考えてみます。ウィンドウの期間を通じて到着する気温と気圧の読み取り値の平均値を計算する10分のタンブリングウィンドウを作成します。
最初の図では、11:00 AM、11:10 AM、11:20 AMに起動されるタンブリングウィンドウを持っており、それぞれの時間に表示される結果を生成します。データの二番目のバッチでイベント時間10:53 AMであるデータが11:10 AMごろに到着し、11:10 AMにクローズする11:00 AM → 11:10 AMのウィンドウの気温と気圧の平均に組み込まれることになり、適切な結果にならなくなってしまいます。
生成したい集計結果を適切なものにするために、Sparkが集計ウィンドウをいつクローズし、適切な集計結果を生成するのかを理解できるようにウォーターマークを定義する必要があります。
構造化ストリーミングアプリケーションにおいては、ウォーターマーキングと呼ばれる機能を用いることで、計算したい集計結果に適したすべてのデータが収集されるようにすることができます。基本的には、ウォーターマークを定義することで、Spark構造化ストリーミングは(一連の予想遅延期間に基づく)ある時点Tまでのすべてデータがいつ取り込まれたのかを認識することができるので、タイムスタンプTまでのウィンドウ集計をクローズし、結果を生成することができます。
この2つ目の図では、10分のウォーターマークとSpark構造化ストリーミングのAppendモードの実装の効果を示しています。
10分ごとに以前の10分のウィンドウ集計結果を開放(例: 11:10 AMに11:00 AM →11:10 AMウィンドウを開放)する最初のシナリオと異なり、Sparkは観測されるイベント時間の最大値マイナス指定されたウォーターマークがウィンドウの上限値を上回った際に集計ウィンドウをクローズし、出力します。
言い換えると、Sparkは10:50 AM → 11:00 AMの集計ウィンドウを開放するには、最新のイベント時間マイナス10分が 11:00 AMを上回るまで待たなくてはいけません。11:00 AMにおいて、この状態を確認していないので、Sparkの内部状態ストアの集計計算を初期化するのみです。11:10 AMにおいてもこの条件はまだ満たされていませんが、10:53 AMの新たなデータポイントを取得したので、内部状態がアップデートされ、**解放はされません。**そして、11:20 AMまでに11:15 AMのイベント時間のデータポイントが観測され、11:15 AMマイナス10分は11:05 AMとなり、11:00 AMよりも遅いため、10:50 AM → 11:00 AMのウィンドウは結果テーブルに開放されます。
これによって、ウォーターマークによって定義した予想遅延時間に基づいてデータを適切に組み込むことで、正しい結果を生成することになります。結果が開放されると、対応する状態は状態ストアから削除されます。
パイプラインへのウォーターマーキングの組み込み
これらのウォーターマークを、ご自身の構造化ストリーミングパイプラインにどのように組み込むのかを理解するために、この記事のイントロダクションで説明したユースケースに基づく実際のコードサンプルをウォークスルーすることでこのシナリオを探索します。
ここでは、クラウドにあるKafkaクラスターからのすべてのセンサーデータを取り込み、時間の偏りが10分であると予想されるものとし、10分ごとの気温と気圧の平均を計算したいものとします。ウォーターマーキングを用いた構造化ストリーミングパイプラインは以下のようになります:
PySpark
sensorStreamDF = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
.option("subscribe", "tempAndPressureReadings") \
.load()
sensorStreamDF = sensorStreamDF \
.withWatermark("eventTimestamp", "10 minutes") \
.groupBy(window(sensorStreamDF.eventTimestamp, "10 minutes")) \
.avg(sensorStreamDF.temperature,
sensorStreamDF.pressure)
sensorStreamDF.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/delta/events/_checkpoints/temp_pressure_job/")
.start("/delta/temperatureAndPressureAverages")
ここでは、シンプルにKafkaから読み込み、変換処理と集計処理を適用し、Databricks SQLで可視化、監視されるDelta Lakeのテーブルに書き出します。テーブルに書き込まれる特定のデータサンプルに対する出力は以下のようになります:
ウォーターマーキングを組み込むには、最初に2つの項目を特定する必要があります:
- センサー読み取り値のイベント時間を表現するカラム
- データの偏りの推定予測時間
上のサンプルでは、.withWatermark()
メソッドとイベント時間のカラムとして使用されるeventTimestamp
カラム、予想する時間の偏りを表現する10 minutes
で定義されるウォーターマークを確認できます。
PySpark
sensorStreamDF = sensorStreamDF \
.withWatermark("eventTimestamp", "10 minutes") \
.groupBy(window(sensorStreamDF.eventTimestamp, "10 minutes")) \
.avg(sensorStreamDF.temperature,
sensorStreamDF.pressure)
構造化ストリーミングパイプラインでどのようにウォーターマークを実装するのかが分かったら、ストリーミングのjoinオペレーションやウォーターマークによって影響を受ける状態の管理のような他の事柄を理解することが重要になります。さらに、パイプラインをスケールさせると、パフォーマンス問題を避けるために、データエンジニアが注意し監視する必要があるキーとなるメトリクスが存在します。ウォーターマーキングへのディープダイブの過程でこれら全てを探索します。
異なる出力モードでのウォーターマーク
ディープダイブする前に、選択する出力モードが設定するウォーターマークにどのような影響を与えるのかを理解しておくことが重要です。
ウォーターマークは、お使いのストリーミングアプリケーションがappend、update出力モードで動作している場合にのみ使用できます。すべての結果テーブルがストレージに書き込まれる3番目の出力モード、completeモードが存在します。このモードではすべての集計データを保持する必要があり、中間状態を削除するためにウォーターマーキングを使用できないので、このモードは使用できません。
ウィンドウ集計やウォーターマークの文脈においてこれらの出力モードが示唆することは、append
モードでは集計結果は一度のみ生成され、エンジンは集計状態を削除でき、全体の集計状態は有限に保たれるということです。近似的なウォーターマークのヒューリスティックが適用されない(これらはウォーターマークの遅延期間より遅い)遅延レコードは必要性に基づいて削除され、集計結果が生成され、集計状態は削除されます。
逆に、update
モードにおいては、最初のレコード以降、それぞれの受信レコードに基づいて集計結果が繰り返し生成されるので、ウォーターマークは任意となります。エンジンが集計処理に使用するレコードがもう受信されないことをヒューリスティックに認識している際に、状態を切り詰める際にのみウォーターマークは有用となります。状態が削除されると、集計時は失われ、更新できなくなるので、すべての遅延レコードは削除される必要があります。
状態、遅延レコード、異なる出力モードは、Sparkで動作するご自身のアプリケーションの異なる挙動にどのような影響を与えるのかを理解することが重要です。ここで理解すべき重要なことは、appendモード、updateモードの両方において、ウォーターマークが集計時間ウィンドウに必要なすべてのデータを受信したことを示したら、エンジンはウィンドウの状態を削除できるということです。appendモードでは、集計結果は時間ウィンドウプラスウォーターマークの遅延時間のウィンドウをクローズする際に一度のみ生成され、updateモードではウィンドウのアップデートの都度、結果が生成されます。
最後になりますが、ウォーターマークの遅延ウィンドウを増加させることで、パイプラインのデータの待ち時間が長くなり、データを削除する可能性が低下するので、精度向上につながりますが、集計結果を生成するレーテンシーが増加します。逆に、ウォーターマークの遅延時間を小さくすると、精度は低下しますが、集計結果生成のレーテンシーが低減されます。
ウィンドウ遅延時間の長さ | 精度 | レーテンシー |
---|---|---|
長い遅延ウィンドウ | 高い精度 | 高いレーテンシー |
短い遅延ウィンドウ | 低い精度 | 低いレーテンシー |
ウォーターマーキングへのさらなるディープダイブ
Joinとウォーターマーキング
ストリーミングアプリケーションでjoinオペレーションを実行する際、特に2つのストリームをjoinする際に注意べき点がいくつかあります。我々のユースケースにおいて、マシンに設置された他のセンサーからキャプチャされるストリーミングデータセットとjoinを行いたいものとしましょう。
構造化ストリーミングで実装できるストリーム-ストリームjoinには3つの重要なタイプが存在します: inner、outer、semi joinです。ストリーミングアプリケーションでjoinを行う際の主要な問題は、joinの一方のサイドの全体像がない場合があるということです。Sparkがどの時点で今後マッチすることがないだろうということを理解させるということは、Sparkが結果を解放する前に、いつ計算処理に組み込む新規のレコードが存在しないということを理解させる必要がある前述の問題と似たものとなっています。
Sparkがこれに対応できるようにするために、ストリーム-ストリームjoinのjoin条件内で、ウォーターマークとイベント時間の制約の組み合わせを活用することができます。この組み合わせによって、Sparkは遅延レコードを除外し、joinの時間レンジ条件を通じて、joinオペレーションの状態を切り詰めることができます。このサンプルを以下に示します:
PySpark
sensorStreamDF = spark.readStream.format("delta").table("sensorData")
tempAndPressStreamDF = spark.readStream.format("delta").table("tempPressData")
sensorStreamDF_wtmrk = sensorStreamDF.withWatermark("timestamp", "5 minutes")
tempAndPressStreamDF_wtmrk = tempAndPressStreamDF.withWatermark("timestamp", "5 minutes")
joinedDF = tempAndPressStreamDF_wtmrk.alias("t").join(
sensorStreamDF_wtmrk.alias("s"),
expr("""
s.sensor_id == t.sensor_id AND
s.timestamp >= t.timestamp AND
s.timestamp <= t.timestamp + interval 5 minutes
"""),
joinType="inner"
).withColumn("sensorMeasure", col("Sensor1")+col("Sensor2")) \
.groupBy(window(col("t.timestamp"), "10 minutes")) \
.agg(avg(col("sensorMeasure")).alias("avg_sensor_measure"), avg(col("temperature")).alias("avg_temperature"), avg(col("pressure")).alias("avg_pressure")) \
.select("window", "avg_sensor_measure", "avg_temperature", "avg_pressure")
joinedDF.writeStream.format("delta") \
.outputMode("append") \
.option("checkpointLocation", "/checkpoint/files/") \
.toTable("output_table")
しかし、上のサンプルと異なり、それぞれのストリームが異なる時間のウォーターマークを必要とするケースが存在します。このシナリオにおいては、Sparkは複数のウォーターマークの定義を取り扱うポリシーを持っています。Sparkはデータを損失しないように最大限の安全を保障するために、最も遅いストリームに基づく一つのグローバルウォーターマークを維持します。
開発者はspark.sql.streaming.multipleWatermarkPolicy
をmax
に変更することで、この挙動を変更することができます。しかs、これは遅いストリームからのデータが削除されることを意味します。
ウォーターマークを必要とする、あるいは活用できるすべてのjoinオペレーションに関しては、Sparkドキュメントのこちらのセクションをチェックしてください。
ウォーターマークによるストリームの監視と管理
Sparkが数百万のキーを管理する必要があるストリーミングクエリーを管理し、それらの状態を維持する必要がある場合、Databricksクラスターで提供されるデフォルトの状態ストアが効果的ではない場合があります。メモリー使用率の増加や長時間のガーベージコレクションによる一時停止を目撃するかもしれません。これらは、お使いのストリーミングアプリケーションのパフォーマンスとスケーラビリティの両方を阻害します。
ここでRocksDBが役立ちます。Spark設定で有効化することで、DatabricksでネイティブにRocksDBを活用することができます。
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
これによって、構造化ストリーミングアプリケーションを実行しているクラスターは、ネイティブメモリーで状態を効率的に管理し、すべての状態をメモリーに保持するのではなく、ローカルディスク/SSDを活用するRocksDBを活用できるようになります。
メモリー使用量やガーベージコレクションのメトリクスの追跡に加え、ウォーターマークや構造化ストリーミングを取り扱う際に収集、追跡すべきキーとなるその他のインジケーターやメトリクスが存在します。これらのメトリクスにアクセスするために、StreamingQueryProgressオブジェクトやStateOperatorProgressオブジェクトを参照することができます。これらをどのように活用するのかに関しては、ドキュメントのこちらをチェックしてください。
StreamingQueryProgressオブジェクトには、呼び出すことでタイムスタンプのmax、min、avg、watermarkを返却するeventTime
メソッドが存在します。最初の3つは当該トリガーで観測されるイベント時間の最大値、最小値、平均値です。最後は当該トリガーで使用されるウォーターマークです。
StreamingQueryProgressオブジェクトのサンプル
{
"id" : "f4311acb-15da-4dc3-80b2-acae4a0b6c11",
. . . .
"eventTime" : {
"avg" : "2021-02-14T10:56:06.000Z",
"max" : "2021-02-14T11:01:06.000Z",
"min" : "2021-02-14T10:51:06.000Z",
"watermark" : "2021-02-14T10:41:06.000Z"
},
"stateOperators" : [ {
"operatorName" : "stateStoreSave",
"numRowsTotal" : 7,
"numRowsUpdated" : 0,
"allUpdatesTimeMs" : 205,
"numRowsRemoved" : 0,
"allRemovalsTimeMs" : 233,
"commitTimeMs" : 15182,
"memoryUsedBytes" : 91504,
"numRowsDroppedByWatermark" : 0,
"numShufflePartitions" : 200,
"numStateStoreInstances" : 200,
"customMetrics" : {
"loadedMapCacheHitCount" : 4800,
"loadedMapCacheMissCount" : 0,
"stateOnCurrentVersionSizeBytes" : 25680
}
}
. . . .
}
これらの情報は、お使いのストリーミングクエリーが出力する結果テーブルのデータを調整するために活用でき、使用しているウォーターマークが意図したeventTimeのタイムスタンプになっていることを検証するために活用することもできます。これは、データストリームをjoinする際には重要なものとなります。
StateOperatorProgressオブジェクトにはnumRowsDroppedByWatermarkメトリックがあります。このメトリックは、ステートフルな集計処理に含めるには遅すぎると考えられるレコードがいくつあるのかを示しています。このメトリックは、集計後に削除されたレコード数を計測しており生の入力行ではないため、値は正確ではありませんが、遅延データが削除されたことを示すインジケーターになることに注意してください。StreamingQueryProgressオブジェクトの情報と、この情報を組み合わせることで、開発者がウォーターマークが適切に設定されているかどうかを確認する役に立ちます。
複数の集計、ストリーミング、ウォーターマーク
構造化ストリーミングクエリーに存在する制限は、単一のストリーミングクエリーにおける複数のステートフルオペレーター(例: 集計、ストリーミングjoin)のチェーンです。複数のステートフル集計に対する単一のグローバルウォーターマークの制限に関しては、Databrickでソリューションに取り組んでおり、向こう数ヶ月でリリースがされる予定です。詳細に関しては、Project Lightspeedの記事: Project Lightspeed: Faster and Simpler Stream Processing With Apache Spark (databricks.com)をご覧ください。
まとめ
Databricksで構造化ストリーミングとウォーターマーキングを活用することで、上述したユースケースを持っている企業において、データが適切な順序となっていない、データが遅延するような場合においても、リアルタイムの集計処理が正確に実行されることで、メトリクスが生成される回復力の高いリアルタイムアプリケーションを構築することができます。Databricksでどのようにリアルタイムアプリケーションを構築できるのかを知りたいのであれば、Databricks担当者にお問い合わせください。