1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta Lakeテーブルに対するストリーミングの読み書き

Posted at

Table streaming reads and writes | Databricks on AWS [2022/5/26時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Delta LakeはreadStreamwriteStreamを通じてSpark構造化ストリーミングと深くインテグレーションされています。Delta Lakeは以下のようなストリーミングシステム、ファイルと関連する多くの制限を克服します。

  • 低レーテンシーのデータ取り込みによって生成される小規模ファイルのコンパクト化
  • 一つ以上のストリーミング(あるいは同時実行バッチジョブ)による「一度のみ(exactly-once)」の処理の維持
  • ストリームのソースとしてファイルを用いる際に新規ファイルを効率的に特定

プロダクションにおける構造化ストリーミングもご覧ください。

ソースとしてのDeltaテーブル

Deltaテーブルをストリームのソースとしてロードし、ストリーミングクエリーで使用する際、クエリーはテーブルにある全てのデータ、そしてストリームが起動後に到着する新規のデータを処理します。

ストリームとしてパスあるいはテーブルの両方をロードすることができます。

Scala
spark.readStream.format("delta")
  .load("/tmp/delta/events")

import io.delta.implicits._
spark.readStream.delta("/tmp/delta/events")

あるいは

Scala
import io.delta.implicits._

spark.readStream.format("delta").table("events")

入力レートの制限

マイクロバッチを制御するために以下のオプションを利用することができます。

  • maxFilesPerTrigger: マイクロバッチごとにどのくらいの新規ファイルが想定されるか。デフォルトは1000です。
  • maxBytesPerTrigger: マイクロバッチごとにどのくらいのデータを処理するのか。このオプションは「ソフトマックス」を設定し、最小の入力ユニットがこの制限を超えた場合に、ストリーミングクエリーが前進できるようにするために、バッチはこの値の近似値の量のデータを処理し、この制限以上のデータを処理できるようにしていることを意味します。ストリーミングTrigger.Onceを使用する際、このオプションは無視されます。デフォルトでこれは設定されません。

maxBytesPerTriggermaxFilesPerTriggerを組み合わせて使用する場合、マイクロバッチはmaxFilesPerTriggermaxBytesPerTriggerのどちらかが制限に達するまで処理を行います。

注意
logRetentionDuration設定によって、ソーステーブルのトランザクションがクリーンアップされ、ストリームの処理に遅延が起きている場合、Delta Lakeはソーステーブルで利用できる最新のトランザクション履歴に対応するデータを処理しますが、ストリームの処理を失敗させません。これによって、データの削除が起こる場合があります。

アップデートとデリートの無視

構造化ストリーミングは追記(append)ではない入力を取り扱わず、ソースとして使われているテーブルに変更が生じると例外をスローします。自動で後段に伝播されない変更を取り扱う際には主要な戦略が2つ存在します。

  • アウトプットとチェックポイントを削除し、最初からストリームを再起動することができます。
  • 以下のいずれかのオプションを設定することができます。
    • ignoreDeletes: パーティション境界でデータを削除するトランザクションを無視します。
    • ignoreChanges: (パーティション内の)UPDATEMERGE INTODELETE、そしてOVERWRITEのようなデータ変更オペレーションにより、ファイルがソーステーブルに再度書き込みをしなくてはならない場合、アップデートを再処理します。変更がない行も放出される場合があり、後段のデータ消費者においては重複を取り扱わなくてはならないケースがあります。削除処理は後段に伝播しません。ignoreChangesignoreDeletesを含みます。このため、ignoreChangesを使用している場合、お使いのストリームは、ソーステーブルに対する削除や更新に邪魔されることはありません。

サンプル

例えば、dateでパーティショニングされており、dateuser_emailactionカラムを持つテーブルuser_eventsがあるとします。お使いのストリームはuser_eventsテーブルから流れ出しており、GDPR対応のためにデータを削除しなくてはならないとします。

パーティション境界で削除を行う際(パーティションカラムに対するWHEREが存在する)、ファイルは既に値でセグメント分けされているので、DELETEは単純にメタデータからこれらのファイルを削除します。このため、いくつかのパーティションからデータを削除したい場合には、以下を使用することができます。

Scala
spark.readStream.format("delta")
  .option("ignoreDeletes", "true")
  .load("/tmp/delta/user_events")

しかし、user_emailに基づいてデータを削除したい場合には、以下を使用する必要があります。

Scala
spark.readStream.format("delta")
  .option("ignoreChanges", "true")
  .load("/tmp/delta/user_events")

UPDATEを用いてuser_emailをアップデートした場合、対象となるuser_emailを含むファイルは再書き込みされます。ignoreChangesを使うと、新規レコードは同じファイルの存在する変更のない全てのレコードとともに後段に伝播します。お使いのロジックにおいては、これらの重複レコードを取り扱えるようになっている必要があります。

初期ポジションの指定

注意
この機能はDatabricksランタイム7.3LTS以降で利用できます。

テーブル全体を処理することなしに、Delta Lakeストリーミングソースの開始地点を指定するために以下のオプションを使用することができます。

  • startingVersion: スタートするDelta Lakeのバージョン。このバージョン(このバージョンを含みます)からスタートする全てのテーブルの変更は、ストリーミングソースによって読み込まれます。DESCRIBE HISTORYコマンドのversionカラムからコミットバージョンを取得することができます。

    Databricksランタイム7.4以降で最新の変更のみを取得するにはlatestを指定します。

  • startingTimestamp: スタートするタイムスタンプ。このタイムスタンプ以降(このタイムスタンプを含みます)にコミットされた全てのテーブル変更コミットはストリーミングソースによって読み込まれます。以下のいずれかを指定します。

    • タイムスタンプの文字列。例えば、"2019-01-01T00:00:00.000Z"
    • dateの文字列。例えば、"2019-01-01"

同時に両方のオプションを設定することはできません。いずれか一方のみを指定できます。新規にストリーミングクエリーを起動した場合にのみ効果を発揮します。ストリーミングクエリーが起動しており、進捗がチェックポイントに記録されている場合は、このオプションは無視されます。

重要!
指定したバージョン、タイムスタンプからストリーミングソースを起動することはできますが、ストリーミングそーうのスキーマは常にDeltaテーブルの最新のスキーマとなります。指定したバージョンあるいはタイムスタンプ以降にDeltaテーブルに互換性のないスキーマ変更がないことを確認してください。さもないと、ストリーミングソースが不正なスキーマを用いてデータを読み込んだ際に不正な結果を返す場合があります。

サンプル

例えば、テーブルuser_eventsがあるとします。バージョン5以降の変更を読み込みたい場合には、以下を使用します。

Scala
spark.readStream.format("delta")
  .option("startingVersion", "5")
  .load("/tmp/delta/user_events")

2018-10-18以降の変更を読み込みたい場合には、以下を使用します。

Scala
spark.readStream.format("delta")
  .option("startingTimestamp", "2018-10-18")
  .load("/tmp/delta/user_events")

シンクとしてのDeltaテーブル

構造化ストリーミングを用いてデータをDeltaテーブルに書き込むこともできます。トランザクションログを用いることで、テーブルに対して別のストリームやバッチクエリーが同時に動作していたとしても、Delta Lakeはexactly-onceの処理を保証します。

注意
Delta LakeのVACUUM機能は、Delta Lakeによって管理されな全てのファイルを削除しますが、_で始まるディレクトリは全てスキップします。<table_name>/_checkpointsのようなディレクトリ構造を用いることで他のデータ、メタデータとともに安全にチェックポイントを格納することができます。

メトリクス

注意
Databricksランタイム8.1以降で利用できます。

numBytesOutstandingnumFilesOutstandingとしてストリーミングクエリー処理で処理されるバイト数とファイル数を確認することができます。ノートブックでストリームを実行している場合、ストリーミングクエリー進捗ダッシュボードRaw Dataタブでこれらのメトリクスを確認することができます。

JSON
{
  "sources" : [
    {
      "description" : "DeltaSource[file:/path/to/source]",
      "metrics" : {
        "numBytesOutstanding" : "3456",
        "numFilesOutstanding" : "8"
      },
    }
  ]
}

Appendモード

デフォルトでは、ストリームはappendモードで動作し、新規レコードをテーブルに追加します。

pathの方法を使用することができます。

Python
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/_checkpoints/")
  .start("/delta/events")
Scala
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .start("/tmp/delta/events")

import io.delta.implicits._
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .delta("/tmp/delta/events")

あるいは、Spark 3.1以降(Databricksランタイム8.3以降)では以下のようにtoTableメソッドを使い、Spark 3.1以前(Databricksランタイム8.2以前)ではtableメソッドを使います。

Python
events.writeStream
  .format("delta")
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")
Scala
events.writeStream
  .outputMode("append")
  .option("checkpointLocation", "/tmp/delta/events/_checkpoints/")
  .toTable("events")

Completeモード

バッチごとにテーブル全体を置き換えるために構造化ストリーミングを使用することもできます。ユースケールの例としては、集計処理を用いたサマリーの計算があります。

Python
(spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")
)
Scala
spark.readStream
  .format("delta")
  .load("/tmp/delta/events")
  .groupBy("customerId")
  .count()
  .writeStream
  .format("delta")
  .outputMode("complete")
  .option("checkpointLocation", "/tmp/delta/eventsByCustomer/_checkpoints/")
  .start("/tmp/delta/eventsByCustomer")

上のサンプルでは、顧客によるイベントの数の集計値を含むテーブルを継続的に更新します。

より緩いレーテンシー要件を持つアプリケーションにおいては、一度のみのトリガーを用いることで計算リソースを節約することができます。特定のスケジュールでサマリーの集計テーブルを更新するためにこれらを使用し、最後のアップデート以降に到着した新規データのみを処理します。

冪等数のあるマルチテーブルの書き込み

注意
Databricksランタイム8.4以降で利用できます。

このセクションでは、foreachBatchコマンドを用いた複数テーブルへの書き込み方を説明します。foreachBatchを用いることで、ストリーミングクエリーにおけるマイクロバッチの出力が複数のターゲットに対して書き込まれます。しかし、foreachBatchは書き込み試行はバッチが再実行されたかどうかに関する情報を持っていないので、書き込みは冪等性があるものではありません。例えば、失敗したバッチの再実行によって、重複したデータの書き込みにつながる場合があります。

これに対応するために、書き込みを冪等性あるものにするために、Deltaテーブルは以下のDataFrameWriterオプションをサポートしています。

  • txnAppId: データフレームの書き込みごとに引き渡すユニークな文字列。例えば、txnAppIdとしてストリーミングクエリーのIDを使用することができます。
  • txnVersion: トランザクションのバージョンとして動作する短調増加する数字。

重複する書き込みを特定し、無視するためにtxnAppIdtxnVersionの組み合わせを使用することができます。

障害によってバッチ書き込みが阻害されると、バッチの再実行では同じアプリケーションID、バッチIDを使用し、ランタイムが適切に重複した書き込みを特定しそれらを無視する役に立ちます。アプリケーションID(txnAppId)には、ユーザーが生成したユニークな文字列を指定することができ、必ずしもストリームIDである必要はありません。

警告!
ストリーミングのチェックポイントを削除し、クエリーを再起動した場合、ことなるappIdを指定しなくてはなりません。さもないと、同じtxnAppIdと0からスタートするバッチIDを含むので、再起動したクエリーからの書き込みは無視されます。

サンプル

Pytohn
app_id = ... # A unique string that is used as an application ID.

def writeToDeltaLakeTableIdempotent(batch_df, batch_id):
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 1
  batch_df.write.format(...).option("txnVersion", batch_id).option("txnAppId", app_id).save(...) # location 2
Scala
val appId = ... // A unique string that is used as an application ID.
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 1
  batchDF.write.format(...).option("txnVersion", batchId).option("txnAppId", appId).save(...)  // location 2
}

ストリーム-スタティックjoinの実行

ストリーム-スタティックjoinを実行するために、Detla Lakeのバージョン管理のプロトコルとトランザクション保証を活用することができます。ストリーム-スタティックjoinはDeltaテーブルの最新の適切なバージョンとステートレスjoinを用いてデータストリームをjoinします。

Databricksでストリーム-スタティックjoinでマイクロバッチのデータを処理する際、スタティックなDeltaテーブルの最新の適正なバージョンと、現在のマイクロバッチに存在するレコードをjoinします。joinはステートレスなので、ウォーターマークを設定する必要はなく、低レーテンシーで処理をこなうことができます。joinで使用するスタティックなDeltaテーブルのデータは変化の速度が遅い(slowly-changing)ものである必要があります。

Python
streamingDF = spark.readStream.table("orders")
staticDF = spark.read.table("customers")

query = (streamingDF
  .join(staticDF, streamingDF.customer_id==staticDF.id, "inner")
  .writeStream
  .option("checkpointLocation", checkpoint_path)
  .table("orders_with_customer_info")
)

Databricks 無料トライアル

Databricks 無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?