LoginSignup
0
0

More than 1 year has passed since last update.

Delta Live Tablesにおけるストリーミングデータ処理

Posted at

Streaming data processing | Databricks on AWS [2022/7/12時点]の翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

多くのアプリケーションでは、継続的に到着するデータに基づいてテーブルを更新する必要があります。しかし、データサイズが増加すると、アップデートごとにデータを再処理するのに必要なリソースは膨大なものとなります。そこで、継続的に到着するデータをインクリメンタルに処理するためにストリーミングテーブルあるいはストリーミングビューを定義することができます。ストリーミングテーブル、ビューは新規データを取り込むコストと新規データを利用できるようになるまでのレーテンシーを削減します。

パイプラインのアップデートが起動されると、ストリーミングテーブル、ストリーミングビューは最後のアップデート以降に到着した新規データのみを処理します。Delta Live Tablesランタイムによって、既に処理済みのデータは自動で追跡されます。

外部データソースからのストリーミングデータ取り込み

ストリーミングデータを取り込むには、ストリーミングソースからストリーミングライブテーブルを定義する必要があります。例えば、以下のコードを用いて外部のデータをストリームとして読み込むことができます。

Python
inputPath = "/databricks-datasets/structured-streaming/events/"

@dlt.table
def streaming_bronze_table():
  return (
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load(inputPath)
  )
SQL
CREATE OR REFRESH STREAMING LIVE TABLE streaming_bronze_table
AS SELECT * FROM cloud_files("/databricks-datasets/structured-streaming/events/", "json")

パイプライン内の別のデータセットからのストリーミング読み込み

また、同じパイプラインの別のテーブルからデータをストリーミングすることができます。

Python
@dlt.table
def streaming_silver_table:
  return dlt.read_stream("streaming_bronze_table").where(...)
SQL
CREATE OR REFRESH STREAMING LIVE TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.streaming_bronze_table)
WHERE ...

単一のパイプラインでストリーミングデータとバッチデータを処理する

ストリーミングライブテーブルではApache Sparkの構造化ストリーミングを使用しているので、ストリーミングライブテーブルは、ソーステーブルに新規の行が追加される追加クエリーのみを処理することができます。ソーステーブルからのアップデートの処理、例えば、mergeやdeleteはサポートされていません。updateを処理するには、APPLY CHANGES INTOコマンドを使用します。

一般的なストリーミングのパターンには、パイプラインの初期データセットを作成するためにソースデータを取り込むというものがあります。これらの初期データセットは一般的にはブロンズテーブルと呼ばれ、多くの場合シンプルな変換処理が行われます。このようなシンプルな変換処理であっても、JSONのような非効率的なフォーマットの再処理は膨大な時間を要するようになり、このようなケースでストリーミングライブテーブルが役立ちます。

一方、パイプラインの最終テーブルは一般的にゴールドテーブルと呼ばれ、多くの場合、複雑な集計処理やAPPLY CHANGES INTOコマンドのターゲットであるソースからの読み込みを必要とします。これらのオペレーションは本来、追加ではなく更新を行うので、これらはストリーミングライブテーブルの入力としてサポートされていません。これらの変換処理はライブテーブルとしてのマテリアライゼーションの方が適しています。

一つのパイプラインにストリーミングライブテーブルとライブテーブルを混在させることで、パイプラインをシンプルにすることができ、生データの再取り込み、再処理のコストを回避し、効率的にエンコードされ、フィルタリングされたデータセットに対して複雑な集計処理を行うために、SQLのフルパワーを活用することができます。以下のサンプルでは、このようなタイプの混成処理を示しています。

Python
@dlt.table
def streaming_bronze():
  return (
    # Since this is a streaming source, this table is incremental.
    spark.readStream.format("cloudFiles")
      .option("cloudFiles.format", "json")
      .load("s3://path/to/raw/data")
  )

@dlt.table
def streaming_silver():
  # Since we read the bronze table as a stream, this silver table is also
  # updated incrementally.
  return dlt.read_stream("streaming_bronze").where(...)

@dlt.table
def live_gold():
  # This table will be recomputed completely by reading the whole silver table
  # when it is updated.
  return dlt.read("streaming_silver").groupBy("user_id").count()
SQL
CREATE OR REFRESH STREAMING LIVE TABLE streaming_bronze
AS SELECT * FROM cloud_files(
  "s3://path/to/raw/data", "json"
)

CREATE OR REFRESH STREAMING LIVE TABLE streaming_silver
AS SELECT * FROM STREAM(LIVE.streaming_bronze) WHERE...

CREATE OR REFRESH LIVE TABLE live_gold
AS SELECT count(*) FROM LIVE.streaming_silver GROUP BY user_id

インクリメンタル処理のためにS3からJSONファイルを効率的に読み込むために、どのようにAuto Loaderを使うのかに関してはこちらを参照ください。

ストリーミングjoin

Delta Live Tablesでは、テーブルを更新するために様々なjoin戦略をサポートしています。

ストリーム・バッチjoin

ストリーム・バッチjoinは、追加のみのデータの継続的ストリーミングを主要な静的ディメンジョンテーブルで非正規化する際には良い選択肢となります。派生データセットがアップデートされるたびに、ストリームからの新規レコードはバッチテーブルのアップデートが開始した際の静的スナップショットとjoinされます。静的テーブルに追加、更新されたレコードは、テーブルのフルリフレッシュが行われるまでは反映されません。

以下の例ではストリーム・バッチjoinを行なっています。

Python
@dlt.table
def customer_sales():
  return dlt.read_stream("sales").join(read("customers"), ["customer_id"], "left")
SQL
CREATE OR REFRESH STREAMING LIVE TABLE customer_sales
AS SELECT * FROM STREAM(LIVE.sales)
  INNER JOIN LEFT LIVE.customers USING (customer_id)

連続パイプラインでは、joinのバッチ側はマイクロバッチごとに定期的にポーリングされ、アップデートされます。

ストリーミングの集計処理

count、min、max、sumのようなシンプルな配分的集計や、平均値や標準偏差のような代数集計は、ストリーミングライブテーブルでインクリメンタルに計算することができます。GROUP BY country句を伴うクエリーのように、限定数のグループを伴うクエリーに対するインクリメンタル集計を行うことをお勧めします。それぞれのアップデートで新規の入力データのみが読み込まれます。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0