Table streaming reads and writes | Databricks on AWS [2022/5/26時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Delta LakeはreadStream
とwriteStream
を通じてSpark構造化ストリーミングと深くインテグレーションされています。Delta Lakeは以下のようなストリーミングシステム、ファイルと関連する多くの制限を克服します。
- 低レーテンシーのデータ取り込みによって生成される小規模ファイルのコンパクト化
- 一つ以上のストリーミング(あるいは同時実行バッチジョブ)による「一度のみ(exactly-once)」の処理の維持
- ストリームのソースとしてファイルを用いる際に新規ファイルを効率的に特定
プロダクションにおける構造化ストリーミングもご覧ください。
ソースとしてのDeltaテーブル
Deltaテーブルをストリームのソースとしてロードし、ストリーミングクエリーで使用する際、クエリーはテーブルにある全てのデータ、そしてストリームが起動後に到着する新規のデータを処理します。
ストリームとしてパスあるいはテーブルの両方をロードすることができます。
spark.readStream.format("delta")
.load("/tmp/delta/events")
import io.delta.implicits._
spark.readStream.delta("/tmp/delta/events")
あるいは
import io.delta.implicits._
spark.readStream.format("delta").table("events")
入力レートの制限
マイクロバッチを制御するために以下のオプションを利用することができます。
-
maxFilesPerTrigger
: マイクロバッチごとにどのくらいの新規ファイルが想定されるか。デフォルトは1000です。 -
maxBytesPerTrigger
: マイクロバッチごとにどのくらいのデータを処理するのか。このオプションは「ソフトマックス」を設定し、最小の入力ユニットがこの制限を超えた場合に、ストリーミングクエリーが前進できるようにするために、バッチはこの値の近似値の量のデータを処理し、この制限以上のデータを処理できるようにしていることを意味します。ストリーミングTrigger.Once
を使用する際、このオプションは無視されます。デフォルトでこれは設定されません。
maxBytesPerTrigger
とmaxFilesPerTrigger
を組み合わせて使用する場合、マイクロバッチはmaxFilesPerTrigger
かmaxBytesPerTrigger
のどちらかが制限に達するまで処理を行います。
注意
logRetentionDuration
設定によって、ソーステーブルのトランザクションがクリーンアップされ、ストリームの処理に遅延が起きている場合、Delta Lakeはソーステーブルで利用できる最新のトランザクション履歴に対応するデータを処理しますが、ストリームの処理を失敗させません。これによって、データの削除が起こる場合があります。
アップデートとデリートの無視
構造化ストリーミングは追記(append)ではない入力を取り扱わず、ソースとして使われているテーブルに変更が生じると例外をスローします。自動で後段に伝播されない変更を取り扱う際には主要な戦略が2つ存在します。
- アウトプットとチェックポイントを削除し、最初からストリームを再起動することができます。
- 以下のいずれかのオプションを設定することができます。
-
ignoreDeletes
: パーティション境界でデータを削除するトランザクションを無視します。 -
ignoreChanges
: (パーティション内の)UPDATE
、MERGE INTO
、DELETE
、そしてOVERWRITE
のようなデータ変更オペレーションにより、ファイルがソーステーブルに再度書き込みをしなくてはならない場合、アップデートを再処理します。変更がない行も放出される場合があり、後段のデータ消費者においては重複を取り扱わなくてはならないケースがあります。削除処理は後段に伝播しません。ignoreChanges
はignoreDeletes
を含みます。このため、ignoreChanges
を使用している場合、お使いのストリームは、ソーステーブルに対する削除や更新に邪魔されることはありません。
-
サンプル
例えば、date
でパーティショニングされており、date
、user_email
、action
カラムを持つテーブルuser_events
があるとします。お使いのストリームはuser_events
テーブルから流れ出しており、GDPR対応のためにデータを削除しなくてはならないとします。
パーティション境界で削除を行う際(パーティションカラムに対するWHERE
が存在する)、ファイルは既に値でセグメント分けされているので、DELETEは単純にメタデータからこれらのファイルを削除します。このため、いくつかのパーティションからデータを削除したい場合には、以下を使用することができます。
spark.readStream.format("delta")
.option("ignoreDeletes", "true")
.load("/tmp/delta/user_events")
しかし、user_email
に基づいてデータを削除したい場合には、以下を使用する必要があります。
spark.readStream.format("delta")
.option("ignoreChanges", "true")
.load("/tmp/delta/user_events")
UPDATE
を用いてuser_email
をアップデートした場合、対象となるuser_email
を含むファイルは再書き込みされます。ignoreChanges
を使うと、新規レコードは同じファイルの存在する変更のない全てのレコードとともに後段に伝播します。お使いのロジックにおいては、これらの重複レコードを取り扱えるようになっている必要があります。
初期ポジションの指定
注意
この機能はDatabricksランタイム7.3LTS以降で利用できます。
テーブル全体を処理することなしに、Delta Lakeストリーミングソースの開始地点を指定するために以下のオプションを使用することができます。
-
startingVersion
: スタートするDelta Lakeのバージョン。このバージョン(このバージョンを含みます)からスタートする全てのテーブルの変更は、ストリーミングソースによって読み込まれます。DESCRIBE HISTORYコマンドのversion
カラムからコミットバージョンを取得することができます。Databricksランタイム7.4以降で最新の変更のみを取得するには
latest
を指定します。 -
startingTimestamp
: スタートするタイムスタンプ。このタイムスタンプ以降(このタイムスタンプを含みます)にコミットされた全てのテーブル変更コミットはストリーミングソースによって読み込まれます。以下のいずれかを指定します。- タイムスタンプの文字列。例えば、
"2019-01-01T00:00:00.000Z"
- dateの文字列。例えば、
"2019-01-01"
- タイムスタンプの文字列。例えば、
同時に両方のオプションを設定することはできません。いずれか一方のみを指定できます。新規にストリーミングクエリーを起動した場合にのみ効果を発揮します。ストリーミングクエリーが起動しており、進捗がチェックポイントに記録されている場合は、このオプションは無視されます。
重要!
指定したバージョン、タイムスタンプからストリーミングソースを起動することはできますが、ストリーミングそーうのスキーマは常にDeltaテーブルの最新のスキーマとなります。指定したバージョンあるいはタイムスタンプ以降にDeltaテーブルに互換性のないスキーマ変更がないことを確認してください。さもないと、ストリーミングソースが不正なスキーマを用いてデータを読み込んだ際に不正な結果を返す場合があります。
サンプル
例えば、テーブルuser_events
があるとします。バージョン5以降の変更を読み込みたい場合には、以下を使用します。
spark.readStream.format("delta")
.option("startingVersion", "5")
.load("/tmp/delta/user_events")
2018-10-18以降の変更を読み込みたい場合には、以下を使用します。
spark.readStream.format("delta")
.option("startingTimestamp", "2018-10-18")
.load("/tmp/delta/user_events")
シンクとしてのDeltaテーブル
構造化ストリーミングを用いてデータをDeltaテーブルに書き込むこともできます。トランザクションログを用いることで、テーブルに対して別のストリームやバッチクエリーが同時に動作していたとしても、Delta Lakeはexactly-onceの処理を保証します。
注意
Delta LakeのVACUUM
機能は、Delta Lakeによって管理されな全てのファイルを削除しますが、_
で始まるディレクトリは全てスキップします。<table_name>/_checkpoints
のようなディレクトリ構造を用いることで他のデータ、メタデータとともに安全にチェックポイントを格納することができます。
メトリクス
注意
Databricksランタイム8.1以降で利用できます。
numBytesOutstanding
とnumFilesOutstanding
としてストリーミングクエリー処理で処理されるバイト数とファイル数を確認することができます。ノートブックでストリームを実行している場合、ストリーミングクエリー進捗ダッシュボードのRaw Dataタブでこれらのメトリクスを確認することができます。
{
"sources" : [
{
"description" : "DeltaSource[file:/path/to/source]",
"metrics" : {
"numBytesOutstanding" : "3456",
"numFilesOutstanding" : "8"
},
}
]
}
Appendモード
デフォルトでは、ストリームはappendモードで動作し、新規レコードをテーブルに追加します。
pathの方法を使用することができます。
events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/_checkpoints/")
.start("/delta/events")
events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.start("/tmp/delta/events")
import io.delta.implicits._
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.delta("/tmp/delta/events")
あるいは、Spark 3.1以降(Databricksランタイム8.3以降)では以下のようにtoTable
メソッドを使い、Spark 3.1以前(Databricksランタイム8.2以前)ではtable
メソッドを使います。
events.writeStream
.format("delta")
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
events.writeStream
.outputMode("append")
.option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
.toTable("events")
Completeモード
バッチごとにテーブル全体を置き換えるために構造化ストリーミングを使用することもできます。ユースケールの例としては、集計処理を用いたサマリーの計算があります。
(spark.readStream
.format("delta")
.load("/tmp/delta/events")
.groupBy("customerId")
.count()
.writeStream
.format("delta")
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.start("/tmp/delta/eventsByCustomer")
)
spark.readStream
.format("delta")
.load("/tmp/delta/events")
.groupBy("customerId")
.count()
.writeStream
.format("delta")
.outputMode("complete")
.option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
.start("/tmp/delta/eventsByCustomer")
上のサンプルでは、顧客によるイベントの数の集計値を含むテーブルを継続的に更新します。
より緩いレーテンシー要件を持つアプリケーションにおいては、一度のみのトリガーを用いることで計算リソースを節約することができます。特定のスケジュールでサマリーの集計テーブルを更新するためにこれらを使用し、最後のアップデート以降に到着した新規データのみを処理します。
冪等数のあるマルチテーブルの書き込み
注意
Databricksランタイム8.4以降で利用できます。
このセクションでは、foreachBatchコマンドを用いた複数テーブルへの書き込み方を説明します。foreachBatch
を用いることで、ストリーミングクエリーにおけるマイクロバッチの出力が複数のターゲットに対して書き込まれます。しかし、foreachBatch
は書き込み試行はバッチが再実行されたかどうかに関する情報を持っていないので、書き込みは冪等性があるものではありません。例えば、失敗したバッチの再実行によって、重複したデータの書き込みにつながる場合があります。
これに対応するために、書き込みを冪等性あるものにするために、Deltaテーブルは以下のDataFrameWriterオプションをサポートしています。
-
txnAppId
: データフレームの書き込みごとに引き渡すユニークな文字列。例えば、txnAppId
としてストリーミングクエリーのIDを使用することができます。 -
txnVersion
: トランザクションのバージョンとして動作する短調増加する数字。
重複する書き込みを特定し、無視するためにtxnAppId
とtxnVersion
の組み合わせを使用することができます。
障害によってバッチ書き込みが阻害されると、バッチの再実行では同じアプリケーションID、バッチIDを使用し、ランタイムが適切に重複した書き込みを特定しそれらを無視する役に立ちます。アプリケーションID(txnAppId
)には、ユーザーが生成したユニークな文字列を指定することができ、必ずしもストリームIDである必要はありません。
警告!
ストリーミングのチェックポイントを削除し、クエリーを再起動した場合、ことなるappId
を指定しなくてはなりません。さもないと、同じtxnAppId
と0からスタートするバッチIDを含むので、再起動したクエリーからの書き込みは無視されます。
サンプル
app_id = ... # A unique string that is used as an application ID.
def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 1
batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...) // location 2
}
ストリーム-スタティックjoinの実行
ストリーム-スタティックjoinを実行するために、Detla Lakeのバージョン管理のプロトコルとトランザクション保証を活用することができます。ストリーム-スタティックjoinはDeltaテーブルの最新の適切なバージョンとステートレスjoinを用いてデータストリームをjoinします。
Databricksでストリーム-スタティックjoinでマイクロバッチのデータを処理する際、スタティックなDeltaテーブルの最新の適正なバージョンと、現在のマイクロバッチに存在するレコードをjoinします。joinはステートレスなので、ウォーターマークを設定する必要はなく、低レーテンシーで処理をこなうことができます。joinで使用するスタティックなDeltaテーブルのデータは変化の速度が遅い(slowly-changing)ものである必要があります。
streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")
query = (streamingDF
.join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
.writeStream
.option("checkpointLocation", checkpoint_path)
.table("orders_with_customer_info")
)