Structured Streaming in production | Databricks on AWS [2022/3/21時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
ノートブックをクラスターにアタッチし、インタラクティブにストリーミングクエリーを実行することは便利です。しかし、本格運用(プロダクション)で実行する際には、より高い堅牢性、可用性の補償が必要となることでしょう。本書では、Databricksジョブを用いて、よりフォールトトレラントなストリーミングアプリケーションの構築方法を議論します。
ストリーミングデータ処理のタイミングの定義
ストリーミングデータ処理のタイミングを定義するためにトリガーを使用します。trigger
の期間に小さすぎる値(数十秒以内)を指定すると、新規データ到着をチェックするためにシステムが不要な処理を行う場合があります。ベストプラクティスとしては、コストを最小化するためにtrigger
を調整することをお勧めします。
クエリー失敗からのリカバリー
プロダクションレベルのストリーミングアプリケーションには、頑健なエラーハンドリング機能が必要となります。構造化ストリーミングにおいて、ストリーミングクエリーに対するチェックポイントを有効化すると、失敗後にくエリーを再起動することができ、耐障害性とデータの一貫性を保障しつつ、失敗が起きたところからクエリーを再開することができます。このため、お使いのクエリーの耐障害性を高めるためには、チェックポイントを有効化し、エラー後に自動でクエリーを再開するようにDatabricksジョブを設定すべきです。
チェックポイントの有効化
チェックポイントを有効化するためには、クエリーを開始する前に、オプションcheckpointLocation
をDBFSあるいはクラウドストレージのパスを設定します。
streamingDataFrame.writeStream
.format("parquet")
.option("path", "dbfs://outputPath/")
.option("checkpointLocation", "dbfs://checkpointPath")
.start()
このチェックポイントの場所には、クエリーを一意に特定する基本的な情報が保持されます。このため、それぞれのクエリーには異なるチェックポイントの場所が必要となり、複数のクエリーで同じ場所を指定することはできません。詳細に関しては構造化ストリーミングプログラミングガイドをご覧ください。
注意
出力シンクの多くのタイプでcheckpointLocation
オプションは必要となりますが、メモリーシンクのような幾つかのタイプのシンクにおいては、checkpointLocation
を指定しない際に、自動でテンポラリなチェックポイント格納場所をDBFSに生成します。テンポラリなチェックポイントの格納場所は、フォールトトレランスやデータの一貫性を保証しませんし、適切にクリーンアップされない場合があります。ベストプラクティスとして、常にcheckpointLocation
オプションを指定することをお勧めします。
ストリーミングクエリー失敗時に再起動を行うジョブを設定する
お使いのストリーミングクエリーを実行するノートブック、JARを実行するDatabricksジョブを作成し、以下のように設定します。
- 常に新規クラスターを使用
- 失敗時には常にリトライ
ジョブは構造化ストリーミングAPIと密接にインテグレーションされており、処理実行(ラン)におけるアクティブなストリーミングクエリー全てを監視することができます。この設定により、クエリーの一部でエラーが起きた際に、ジョブが自動で(他のクエリー含めて)処理を停止し、新規クラスターで新規に処理を開始することを保証します。新規のランにおいては、ノートブックあるいはJARコードを再実行し、全てのクエリーを再起動します。これは正常な状態に復帰することを保証するための最も安全な方法です。
警告!
ノートブックワークフローは長時間処理を実行するジョブではサポートされていません。このため、ストリーミングジョブでノートブックワークフローを使用することはお勧めしません。
注意
- アクティブなストリーミングクエリーにおけるあらゆるエラーは、アクティブなランを失敗させ、他の全てのストリーミングクエリーを停止します。
- ノートブックの最後で
streamingQuery.awaitTermination()
やspark.streams.awaitAnyTermination()
を使用する必要はありません。ストリーミングクエリーがアクティブな際、ジョブは自動で処理の停止を回避します。
以下に推奨のジョブ設定の詳細を示します。
- Cluster: 常に新規クラスターを使用し、最新のSparkバージョン(あるいは少なくともバージョン2.1)を使用するように設定します。Spark 2.1以降のクエリーは、クエリー後の復旧が可能です。
- Alerts: 処理失敗時にメールの通知を受け取りたい場合に設定します。
- Schedule: スケジュールは設定しません。
- Timeout: *タイムアウトは設定しません。*ストリーミングクエリーは無限に長い期間実行されます。
- Maximum concurrent runs: 1に設定します。それぞれのクエリーに対して、同時に1つのインスタンスのみがアクティブであることを許可します。
- Retries: Unlimitedに設定します。
これらの設定を理解するにはジョブを参照してください。こちらに適切なジョブ設定のスクリーンショットを示します。
ストリーミングクエリーにおける変更後のリカバリー
ストリーミングクエリーにおいて、同じチェックポイントからの再起動で許可される変更に関しては制限があります。許可されない、あるいは変更の効果が未定となる変更の種類を示します。いずれのタイプにおいて、以下の点が共通となります。
- 用語「許可される」は、特定の変更を行うことはできますが、効果のセマンティクス(意味)はクエリーと変更内容に基づいて明確に定義される(well-defined)ことを意味します。
- 用語「許可されない」は、予期されないエラーによりクエリーの再起動が失敗する可能性が高いため、特定の変更を行うべきではないことを意味します。
-
sdf
はsparkSession.readStream
によって作成されるストリーミングデータフレーム/データセットを意味します。
変更のタイプ
-
入力ソースの数や型の変更(異なるソースなど): これは許可されません。
-
入力ソースのパラメーターの変更: 変更のセマンティクスがwell-definedになるかどうかは、ソースとクエリーに依存します。こちらにいくつかの例を示します。
- レートリミットの追加、削除、変更は許可されます。
Scalaspark.readStream.format("kafka").option("subscribe", "article")
から以下への変更は許可されます。
Scalaspark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
- サブスクライブされたアーティクル、ファイルの変更は、結果が予期されないため、通常は許可されません。
spark.readStream.format("kafka").option("subscribe", "article")
からspark.readStream.format("kafka").option("subscribe", "newarticle")
への変更は許可されません。
-
出力シンクの変更: いくつかの特定のシンクの組み合わせ間の変更は許可されます。ケースバイケースで検証する必要があります。以下にいくつか例を示します。
- ファイルシンクからKafkaシンクへの変更は許可されます。Kafkaは新規データのみを参照します。
- Kafkaシンクからファイルシンクへの変更は許可されません。
- Kafkaシンクとforeachの相互変換は許可されます。
-
出力シンクのパラメーターの変更: 許可されるかどうか、変更のセマンティクスがwell-definedになるかどうかは、シンクとクエリーに依存します。いかに例を示します。
- ファイルシンクの出力ディレクトリの変更は許可されません。
sdf.writeStream.format("parquet").option("path", "/somePath")
からsdf.writeStream.format("parquet").option("path", "/anotherPath")
への変更は許可されません。 - 出力アーティクルの変更は許可されます。
sdf.writeStream.format("kafka").option("article", "somearticle")
からsdf.writeStream.format("kafka").option("path", "anotherarticle")
への変更は許可されます。 - ユーザー定義のforeachシンク(すなわち
ForeachWriter
コード)への変更は許可されますが、変更のセマンティクスはコードに依存します。
- ファイルシンクの出力ディレクトリの変更は許可されません。
-
projection / filter / mapライクなオペレーションにおける変更: いくつかのケースは許可されます。例えば、
- フィルターの追加削除は許可されます:
sdf.selectExpr("a")
からsdf.where(...).selectExpr("a").filter(...)
への変更は許可されます。 - 同じ出力スキーマによるprojectionの変更は許可されます:
sdf.selectExpr("stringColumn AS json").writeStream
からsdf.select(to_json(...).as("json")).writeStream
への変更は許可されます。 - 異なる出力スキーマによるprojectionの変更は条件付きで許可されます: 出力シンクが
"a"
から"b"
へのスキーマ変更を許可しているのであれば、sdf.selectExpr("a").writeStream
からsdf.selectExpr("b").writeStream
への変更は許可されます。
- フィルターの追加削除は許可されます:
-
ステートフルなオペレーションにおける変更: 結果を継続的に更新するために、ストリーミングクエリーにおける幾つかのオペレーションはステートデータを維持する必要があります。構造化ストリーミングは自動で状態のチェックポイントをフォールトトレラントなストレージ(例えば、DBFS、AWS S3、Azure Blob storage)に作成し、再起動後にレストアします。しかし、ステートデータのスキーマは再起動の合間で変更がないことを前提としています。これは、再起動の合間にストリーミングクエリーのステートフルオペレーションに対するいかなる変更(追加、削除、スキーマ変更)も許可されないことを意味します。ステートのリカバリーを確実にするために、再起動の合間にスキーマを変更すべきではないステートフルオペレーションのリストを示します。
-
ストリーミングの集計: 例えば、
sdf.groupBy("a").agg(...)
。グルーピングのキー、集計の数やタイプの変更は許可されません。 -
ストリーミングの重複排除: 例えば、
sdf.dropDuplicates("a")
。グルーピングのキー、集計の数やタイプの変更は許可されません。 -
ストリームとストリームのjoin: 例えば、
sdf1.join(sdf2, ...)
(ここでは両方の入力がsparkSession.readStream
で生成されます)。スキーマやjoinカラムの変更は許可されません。joinタイプ(inner/outer)の変更は許可されません。他のjoinの条件の変更の結果も未定になる可能性があります。 -
任意のステートフルオペレーション: 例えば、
sdf.groupByKey(...).mapGroupsWithState(...)
やsdf.groupByKey(...).flatMapGroupsWithState(...)
。ユーザー定義のステートのスキーマやタイムアウトのタイプの変更は許可されません。ユーザー定義のステートマッピング関数におけるいかなる変更は許可されますが、変更のセマンティックな影響は、ユーザー定義ロジックに依存します。ステートのスキーマ変更をサポートしたいのであれば、明示的に複雑なステートデータの構造を、スキーママイグレーションをサポートするエンコーディング/デコーディングスキーマを用いて、バイトコードにエンコード/デコードすることができます。例えば、Avroエンコードされたバイトコードとしてステートを保存するのであれば、バイナリーのステートは常に問題なくレストアされるので、クエリー再起動の合間にAvroのステートスキーマを自由に変更することができます。
-
ストリーミングの集計: 例えば、
ストリーミングクエリーのモニタリング
StreamingタブのSpark UIを通じてストリーミングアプリケーションをモニタリングすることができます。df.writeStream.queryName(<query_name>)
でクエリー名をストリームにつけることで、どのメトリクスがどのストリームに属するのかをSpark UIで確認することができます。
Apache SparkのStreaming Query Listenerインタフェースを用いることで、アラートやダッシュボードの目的でストリーミングメトリクスを外部サービスにプッシュすることができます。Streaming Query ListenerインタフェースはScalaでのみ利用できます。
import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._
val myListener = new StreamingQueryListener {
/**
* Called when a query is started.
* @note This is called synchronously with
* [[org.apache.spark.sql.streaming.DataStreamWriter `DataStreamWriter.start()`]],
* that is, `onQueryStart` will be called on all listeners before
* `DataStreamWriter.start()` returns the corresponding [[StreamingQuery]].
* Do not block in this method as it will block your query.
*/
def onQueryStarted(event: QueryStartedEvent): Unit = {}
/**
* Called when there is some status update (ingestion rate updated, etc.)
*
* @note This method is asynchronous. The status in [[StreamingQuery]] will always be
* latest no matter when this method is called. Therefore, the status of [[StreamingQuery]]
* may be changed before/when you process the event. For example, you may find [[StreamingQuery]]
* is terminated when you are processing `QueryProgressEvent`.
*/
def onQueryProgress(event: QueryProgressEvent): Unit = {}
/**
* Called when a query is stopped, with or without error.
*/
def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
}
観測方法
観測可能なメトリクスは、クエリー(データフレーム)で定義できる名前付き任意集計関数です。データフレームの実行が完了地点に到達すると、前回の完了地点以降に処理されたデータのメトリクスを含む名前付きイベントが発出されます。
Sparkセッションにリスナーをアタッチすることでこれらのメトリクスを観測することができます。リスナーは実行モードに依存します。
-
バッチモード:
QueryExecutionListener
を使います。クエリーが完了すると
QueryExecutionListener
がコールされます。QueryExecution.observedMetrics
mapを用いてメトリクスにアクセスします。 -
ストリーミング、あるいはマイクロバッチ:
StreamingQueryListener
を使います。ストリーミングクエリーがエポックを完了すると
StreamingQueryListener
がコールされます。StreamingQueryProgress.observedMetrics
mapを用いてメトリクスにアクセスします。Databricksでは連続実行ストリーミングをサポートしていません。
以下に例を示します。
// Observe row count (rc) and error row count (erc) in the streaming Dataset
val observed_ds = ds.observe("my_event", count(lit(1)).as("rc"), count($"error").as("erc"))
observed_ds.writeStream.format("...").start()
// Monitor the metrics using a listener
spark.streams.addListener(new StreamingQueryListener() {
override def onQueryProgress(event: QueryProgressEvent): Unit = {
event.progress.observedMetrics.get("my_event").foreach { row =>
// Trigger if the number of errors exceeds 5 percent
val num_rows = row.getAs[Long]("rc")
val num_error_rows = row.getAs[Long]("erc")
val ratio = num_error_rows.toDouble / num_rows
if (ratio > 0.05) {
// Trigger alert
}
}
}
})
効率改善のためのApache Sparkスケジューラープールの設定
デフォルトではノートブックで起動される全てのクエリーは、同じフェアスケジューリングプールで実行されます。このため、ノートブックで逐次実行される全てのストリーミングクエリーは、ファーストイン・ファーストアウト(FIFO)で処理されます。クエリー間でクラスターの資源を効率的に共有しないことで、クエリーで不必要な遅延を引き起こす場合があります。
全てのストリーミングクエリーが同時にジョブを実行できるようにし、クラスターの資源を効率的に共有できるようにするには、クエリーが別のスケジューラープールで処理を実行するように設定することができます。例えば、以下のように設定します。
// Run streaming query1 in scheduler pool1
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool1")
df.writeStream.queryName("query1").format("parquet").start(path1)
// Run streaming query2 in scheduler pool2
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "pool2")
df.writeStream.queryName("query2").format("orc").start(path2)
注意
ローカルプロパティ設定は、ストリーミングクエリーを起動するのと同じノートブックセルで行う必要があります。
詳細に関してはApache fair scheduler documentationを参照ください。
ステートフルなストリーミングクエリーのパフォーマンスの最適化
ストリーミングクエリーでステートフルなオペレーション(例えば、ストリーミングの集計、ストリーミングに対するdropDuplicates、ストリームとストリームのjoin、mapGroupsWithState、flatMapGroupsWithState)を行なっており、ステートに数百万のキーを維持したい場合、大規模なJVMのガーベージコレクション(GC)による一時停止に起因するマイクロバッチの処理時間の変動などの問題に直面するかもしれません。これは、デフォルトでは、ステートデータがエグゼキューターのJVMのメモリーに保持され、大量のステートオブジェクトがJVMにおけるメモリー消費を引き起こし、大規模なGCによる一時停止を引き起こすためです。
このようなケースにおいては、RocksDBに基づいた、より最適化されたステート管理ソリューションを使用することを選択できます。このソリューションはDatabricksランタイムで利用できます。ステートをJVMメモリーに保持するよりも、このソリューションは、ネイティブなメモリーとローカルSSDでステートを効率的に管理するためにRocksDBを使用します。さらに、このステートに対するあらゆる変更は、自動で構造化ストリーミングによって指定したチェックポイントの場所に保存されるので、(デフォルトのステート管理と同等の)完全なフォールトトレランスを保証することができます。ステートストアとしてRocksDBを設定する方法に関しては、Configure RocksDB state storeを参照ください。
ベストなパフォーマンスのための推奨設定は以下の通りとなります。
- ワーカーとしてcompute-optimizedインスタンスを使用します。例えば、AWSのc3.4xlargeインスタンスなどです。
- クラスターにおけるコア数の1-2倍のシャッフルパーティション数を指定します。
パフォーマンスの利点に関しては、RocksDBベースのステート管理では、デフォルトの100倍以上ののステートキーを維持することができます。例えば、ワーカーとしてAWSのc3.4xlargeインスタンスを用いたSparkクラスターにおいては、デフォルトのステート管理では、エグゼキューターごとに最大1-2百万のステートキーを保持することができますが、そのあとはJVMのGCがスタートし性能に影響を及ぼします。一方、RocksDBベースのステート管理は、GCの問題に直面することなしに容易にエグゼキューターあたり1億のステートキーを保持することができます。
注意
クエリーの再起動の合間にステート管理のスキームを変更することはできません。すなわち、デフォルトのステート管理でクエリーを起動した場合、新たなチェックポイント格納場所を用いて最初からクエリーを起動しない限り、変更することはできません。
RocksDBステートストアの設定
ストリーミングクエリーを開始する前に、SparkSessionで以下の設定を行うことでRocksDBベースのステート管理を有効化することができます。
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider")
RocksDBステートストアのメトリクス
それぞれのステートオペレータは、ステートストアを監視し、遅いジョブのデバッグに役立つように、RocksDBインスタンスで実行されるステート管理オペレーションに関するメトリクスを収集します。これらのメトリクスは、ステートオペレータが実行される全てのタスクにおけるジョブのステートオペレータごとに集計(合計)されます。これらのメトリクスは、StreamingQueryProgress
のstateOperators
の中のcustomMetrics
マップの一部となります。以下に、(StreamingQueryProgress.json()
を用いて取得された)JSON形式のStreamingQueryProgress
の例を示します。
{
"id" : "6774075e-8869-454b-ad51-513be86cfd43",
"runId" : "3d08104d-d1d4-4d1a-b21e-0b2e1fb871c5",
"batchId" : 7,
"stateOperators" : [ {
"numRowsTotal" : 20000000,
"numRowsUpdated" : 20000000,
"memoryUsedBytes" : 31005397,
"numRowsDroppedByWatermark" : 0,
"customMetrics" : {
"rocksdbBytesCopied" : 141037747,
"rocksdbCommitCheckpointLatency" : 2,
"rocksdbCommitCompactLatency" : 22061,
"rocksdbCommitFileSyncLatencyMs" : 1710,
"rocksdbCommitFlushLatency" : 19032,
"rocksdbCommitPauseLatency" : 0,
"rocksdbCommitWriteBatchLatency" : 56155,
"rocksdbFilesCopied" : 2,
"rocksdbFilesReused" : 0,
"rocksdbGetCount" : 40000000,
"rocksdbGetLatency" : 21834,
"rocksdbPutCount" : 1,
"rocksdbPutLatency" : 56155599000,
"rocksdbReadBlockCacheHitCount" : 1988,
"rocksdbReadBlockCacheMissCount" : 40341617,
"rocksdbSstFileSize" : 141037747,
"rocksdbTotalBytesReadByCompaction" : 336853375,
"rocksdbTotalBytesReadByGet" : 680000000,
"rocksdbTotalBytesReadThroughIterator" : 0,
"rocksdbTotalBytesWrittenByCompaction" : 141037747,
"rocksdbTotalBytesWrittenByPut" : 740000012,
"rocksdbTotalCompactionLatencyMs" : 21949695000,
"rocksdbWriterStallLatencyMs" : 0,
"rocksdbZipFileBytesUncompressed" : 7038
}
} ],
"sources" : [ {
} ],
"sink" : {
}
}
メトリクスの詳細な説明に関しては、原文を参照ください。
非同期ステートチェックポイント作成
注意
この機能はDatabricksランタイム10.3以降で利用できます。
大規模なステートの更新を伴うステートフルストリーミングクエリーにおいて非同期ステートチェックポイントを有効化することで、エンドツーエンドのマイクロバッチのレーテンシーを削減できる可能性があります。
構造化ストリーミングは現状、同期型のチェックポイントを使用しており、すべてのマイクロバッチは、次のバッチをスタートする前に、バッチで行われる全てのステート更新はクラウドストレージ(「チェックポイントロケーション」と呼ばれます)にバックアップされることを保証することを意味します。ステートフルなストリーミングクエリーが失敗した場合、最新のマイクロバッチ以外の全てのマイクロバッチのチェックポイントが作成されることを保証します。しかし、同期型チェックポイントによる高速復旧は、それぞれのマイクロバッチの高価なレーテンシーのコストにつながります。
非同期型のステートチェックポイントは、マイクロバッチの実行がチェックポイント作成の完了を待つ必要が無いように、非同期的にチェックポイントの作成を試みます。言い換えると、次のマイクロバッチは前回のマイプロバッチが完了するとすぐに次のマイクロバッチをスタートします。しかし、内部的には、オフセットメタデータ(チェックポイントのロケーションに保存されます)は、マイクロバッチごとにステートのチェックポイント作成が完了したかどうかを追跡します。クエリーを再起動した場合、一つ以上のマイクロバッチを再実行する必要があるかもしれません。これには、計算が完了していない最新のマイクロバッチと、ステートチェックポイント作成が完了していないマイクロバッチが含まれます。そして、同期のチェックポイント作成と同様の、耐障害性保証(すなわち、冪等性のシンクを伴うexactly-once保証)を手に入れられます。
まとめると、ステート更新にボトルネックがあるステートフルなストリーミングクエリーにおいて、非同期的なチェックポイント作成を有効化することで、あらゆる耐障害性保証を損なうことなしに、わずかな再起動の遅延のコストでエンドツーエンドのレーテンシーを削減することができます。
ターゲットワークロードの特定
非同期チェックポイント作成によってメリットを享受できる可能性があるストリーミングジョブの特性を以下に示します。
-
ジョブに1つ以上のステートフルなオペレーション(例えば、集計、[flat]MapGroupsWithState、ストリームとストリームのjoin)が含まれている。
-
ステートのチェックポイントのレーテンシーがバッチ処理全体のレーテーンシーの大部分を占める。この情報はStreamingQueryProgressで確認することができます。これらのイベントはSparkドライバーのlog4jログでも確認することができます。以下にストリーミングクエリーの進捗状況と、全体的なバッチ実行のレーテンシーにおけるステートチェックポイントのインパクトをどのように特定するのかの例を示します。
JSON{ "id" : "2e3495a2-de2c-4a6a-9a8e-f6d4c4796f19", "runId" : "e36e9d7e-d2b1-4a43-b0b3-e875e767e1fe", "...", "batchId" : 0, "durationMs" : { "...", "triggerExecution" : 547730, "..." }, "stateOperators" : [ { "...", "commitTimeMs" : 3186626, "numShufflePartitions" : 64, "..." }] }
- 上記のクエリー進捗イベントのステートチェックポイントのレーテンシーの分析
- バッチ期間(
durationMs.triggerDuration
)は約547秒。 - ステートストアのコミットのレーテンシー(
stateOperations[0].commitTimeMs
)は約3,186秒。コミットのレーテンシーはステートストアを持つタスクで合計されます。この場合、そのようなタスクは64個(stateOperators[0].numShufflePartitions
)です。 - ステートオペレータを持つそれぞれのタスクは、チェックポイント作成に平均50秒(3,186/64)かかっています。これは追加のレーテンシーであり、バッチ期間に加算されます。64個すべてのタスクが同時に実行すると仮定すると、チェックポイントのステップはバッチ処理期間の約9%(50秒 / 547秒)を占めています。最大同時タスク実行数が64より少なくなると、このパーセンテージはさらに増加します。
- バッチ期間(
- 上記のクエリー進捗イベントのステートチェックポイントのレーテンシーの分析
非同期ステートチェックポイント作成の有効化
ストリーミングジョブで以下の設定を行います。非同期ステートチェックポイント作成には、非同期コミットをサポートするステートストアの実装が必要となります。現時点ではRocksDBベースのステートストアのみがサポートしています。
spark.conf.set(
"spark.databricks.streaming.statefulOperator.asyncCheckpoint.enabled",
"true"
)
spark.conf.set(
"spark.sql.streaming.stateStore.providerClass",
"com.databricks.sql.streaming.state.RocksDBStateStoreProvider"
)
制限
- 非同期チェックポイントにおけるあらゆる失敗は、クエリー自体を失敗させます。同期チェックポイント作成モードでは、チェックポイントはタスクの一部として実行され、Sparkはクエリーが失敗する前にタスクを複数回リトライします。この機構は非同期チェックポイント作成モードでは存在しません。しかし、Databricksジョブのリトライを使うことで、そのような処理失敗に対して自動でリトライするようにすることができます。
- 非同期ステートチェックポイント作成とオートスケーリングの組み合わせは動作しません。マイクロバッチの実行の合間にステートストアの場所がへこうされない場合には、非同期チェックポイント作成がもっともうまく動作します。オートスケーリングを有効化すると、オートスケーリングの一部でノードが追加、削除されるたびに、ステートストアのインスタンスが再分散される場合があります。
- 非同期ステートチェックポイント作成はRocksDBステートストアプロバイダー実装でのみサポートされています。デフォルトのインメモリステートストア実装は非同期ステートチェックポイント作成をサポートしていません。
複数のウォーターマークのポリシー
ストリーミングクエリーに、union、joinされる複数の入力ストリームを含めることができます。入力ストリームのそれぞれに、ステートフルなオペレーションで許容すべき遅延データに対して異なる閾値を指定することができます。入力ストリームのそれぞれでwithWatermarks("eventTime", delay)
を用いて、これらの閾値を指定することができます。例えば、ストリームとストリームのjoinのクエリーを考えてみます。
val inputStream1 = ... // delays up to 1 hour
val inputStream2 = ... // delays up to 2 hours
inputStream1.withWatermark("eventTime1", "1 hour")
.join(
inputStream2.withWatermark("eventTime2", "2 hours"),
joinCondition)
クエリーの実行中、構造化ストリーミングはそれぞれの入力ストリームの最大イベント時間を別個に追跡し、対応する遅延に基づきウォーターマークを計算し、ステートフルオペレーションで使用する単一のグローバルウォーターマークを選択します。デフォルトでは、あるストリームが他のストリームよりも遅延することで予期せずにデータがドロップされないように、グローバルウォーターマークとして最小値が使用されます(例えば、上流のストリームの失敗によってあるストリームがデータ受信を停止するなど)。言い換えると、グローバルウォーターマークは、最も遅いストリームにペースを安全に合わせ、クエリーのアウトプットもそれに合わせて遅延させられます。
幾つかのケースでは、最も遅いストリームからデータをドロップしたとしてもで、より迅速に結果を得たいと考えるかもしれません。SQL設定のspark.sql.streaming.multipleWatermarkPolicy
をmax
(デフォルトはmin
)に設定することで、グローバルウォーターマークに最大値を選択するように複数のウォーターマークのポリシーを設定することができます。これにより、グローバルウォーターマークは、最速のストリームにペースを合わせます。しかし、副採用として、最遅のストリームからのデータは積極的にドロップされます。このため、検討した上でこの設定を使用することをお勧めします。
構造化ストリーミングデータフレームの可視化
リアルタイムで構造化ストリーミングデータフレームを可視化するためにdisplay
関数を使用することができます。trigger
とcheckpointLocation
パラメーターはオプションですが、ベストプラクティスとして、プロダクション環境では常にこれらを指定することをお勧めします。
import org.apache.spark.sql.streaming.Trigger
val streaming_df = spark.readStream.format("rate").load()
display(streaming_df.groupBy().count(), trigger = Trigger.ProcessingTime("5 seconds"), checkpointLocation = "dbfs:/<checkpoint-path>")
streaming_df = spark.readStream.format("rate").load()
display(streaming_df.groupBy().count(), processingTime = "5 seconds", checkpointLocation = "dbfs:/<checkpoint-path>")
詳細はStructured Streaming DataFramesを参照ください。
ステートオペレーターflatMapGroupsWithState
の改善
初期ステートの指定
[flat]MapGroupsWithState
オペレータを使用して構造化ストリーミングのステートフル処理におけるユーザー定義の初期ステートを指定することができます。
def mapGroupsWithState[S: Encoder, U: Encoder](
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => U): Dataset[U]
def flatMapGroupsWithState[S: Encoder, U: Encoder](
outputMode: OutputMode,
timeoutConf: GroupStateTimeout,
initialState: KeyValueGroupedDataset[K, S])(
func: (K, Iterator[V], GroupState[S]) => Iterator[U])
flatMapGroupsWithState
オペレータに初期ステートを指定する例を示します。
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
Iterator((key, count.toString))
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.flatMapGroupsWithState(Update, GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
mapGroupsWithState
オペレータに初期ステートを指定する例を示します。
val fruitCountFunc =(key: String, values: Iterator[String], state: GroupState[RunningCount]) => {
val count = state.getOption.map(_.count).getOrElse(0L) + valList.size
state.update(new RunningCount(count))
(key, count.toString)
}
val fruitCountInitialDS: Dataset[(String, RunningCount)] = Seq(
("apple", new RunningCount(1)),
("orange", new RunningCount(2)),
("mango", new RunningCount(5)),
).toDS()
val fruitCountInitial = initialState.groupByKey(x => x._1).mapValues(_._2)
fruitStream
.groupByKey(x => x)
.mapGroupsWithState(GroupStateTimeout.NoTimeout, fruitCountInitial)(fruitCountFunc)
ステート更新関数のテスト
TestGroupState
APIを用いることで、Dataset.groupByKey(...).mapGroupsWithState(...)
とDataset.groupByKey(...).flatMapGroupsWithState(...)
に対するステート更新関数をテストすることができます。
ステート更新関数は、入力としてGroupState
オブジェクトタイプの以前のステートを受け取ります。サンプルについては、Apache SparkのGroupState reference documentationを参照ください。
import org.apache.spark.sql.streaming._
import org.apache.spark.api.java.Optional
test("flatMapGroupsWithState's state update function") {
var prevState = TestGroupState.create[UserStatus](
optionalState = Optional.empty[UserStatus],
timeoutConf = GroupStateTimeout.EventTimeTimeout,
batchProcessingTimeMs = 1L,
eventTimeWatermarkMs = Optional.of(1L),
hasTimedOut = false)
val userId: String = ...
val actions: Iterator[UserAction] = ...
assert(!prevState.hasUpdated)
updateState(userId, actions, prevState)
assert(prevState.hasUpdated)
}