Recover from Structured Streaming query failures | Databricks on AWS [2022/8/25時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
構造化ストリーミングは、ストリーミングクエリーにおける耐障害性とデータの一貫性を提供します。Databricksワークフローを用いることで、構造化ストリーミングの障害時に再起動する様に容易に設定することができます。ストリーミングクエリーのチェックポイント作成を有効化することで、障害の後にクエリーを再起動することができます。再起動されたクエリーは障害時点から処理を継続します。
構造化ストリーミングクエリーのチェックポイント作成を有効にする
以下の例の様に、クエリーをスタートする前に、常にcheckpointLocation
オプションを指定することをお勧めします。
streamingDataFrame.writeStream
.format("parquet")
.option("path", "/path/to/table")
.option("checkpointLocation", "/path/to/table/_checkpoint")
.start()
このチェックポイントのロケーションは、クエリーを特定する重要な情報のすべてを保持します。それぞれのクエリーごとのチェックポイントロケーションを設けるべきです。複数のクエリーで同じロケーションを使ってはいけません。詳細は、Structured Streaming Programming Guideをご覧ください。
注意
ほとんどのタイプの出力シンクでcheckpointLocation
が必要となりますが、メモリーシンクの様な幾つかのシンクではcheckpointLocation
を指定しなくても自動で一時的なチェックポイントロケーションを生成します。これらの一時的なチェックポイントロケーションは耐障害性やデータの一貫性を保証しませんし、適切にクリーンアップされない場合があります。常にcheckpointLocation
を指定して潜在的な落とし穴を避ける様にしてください。
失敗したストリーミングクエリーを再起動する様に構造化ストリーミングジョブを設定する
ストリーミングクエリーを持つノートブックやJARを用いてDatabricksジョブを作成し、以下の設定を行うことができます。
- 常に新規クラスターを使用する
- 障害時には常にリトライする
ジョブは、構造化ストリーミングAPIと密接にインテグレーションされており、実行中のすべてのクエリーがアクティブであることを監視することができます。この設定によって、クエリーの一部が失敗した際、ジョブは自動で(他のすべてのクエリーと共に)処理を停止し、新規クラスターで新規に処理をスタートします。このノートブックやJARコードの再実行によって、すべてのクエリーが再度再起動されますこれは良い状態に復旧するために最も安全な方法です。
警告!
ノートブックワークフローは長時間実行されるジョブをサポートしていません。このため、ストリーミングジョブでノートブックワークフローを使うことはお勧めしません。
注意
- いかなるアクティブなストリーミングクエリーの障害は、アクティブな処理の実行(ラン)を失敗させ、すべての他のストリーミングクエリーを停止します。
- ノートブックの最後で
streamingQuery.awaitTermination()
やspark.streams.awaitAnyTermination()
を使う必要はありません。ジョブはストリーミングクエリーがアクティブな際には、自動でランを終了させない様にします。
以下に推奨ジョブ設定のサンプルを示します。
- クラスター: 常に新規クラスターを設定し、最新のSparkバージョン(あるいは最低でもバージョン2.1)を使う様にします。Spark 2.1以降で起動されたクエリーは、クエリーの実行後やSparkのバージョンアップ後に復旧可能です。
- 通知: 失敗時にメールの通知が必要であれば設定します。
- スケジュール: スケジュールを設定しません。
- タイムアウト: タイムアウトを設定しません。 ストリーミングクエリーは永遠に実行されます。
- 最大同時実行数: 1に設定します。それぞれのクエリーのインスタンスは1つ存在する様にします。
- リトライ: Unlimitedに設定します。
これらの設定を理解するには、Databricksにおけるジョブ管理をご覧ください。
構造化ストリーミングクエリー変更後に復旧する
同じチェックポイントロケーションから再起動するまでに許可されるストリーミングクエリーの変更内容には制限があります。許可されない、あるいは変更の影響が未定義である変更の種類をいくつか示します。
- 許可されるという用語は、その変更を行うことができますが、クエリーと変更内容に基づいてその影響のセマンティクスは決定されます。
- 許可されないという用語は、クエリーの再起動は予期しないエラーで失敗するため、その変更は行うべきではないことを意味します。
-
sdf
はsparkSession.readStream
で生成されたストリーミングデータフレーム/データセットを表現します。
構造化ストリーミングクエリーの変更のタイプ
-
入力ソースの数やタイプの変更(異なるソース): これは許可されません。
-
入力ソースのパラメーターの変更: 許可されるかどうか、変更のセマンティクスが決定的かどうかはソースとクエリーに依存します。以下に例を示します。
- レートリミットの追加、削除、変更は許可されます。
Scalaspark.readStream.format("kafka").option("subscribe", "article")
から
Scalaspark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
- サブスクライブしているアーティクルやファイルの変更は、結果が予測できないため通常は許可されません。
spark.readStream.format("kafka").option("subscribe", "article")
からspark.readStream.format("kafka").option("subscribe", "newarticle")
への変更 -
出力シンクの変更: いくつかの特定のシンクの組み合わせ間の変更は許可されます。ケースバイケースで検証する必要があります。以下にいくつか例を示します。
- ファイルシンクからKafkaシンクへの変更は許可されます。Kafkaは新規データのみを参照します。
- Kafkaシンクからファイルシンクへの変更は許可されません。
- Kafkaシンクとforeachの相互変換は許可されます。
-
出力シンクのパラメーターの変更: 許可されるかどうか、変更のセマンティクスがwell-definedになるかどうかは、シンクとクエリーに依存します。いかに例を示します。
- ファイルシンクの出力ディレクトリの変更は許可されません。
sdf.writeStream.format("parquet").option("path", "/somePath")
からsdf.writeStream.format("parquet").option("path", "/anotherPath")
への変更は許可されません。 - 出力アーティクルの変更は許可されます。
sdf.writeStream.format("kafka").option("article", "somearticle")
からsdf.writeStream.format("kafka").option("path", "anotherarticle")
への変更は許可されます。 - ユーザー定義のforeachシンク(すなわち
ForeachWriter
コード)への変更は許可されますが、変更のセマンティクスはコードに依存します。
- ファイルシンクの出力ディレクトリの変更は許可されません。
-
projection / filter / mapライクなオペレーションにおける変更: いくつかのケースは許可されます。例えば、
- フィルターの追加削除は許可されます:
sdf.selectExpr("a")
からsdf.where(...).selectExpr("a").filter(...)
への変更は許可されます。 - 同じ出力スキーマによるprojectionの変更は許可されます:
sdf.selectExpr("stringColumn AS json").writeStream
からsdf.select(to_json(...).as("json")).writeStream
への変更は許可されます。 - 異なる出力スキーマによるprojectionの変更は条件付きで許可されます: 出力シンクが
"a"
から"b"
へのスキーマ変更を許可しているのであれば、sdf.selectExpr("a").writeStream
からsdf.selectExpr("b").writeStream
への変更は許可されます。
- フィルターの追加削除は許可されます:
-
ステートフルなオペレーションにおける変更: 結果を継続的に更新するために、ストリーミングクエリーにおける幾つかのオペレーションはステートデータを維持する必要があります。構造化ストリーミングは自動で状態のチェックポイントをフォールトトレラントなストレージ(例えば、DBFS、AWS S3、Azure Blob storage)に作成し、再起動後にレストアします。しかし、ステートデータのスキーマは再起動の合間で変更がないことを前提としています。これは、再起動の合間にストリーミングクエリーのステートフルオペレーションに対するいかなる変更(追加、削除、スキーマ変更)も許可されないことを意味します。ステートのリカバリーを確実にするために、再起動の合間にスキーマを変更すべきではないステートフルオペレーションのリストを示します。
-
ストリーミングの集計: 例えば、
sdf.groupBy("a").agg(...)
。グルーピングのキー、集計の数やタイプの変更は許可されません。 -
ストリーミングの重複排除: 例えば、
sdf.dropDuplicates("a")
。グルーピングのキー、集計の数やタイプの変更は許可されません。 -
ストリームとストリームのjoin: 例えば、
sdf1.join(sdf2, ...)
(ここでは両方の入力がsparkSession.readStream
で生成されます)。スキーマやjoinカラムの変更は許可されません。joinタイプ(inner/outer)の変更は許可されません。他のjoinの条件の変更の結果も未定になる可能性があります。 -
任意のステートフルオペレーション: 例えば、
sdf.groupByKey(...).mapGroupsWithState(...)
やsdf.groupByKey(...).flatMapGroupsWithState(...)
。ユーザー定義のステートのスキーマやタイムアウトのタイプの変更は許可されません。ユーザー定義のステートマッピング関数におけるいかなる変更は許可されますが、変更のセマンティックな影響は、ユーザー定義ロジックに依存します。ステートのスキーマ変更をサポートしたいのであれば、明示的に複雑なステートデータの構造を、スキーママイグレーションをサポートするエンコーディング/デコーディングスキーマを用いて、バイトコードにエンコード/デコードすることができます。例えば、Avroエンコードされたバイトコードとしてステートを保存するのであれば、バイナリーのステートは常に問題なくレストアされるので、クエリー再起動の合間にAvroのステートスキーマを自由に変更することができます。
-
ストリーミングの集計: 例えば、