Streaming data processing | Databricks on AWS [2022/4/5時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
多くのアプリケーションでは、継続的に到着するデータに基づいて更新されるテーブルを必要とします。しかし、データサイズが大きくなると、それぞれの更新でデータを処理するのに必要となるリソースは法外なものとなり得ます。継続的に到着するデータをインクリメンタルに計算するためのストリーミングテーブルあるいはビューを定義することができます。ストリーミングテーブルとビューは、新規データ取り込みのコストと新規データが利用可能になるまでのレーテンシーを削減します。
パイプラインに対するアップデートがトリガーされると、ストリーミングテーブル、ストリーミングビューは、最後のアップデート以降に到着した新規データのみを処理をします。既に処理されたデータはDelta Live Tablesのランタイムによって自動で追跡されます。
外部データソースからのストリーミング取り込み
ストリーミングデータを取り込むには、ストリーミングソースからストリーミングライブテーブルを定義する必要があります。例えば、以下のコードで外部データをストリームとして読み込むことができます。
inputPath = "/databricks-datasets/structured-streaming/events/"
@dlt.table
def streaming_bronze_table():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(inputPath)
)
CREATE STREAMING LIVE TABLE streaming_bronze_table
AS SELECT * FROM cloud_files("/databricks-datasets/structured-streaming/events/", "json")
パイプラインにおける他のデータセットからのストリーミング
同じパイプラインの他のテーブルからデータをストリーミングすることもできます。
inputPath = "/databricks-datasets/structured-streaming/events/"
@dlt.table
def streaming_bronze_table():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(inputPath)
)
CREATE STREAMING LIVE TABLE streaming_bronze_table
AS SELECT * FROM cloud_files("/databricks-datasets/structured-streaming/events/", "json")
単一パイプラインにおけるストリーミングとバッチデータの処理
ストリーミングライブテーブルは構造化ストリーミングを使用するので、ストリーミングライブテーブルは追加クエリー、すなわち、ソーステーブルに追加されるクエリーのみを処理することができます。ソーステーブルからの更新処理、例えば、マージや削除はサポートされません。更新を処理するためには、APPLY CHANGES INTOコマンドを参照ください。
一般的なストリーミングパターンには、パイプラインの初期データセットを作成するためのソースデータの取り込みが含まれます。これらの初期データセットは一般的にはブロンズテーブルと呼ばれ、多くの場合シンプルな変換処理が行われます。JSONのような非効率的なフォーマットの再処理はシンプルな変換処理であってもコストが法外なものとなり、まさにここでストリーミングライブテーブルが最適な選択肢となります。
逆に、ゴールドテーブルと呼ばれるパイプラインの最終テーブルは多くの場合、複雑な集計処理やAPPLY CHANGES INTO
オペレーションのターゲットであるソースからの読み込みの結果となります。これらのオペレーションは性質上、追加よりも更新処理となり、これらはストリーミングライブテーブルの入力としてはサポートされません。これらの変換はライブテーブルとしてのマテリアライゼーションの方が適しています。
ストリーミングライブテーブルとライブテーブルを一つのパイプラインに組み込むことで、パイプラインをシンプルにし、コストの高い生データの再取り込みや再処理を避けるとことができ、効率的にエンコードされフィルターされたデータセットに対して複雑な集計処理を行うためにSQLのフルパワーを活用することができます。以下の例では、この種の混成処理の例を示しています。
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return dlt.read_stream("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return dlt.read("streaming_silver").groupBy("user_id").count()
CREATE STREAMING LIVE TABLE streaming_bronze
AS SELECT * FROM cloud_files(
"s3://path/to/raw/data", "json"
)
CREATE STREAMING LIVE TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
インクリメンタルな処理でS3から効率的にJSONファイルを読み込むためのAuto Loaderの活用方法に関してはリンク先を参照ください。
ストリーミングのjoin
Delta Live Tablesでは、テーブルを更新するために様々なjoin戦略をサポートしています。
ストリーム・バッチjoin
ストリーム・バッチjoinは、プライマリーの静的次元テーブルによる追加のみの継続的ストリームの非正規化を行う際には良い選択肢となります。派生データセットが更新されるたびに、ストリームからの新規レコードは、アップデートがスタートする際バッチテーブルの静的なスナップショットとjoinされます。静的テーブルの追加、更新レコードはフルリフレッシュが実行されるまでは反映されません。
以下にストリーム・バッチjoinの例を示します。
@dlt.table
def customer_sales():
return dlt.read_stream("sales").join(read("customers"), ["customer_id"], "left")
CREATE STREAMING LIVE TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
ストリーミングの集計
count、 min、max、sumのようなシンプルな分散可能な集計や、平均や標準偏差のような代数集計も、ストリーミングライブテーブルでインクリメンタルに計算することができます。限定的なグループ、例えばGROUP BY country
句を伴うクエリーを用いてインクリメンタルな集計を行うことをお勧めします。アップデートごとに新規の入力データのみが読み込まれます。