こちらでは、ダミーデータを使ってDelta Live Tables(DLT)のインクリメンタルなデータの取り込みをデモします。
Delta Live Tablesとは
データパイプラインの変換処理を全て記述することなしに、パイプラインの各ステップ(テーブル)を宣言することで、Delta Live Tablesはテーブル間の処理を補完するので、シンプルなコードで複雑なパイプラインを構築することができます。SQLとPythonをサポートしているので、得意な言語でデータパイプラインを構築できます。また、イベントログ、エラー時のリトライ、自動テスト、オートスケーリングなどがサポートされています。処理の背後ではSparkの構造化ストリーミングやDelta Lakeを活用しています。
前提条件
- SQLを使用します。
- あるパスにParquetが逐次配置され、新規に到着したファイルのみを処理します。これが増分のみを処理するインクリメンタルなデータ取り込みです。
- 取り込むデータのカラムに空白が含まれているので、カラムマッピングの機能を活用します。
- 取り込んだデータはUntiy Catalogで管理します。
注意事項
Delta Live Tablesのテーブルの保存先をUnity Catalogにする際、いくつかの制限事項が存在します。
- Unity Catalog配下に作成されたDelta Live Tablesのテーブルは、共有モードのUnity Catalog対応クラスター、あるいはSQLウェアハウスからのみクエリーすることができます。ユーザー割り当てモード、分離なしモードの共有クラスターからクエリーすることはできません。
- 共有モードのクラスターでは機械学習ランタイムはサポートされていません。
データの準備
# データ保存先パス
data_path = "/tmp/takaaki.yayoi@databricks.com/dlt/landing"
# ランディングゾーンを初期化
dbutils.fs.rm(data_path, True)
データを読み込みます。このデータのカラム名には空白が含まれています。
df = spark.read.csv("/databricks-datasets/wine-quality/winequality-white.csv", sep=";", header=True)
display(df)
データを保存します。
save_path = f"{data_path}/2023/07/06/"
df.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)
display(dbutils.fs.ls(f"{data_path}/2023/07"))
DLTパイプラインの定義
SQLノートブックを作成して以下の内容を記述します。TBLPROPERTIES
を指定してカラムマッピングを有効化し、処理時刻のカラムを追加しています。
CREATE
OR REFRESH STREAMING TABLE bronze TBLPROPERTIES (
'delta.minReaderVersion' = '2',
'delta.minWriterVersion' = '5',
'delta.columnMapping.mode' = 'name'
) COMMENT "Parquetをそのまま保持するブロンズテーブル" AS
SELECT
*,
current_timestamp() as processed -- 処理時刻
FROM
cloud_files(
"/tmp/takaaki.yayoi@databricks.com/dlt/landing/",
"parquet"
)
パイプラインの実行
- サイドメニューのDelta Live Tablesをクリックします。
- パイプラインを作成ボタンをクリックします。
- パイプライン名を入力します、ソースコードで、上で作成したノートブックを選択します。
-
配信先でUnity Catalogを選択し、カタログを選択、ターゲットスキーマでスキーマ(データベース)を選択します。ここにテーブルが作成されます。
- 作成をクリックします。
- 開始をクリックします。
- 処理が完了すると、テーブルがグリーンになります。
テーブルの確認
- サイドメニューからSQLエディタにアクセスし、稼働中のSQLウェアハウスを選択します。
- 以下のクエリーを実行します。
SELECT * FROM takaakiyayoi_catalog.dlt.bronze ORDER BY processed DESC
- 結果を確認することができます。
- 件数も確認します。
なお、再度パイプラインを実行しても、元のデータに増分がないので処理は行われません。
データの追加
同じデータをパスを変えて保存します。
save_path = f"{data_path}/2023/07/07/"
df.write.format("parquet").option("header", "true").mode("overwrite").save(save_path)
display(dbutils.fs.ls(f"{data_path}/2023/07"))
再度パイプラインの実行
Delta Live Tablesのパイプラインに移動し、開始をクリックします。今回は増分のデータが処理されます。