0
1

More than 1 year has passed since last update.

Delta Lakeのチェンジデータフィード(実践編)

Last updated at Posted at 2022-06-04

こちらのDeltaチェンジデータフィードのサンプルノートブックをウォークスルーします。

ノートブックの日本語訳はこちらです。

テーブルの準備

他のユーザーとテーブル名が重複しないように、データベースを選択します。専用のデータベースが無い場合にはCREATE DATABASE文でデータベースを作成してからUSEを実行します。

SQL
%sql
-- 他のユーザーと重複しないデータベースを選択
USE 20210712_demo_takaakiyayoi;

このノートブックのデフォルト言語はPythonなので、何も指定しない場合Pythonとしてコマンドが実行されます。ここで、セルの先頭で%sqlと指定することで、このセルの言語がSQLに切り替わります。データベースを操作する場合にはSQLを用いた方がシンプルになるので、このようにセルの言語を柔軟に切り替えることで、さまざまなロジックをシンプルに記述することができます。

国ごとのワクチン接種量と利用可能なワクチンの絶対値を追跡するシルバーテーブルを作成

saveAsTableを用いて、silverTableというテーブルとして保存します。上でUSE <database>を実行しているので、指定したデータベースにテーブルが保存されます。

Python
countries = [("USA", 10000, 20000), ("India", 1000, 1500), ("UK", 7000, 10000), ("Canada", 500, 700) ]
columns = ["Country","NumVaccinated","AvailableDoses"]
spark.createDataFrame(data=countries, schema = columns).write.format("delta").mode("overwrite").saveAsTable("silverTable")

SELECTで作成したテーブルを確認します。

SQL
%sql
SELECT * FROM silverTable

Screen Shot 2022-06-04 at 12.58.42.png

ゴールドテーブル: 国ごとのワクチン接種率を示すゴールドテーブルを作成

上で作成したシルバーテーブルをベースにして、BIや機械学習などで活用するゴールドテーブルを作成します。これらシルバー、ゴールド(そして、ブロンズ)というテーブルにデータと処理を分離するアーキテクチャをデルタアーキテクチャ(メダリオンアーキテクチャ)と呼びます。詳細は、こデルタ vs ラムダ: なぜデータパイプラインのシンプルさは複雑さに勝るのかをご覧ください。

Python
import pyspark.sql.functions as F
spark.read.format("delta").table("silverTable").withColumn("VaccinationRate", F.col("NumVaccinated") / F.col("AvailableDoses")) \
  .drop("NumVaccinated").drop("AvailableDoses") \
  .write.format("delta").mode("overwrite").saveAsTable("goldTable")
SQL
%sql
SELECT * FROM goldTable

Screen Shot 2022-06-04 at 13.03.56.png

シルバーテーブルでチェンジデータフィードを有効化

テーブルプロパティdelta.enableChangeDataFeed = trueを設定します。

SQL
%sql
ALTER TABLE silverTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)

シルバーレコードを更新

シルバーテーブルに更新を加えていきます。これらは全てチェンジデータフィードとしてキャプチャされます。

Python
# 新規レコードをinsert
new_countries = [("Australia", 100, 3000)]
spark.createDataFrame(data=new_countries, schema = columns).write.format("delta").mode("append").saveAsTable("silverTable")
SQL
%sql
-- レコードのupdate
UPDATE silverTable SET NumVaccinated = '11000' WHERE Country = 'USA'
SQL
%sql
-- レコードの削除
DELETE from silverTable WHERE Country = 'UK'

最新の状態を確認します。

SQL
%sql
SELECT * FROM silverTable

Screen Shot 2022-06-04 at 13.13.06.png

SQLとPySparkでチェンジデータを探索する

上で加えた変更を確認します。table_changesに対象のテーブル、開始バージョン番号、終了バージョン番号を指定します。

SQL
%sql 
-- 変更の参照
SELECT * FROM table_changes('silverTable', 2, 5) order by _commit_timestamp

以下のように変更内容を確認することができます。
Screen Shot 2022-06-04 at 13.14.16.png

PySparkでも同様の結果を得ることができます。

Python
changes_df = spark.read.format("delta").option("readChangeData", True).option("startingVersion", 2).table('silverTable')
display(changes_df)

シルバーテーブルからゴールドテーブルに変更を伝播させる

全てのデータを洗い替えるのではなく、変更があった部分のみを伝播させることでパフォーマンスの改善、処理のシンプル化を図ることができます。

更新箇所のみを保持する一時ビューsilverTable_latest_version を作成します。

SQL
%sql
-- 国ごとの最新バージョンのみを収集
CREATE OR REPLACE TEMPORARY VIEW silverTable_latest_version as
SELECT * 
    FROM 
         (SELECT *, rank() over (partition by Country order by _commit_version desc) as rank
          FROM table_changes('silverTable', 2, 5)
          WHERE _change_type !='update_preimage')
    WHERE rank=1

Screen Shot 2022-06-04 at 13.19.53.png

このビューを用いて、更新があった箇所のみをゴールドテーブルにUPSERTします。

SQL
%sql
-- ゴールドに変更をマージ
MERGE INTO goldTable t USING silverTable_latest_version s ON s.Country = t.Country
        WHEN MATCHED AND s._change_type='update_postimage' THEN UPDATE SET VaccinationRate = s.NumVaccinated/s.AvailableDoses
        WHEN NOT MATCHED THEN INSERT (Country, VaccinationRate) VALUES (s.Country, s.NumVaccinated/s.AvailableDoses)

結果を確認します。

SQL
%sql
SELECT * FROM goldTable

これで、変更点のみがゴールドテーブルに伝播されました。

Screen Shot 2022-06-04 at 13.21.36.png

テーブルのサイズが大きくなると、さらにデータ更新のパフォーマンスの改善が期待できます。ぜひ、データパイプラインでチェンジデータフィードを活用してみてください。

Databricks 無料トライアル

Databricks 無料トライアル

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1