Streaming data processing | Databricks on AWS [2022/7/12時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
多くのアプリケーションでは、継続的に到着するデータに基づいてテーブルを更新する必要があります。しかし、データサイズが増加すると、アップデートごとにデータを再処理するのに必要なリソースは膨大なものとなります。そこで、継続的に到着するデータをインクリメンタルに処理するためにストリーミングテーブルあるいはストリーミングビューを定義することができます。ストリーミングテーブル、ビューは新規データを取り込むコストと新規データを利用できるようになるまでのレーテンシーを削減します。
パイプラインのアップデートが起動されると、ストリーミングテーブル、ストリーミングビューは最後のアップデート以降に到着した新規データのみを処理します。Delta Live Tablesランタイムによって、既に処理済みのデータは自動で追跡されます。
外部データソースからのストリーミングデータ取り込み
ストリーミングデータを取り込むには、ストリーミングソースからストリーミングライブテーブルを定義する必要があります。例えば、以下のコードを用いて外部のデータをストリームとして読み込むことができます。
inputPath = "/databricks-datasets/structured-streaming/events/"
@dlt.table
def streaming_bronze_table():
return (
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load(inputPath)
)
CREATE OR REFRESH STREAMING LIVE TABLE streaming_bronze_table
AS SELECT * FROM cloud_files("/databricks-datasets/structured-streaming/events/", "json")
パイプライン内の別のデータセットからのストリーミング読み込み
また、同じパイプラインの別のテーブルからデータをストリーミングすることができます。
@dlt.table
def streaming_silver_table:
return dlt.read_stream("streaming_bronze_table").where(...)
CREATE OR REFRESH STREAMING LIVE TABLE streaming_silver_table
AS SELECT
*
FROM
STREAM(LIVE.streaming_bronze_table)
WHERE ...
単一のパイプラインでストリーミングデータとバッチデータを処理する
ストリーミングライブテーブルではApache Sparkの構造化ストリーミングを使用しているので、ストリーミングライブテーブルは、ソーステーブルに新規の行が追加される追加クエリーのみを処理することができます。ソーステーブルからのアップデートの処理、例えば、mergeやdeleteはサポートされていません。updateを処理するには、APPLY CHANGES INTOコマンドを使用します。
一般的なストリーミングのパターンには、パイプラインの初期データセットを作成するためにソースデータを取り込むというものがあります。これらの初期データセットは一般的にはブロンズテーブルと呼ばれ、多くの場合シンプルな変換処理が行われます。このようなシンプルな変換処理であっても、JSONのような非効率的なフォーマットの再処理は膨大な時間を要するようになり、このようなケースでストリーミングライブテーブルが役立ちます。
一方、パイプラインの最終テーブルは一般的にゴールドテーブルと呼ばれ、多くの場合、複雑な集計処理やAPPLY CHANGES INTO
コマンドのターゲットであるソースからの読み込みを必要とします。これらのオペレーションは本来、追加ではなく更新を行うので、これらはストリーミングライブテーブルの入力としてサポートされていません。これらの変換処理はライブテーブルとしてのマテリアライゼーションの方が適しています。
一つのパイプラインにストリーミングライブテーブルとライブテーブルを混在させることで、パイプラインをシンプルにすることができ、生データの再取り込み、再処理のコストを回避し、効率的にエンコードされ、フィルタリングされたデータセットに対して複雑な集計処理を行うために、SQLのフルパワーを活用することができます。以下のサンプルでは、このようなタイプの混成処理を示しています。
@dlt.table
def streaming_bronze():
return (
# Since this is a streaming source, this table is incremental.
spark.readStream.format("cloudFiles")
.option("cloudFiles.format", "json")
.load("s3://path/to/raw/data")
)
@dlt.table
def streaming_silver():
# Since we read the bronze table as a stream, this silver table is also
# updated incrementally.
return dlt.read_stream("streaming_bronze").where(...)
@dlt.table
def live_gold():
# This table will be recomputed completely by reading the whole silver table
# when it is updated.
return dlt.read("streaming_silver").groupBy("user_id").count()
CREATE OR REFRESH STREAMING LIVE TABLE streaming_bronze
AS SELECT * FROM cloud_files(
"s3://path/to/raw/data", "json"
)
CREATE OR REFRESH STREAMING LIVE TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...
CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id
インクリメンタル処理のためにS3からJSONファイルを効率的に読み込むために、どのようにAuto Loaderを使うのかに関してはこちらを参照ください。
ストリーミングjoin
Delta Live Tablesでは、テーブルを更新するために様々なjoin戦略をサポートしています。
ストリーム・バッチjoin
ストリーム・バッチjoinは、追加のみのデータの継続的ストリーミングを主要な静的ディメンジョンテーブルで非正規化する際には良い選択肢となります。派生データセットがアップデートされるたびに、ストリームからの新規レコードはバッチテーブルのアップデートが開始した際の静的スナップショットとjoinされます。静的テーブルに追加、更新されたレコードは、テーブルのフルリフレッシュが行われるまでは反映されません。
以下の例ではストリーム・バッチjoinを行なっています。
@dlt.table
def customer_sales():
return dlt.read_stream("sales").join(read("customers"), ["customer_id"], "left")
CREATE OR REFRESH STREAMING LIVE TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
INNER JOIN LEFT LIVE.customers USING (customer_id)
連続パイプラインでは、joinのバッチ側はマイクロバッチごとに定期的にポーリングされ、アップデートされます。
ストリーミングの集計処理
count、min、max、sumのようなシンプルな配分的集計や、平均値や標準偏差のような代数集計は、ストリーミングライブテーブルでインクリメンタルに計算することができます。GROUP BY country
句を伴うクエリーのように、限定数のグループを伴うクエリーに対するインクリメンタル集計を行うことをお勧めします。それぞれのアップデートで新規の入力データのみが読み込まれます。