0
0

More than 1 year has passed since last update.

Delta Live Tablesを用いたインクリメンタルなデータの取り込み

Last updated at Posted at 2023-07-13

こちらでは、ダミーデータを使ってDelta Live Tables(DLT)のインクリメンタルなデータの取り込みをデモします。

Delta Live Tablesとは

データパイプラインの変換処理を全て記述することなしに、パイプラインの各ステップ(テーブル)を宣言することで、Delta Live Tablesはテーブル間の処理を補完するので、シンプルなコードで複雑なパイプラインを構築することができます。SQLとPythonをサポートしているので、得意な言語でデータパイプラインを構築できます。また、イベントログ、エラー時のリトライ、自動テスト、オートスケーリングなどがサポートされています。処理の背後ではSparkの構造化ストリーミングやDelta Lakeを活用しています。

前提条件

注意事項

Delta Live Tablesのテーブルの保存先をUnity Catalogにする際、いくつかの制限事項が存在します。

  • Unity Catalog配下に作成されたDelta Live Tablesのテーブルは、共有モードのUnity Catalog対応クラスター、あるいはSQLウェアハウスからのみクエリーすることができます。ユーザー割り当てモード、分離なしモードの共有クラスターからクエリーすることはできません。
  • 共有モードのクラスターでは機械学習ランタイムはサポートされていません。

データの準備

# データ保存先パス
data_path = "/tmp/takaaki.yayoi@databricks.com/dlt/landing"
# ランディングゾーンを初期化
dbutils.fs.rm(data_path, True)

データを読み込みます。このデータのカラム名には空白が含まれています。

df = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=";", header=True)
display(df)

Screenshot 2023-07-13 at 16.57.25.png

データを保存します。

save_path = f"{data_path}/2023/07/06/"
df.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)
display(dbutils.fs.ls(f"{data_path}/2023/07"))

Screenshot 2023-07-13 at 16.58.17.png

DLTパイプラインの定義

SQLノートブックを作成して以下の内容を記述します。TBLPROPERTIESを指定してカラムマッピングを有効化し、処理時刻のカラムを追加しています。

dlt_incremental_load_sql
CREATE
OR REFRESH STREAMING TABLE bronze TBLPROPERTIES (
  'delta.minReaderVersion' = '2',
  'delta.minWriterVersion' = '5',
  'delta.columnMapping.mode' = 'name'
) COMMENT "Parquetをそのまま保持するブロンズテーブル" AS
SELECT
  *,
  current_timestamp() as processed -- 処理時刻
FROM
  cloud_files(
    "/tmp/takaaki.yayoi@databricks.com/dlt/landing/",
    "parquet"
  )

パイプラインの実行

  1. サイドメニューのDelta Live Tablesをクリックします。
    Screenshot 2023-07-13 at 16.59.57.png
  2. パイプラインを作成ボタンをクリックします。
  3. パイプライン名を入力します、ソースコードで、上で作成したノートブックを選択します。
  4. 配信先でUnity Catalogを選択し、カタログを選択、ターゲットスキーマでスキーマ(データベース)を選択します。ここにテーブルが作成されます。
    Screenshot 2023-07-13 at 15.55.23.png
  5. 作成をクリックします。
  6. 開始をクリックします。
  7. 処理が完了すると、テーブルがグリーンになります。
    Screenshot 2023-07-13 at 17.04.24.png

テーブルの確認

  1. サイドメニューからSQLエディタにアクセスし、稼働中のSQLウェアハウスを選択します。
  2. 以下のクエリーを実行します。
    SELECT * FROM takaakiyayoi_catalog.dlt.bronze ORDER BY processed DESC
    
  3. 結果を確認することができます。
    Screenshot 2023-07-13 at 17.07.05.png
  4. 件数も確認します。
    Screenshot 2023-07-13 at 17.07.53.png

なお、再度パイプラインを実行しても、元のデータに増分がないので処理は行われません。
Screenshot 2023-07-13 at 17.08.59.png

データの追加

同じデータをパスを変えて保存します。

save_path = f"{data_path}/2023/07/07/"
df.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)
display(dbutils.fs.ls(f"{data_path}/2023/07"))

Screenshot 2023-07-13 at 17.10.19.png

再度パイプラインの実行

Delta Live Tablesのパイプラインに移動し、開始をクリックします。今回は増分のデータが処理されます。

Screenshot 2023-07-13 at 17.12.20.png

件数も2倍になっています。
Screenshot 2023-07-13 at 17.12.40.png

Databricksクイックスタートガイド

Databricksクイックスタートガイド

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0