Write to arbitrary data sinks | Databricks on AWS [2021/6/6時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
既存のストリーミングシンクを持たないデータソースにストリーミングクエリーの出力を書き込む方法として、構造化ストリーミングAPIは二つの手段foreachBatch()
とforeach()
を提供しています。
foreachBatch()
による既存のバッチデータソースの再利用
streamingDF.writeStream.foreachBatch(...)
を用いることで、ストリーミングクエリーの個々のマイクロバッチの出力データに対して実行される関数を指定することができます。二つのパラメーターを受け取ります。マイクロバッチの出力データを保持するデータフレームかデータセットと、マイクロバッチのユニークIDです。foreachBatch
を用いることで、以下のことが可能となります。
既存バッチデータソースの再利用
多くのストレージシステムにおいては、ストリーミングシンクが利用できない場合がありますが、バッチクエリーに対するデータライターは既に存在する場合があります。foreachBatch()
を用いることで、マイクロバッチごとの出力に対してバッチのデータライターを使用することができます。以下にいくつかの例を示します。
この他の多くのバッチデータソースをforeachBatch()
から使用することができます。
複数の場所への書き込み
複数の場所にストリーミングクエリーの出力を書き込みたい場合には、シンプルに複数回データフレーム/データセットの出力を書き込むことができます。しかし、書き込みのそれぞれの試行は、(入力データの再読み込みの可能性を含み)出力データの再計算が必要になる場合があります。再計算を避けるためには、出力のデータフレーム/データセットをキャッシュし、それを複数の場所に書き込み、キャッシュを解除します。概要を以下に示します。
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
batchDF.persist()
batchDF.write.format(...).save(...) // location 1
batchDF.write.format(...).save(...) // location 2
batchDF.unpersist()
}
注意
batchDF
に複数のSparkジョブを実行する際、(StreamingQueryProgress
やノートブックのレートグラフで参照できる)ストリーミングクエリーの入力データレートが、ソースでデータが生成される実際のレートの倍数としてレポートされる場合があります。これは、バッチごとに複数のSparkジョブで複数回入力データが読み込まれることがあるためです。
追加のデータフレームオペレーションの適用
Sparkでは多くの場合でインクリメンタルなプランの生成をサポートしていないため、ストリーミングデータフレームでは、多くのデータフレーム、データセットオペレーションがサポートされていません。foreachBatch()
を用いることで、それぞれのマイクロバッチの出力に対して、これらのオペレーションのいくつかを適用することができます。例えば、ストリーミング集計処理の出力をアップデートモードでDeltaテーブルに書き込むために、foreachBath()
とMERGE INTO
オペレーションを使用することができます。詳細はMERGE INTOをご覧ください。
重要!
-
foreachBatch()
は最低限1回(at-least-once)の書き込み保証のみを提供します。しかし、出力の重複排除を行い、確実に一回処理(exactly-once)の保証を行うために、関数にbatchId
を渡すことができます。いずれの場合、自身でエンドツーエンドのセマンティクスに対する理由づけを行う必要があります。 -
foreachBatch()
はストリーミングクエリーのマイクロバッチ実行に基本的に依存しているので、連続処理モードでは動作しません。連続モードでデータを書き込む際には、代わりにforeach()
を使ってください。
foreach()
を用いた任意の場所への書き込み
foreachBatch()
を使用できない場合(例えば、Databricksランタイム4.2以下を使用している、対応しているバッチデータライターが無いなど)、foreach()
を用いてカスタムライターロジックを表現することができます。特に、データ書き込みロジックを3つのメソッドopen()
、process()
、close()
に分割して表現することができます。
例えば、Write to Amazon DynamoDB using foreach() in Scala and Pythonをご覧ください。
ScalaあるいはJavaの使用
ScalaあるいはJavaでは、ForeachWriterクラスを拡張します。
datasetOfString.writeStream.foreach(
new ForeachWriter[String] {
def open(partitionId: Long, version: Long): Boolean = {
// Open connection
}
def process(record: String) = {
// Write string to connection
}
def close(errorOrNull: Throwable): Unit = {
// Close the connection
}
}
).start()
Pythonの使用
Pythonでは、2つの方法でforeach
を呼び出します。関数内からかオブジェクト内からです。関数は処理ロジックを表現するシンプルな方法を提供しますが、障害によって入力データの再処理が必要となった際に、生成されたデータの重複排除を行うことができません。このような場合には、オブジェクト内で処理ロジックを指定しなくてはなりません。
-
行として入力を受け取る関数
Pythondef processRow(row): // Write row to storage query = streamingDF.writeStream.foreach(processRow).start()
-
process
メソッドとオプションでopen
とclose
メソッドを持つオブジェクトPythonclass ForeachWriter: def open(self, partition_id, epoch_id): // Open connection. This method is optional in Python. def process(self, row): // Write row to connection. This method is not optional in Python. def close(self, error): // Close the connection. This method is optional in Python. query = streamingDF.writeStream.foreach(ForeachWriter()).start()
実行セマンティクス
ストリーミングクエリーがスタートすると、Sparkは以下の方法で関数あるいはオブジェクトのメソッドを呼び出します。
- このオブジェクトの単一のコピーが、クエリーの単一のタスクで生成されるすべてのデータに責任を持ちます。言い換えると、分散処理で生成されたデータの一つのパーティションの処理に一つのインスタンスが責任を持ちます。
- それぞれのタスクは、提供されたオブジェクトの最新のシリアライズされた、あるいは、デシリアライズされたコピーを取得するので、オブジェクトはシリアライズ可能でなくてはなりません。このため、データ書き込みにおけるいかなる初期化処理は、他タスクがデータ生成をできることを意味する
open()
メソッドの呼び出しの後に行うことを強くお勧めします。 - メソッドのライフサイクルは以下のようになります。
-
partition_id
を持つそれぞれのパーティションに対して: -
epoch_id
を持つそれぞれのストリーミングデータのバッチ/エポックに対して: -
open(partitionId, epochId)
がコールされます。 -
open(...)
がtrueを返すと、パーティションとバッチ/エポックのそれぞれの行に対してprocess(row)
がコールされます。 - 行を処理している間にエラーが発生すると
close(error)
メソッドがコールされます
-
-
open()
メソッドが存在し、(戻り値に関係なく)戻り値が返却されると、JVMやPythonプロセスが途中でクラッシュしない限り、(存在する場合には)close()
メソッドがコールされます。
注意
open()
メソッドのpartitionId
とepochId
は、入力データの再処理を必要とする障害があった際に重複排除を行う際に使用することができます。これは、クエリーの実行モードに依存します。マイクロバッチモードでストリーミングクエリーが実行されている場合、ユニークなタプル(partition_id
, epoch_id
)で表現されるすべてのパーティションは同じデータを持つことが保証されます。 このため、(partition_id
, epoch_id
)は重複排除や、トランザクション的にデータをコミットしたり、exactly-once保証を実現するために活用することができます。しかし、ストリーミングクエリーが連続モードで実行されている場合、保証はできないので重複排除に使用すべきではありません。