こちらのDeltaチェンジデータフィードのサンプルノートブックをウォークスルーします。
ノートブックの日本語訳はこちらです。
テーブルの準備
他のユーザーとテーブル名が重複しないように、データベースを選択します。専用のデータベースが無い場合にはCREATE DATABASE
文でデータベースを作成してからUSE
を実行します。
%sql
-- 他のユーザーと重複しないデータベースを選択
USE 20210712_demo_takaakiyayoi;
このノートブックのデフォルト言語はPythonなので、何も指定しない場合Pythonとしてコマンドが実行されます。ここで、セルの先頭で%sql
と指定することで、このセルの言語がSQLに切り替わります。データベースを操作する場合にはSQLを用いた方がシンプルになるので、このようにセルの言語を柔軟に切り替えることで、さまざまなロジックをシンプルに記述することができます。
国ごとのワクチン接種量と利用可能なワクチンの絶対値を追跡するシルバーテーブルを作成
saveAsTable
を用いて、silverTable
というテーブルとして保存します。上でUSE <database>
を実行しているので、指定したデータベースにテーブルが保存されます。
countries = [("USA", 10000, 20000), ("India", 1000, 1500), ("UK", 7000, 10000), ("Canada", 500, 700) ]
columns = ["Country","NumVaccinated","AvailableDoses"]
spark.createDataFrame(data=countries, schema = columns).write.format("delta").mode("overwrite").saveAsTable("silverTable")
SELECT
で作成したテーブルを確認します。
%sql
SELECT * FROM silverTable
ゴールドテーブル: 国ごとのワクチン接種率を示すゴールドテーブルを作成
上で作成したシルバーテーブルをベースにして、BIや機械学習などで活用するゴールドテーブルを作成します。これらシルバー、ゴールド(そして、ブロンズ)というテーブルにデータと処理を分離するアーキテクチャをデルタアーキテクチャ(メダリオンアーキテクチャ)と呼びます。詳細は、こデルタ vs ラムダ: なぜデータパイプラインのシンプルさは複雑さに勝るのかをご覧ください。
import pyspark.sql.functions as F
spark.read.format("delta").table("silverTable").withColumn("VaccinationRate", F.col("NumVaccinated") / F.col("AvailableDoses")) \
.drop("NumVaccinated").drop("AvailableDoses") \
.write.format("delta").mode("overwrite").saveAsTable("goldTable")
%sql
SELECT * FROM goldTable
シルバーテーブルでチェンジデータフィードを有効化
テーブルプロパティdelta.enableChangeDataFeed = true
を設定します。
%sql
ALTER TABLE silverTable SET TBLPROPERTIES (delta.enableChangeDataFeed = true)
シルバーレコードを更新
シルバーテーブルに更新を加えていきます。これらは全てチェンジデータフィードとしてキャプチャされます。
# 新規レコードをinsert
new_countries = [("Australia", 100, 3000)]
spark.createDataFrame(data=new_countries, schema = columns).write.format("delta").mode("append").saveAsTable("silverTable")
%sql
-- レコードのupdate
UPDATE silverTable SET NumVaccinated = '11000' WHERE Country = 'USA'
%sql
-- レコードの削除
DELETE from silverTable WHERE Country = 'UK'
最新の状態を確認します。
%sql
SELECT * FROM silverTable
SQLとPySparkでチェンジデータを探索する
上で加えた変更を確認します。table_changes
に対象のテーブル、開始バージョン番号、終了バージョン番号を指定します。
%sql
-- 変更の参照
SELECT * FROM table_changes('silverTable', 2, 5) order by _commit_timestamp
PySparkでも同様の結果を得ることができます。
changes_df = spark.read.format("delta").option("readChangeData", True).option("startingVersion", 2).table('silverTable')
display(changes_df)
シルバーテーブルからゴールドテーブルに変更を伝播させる
全てのデータを洗い替えるのではなく、変更があった部分のみを伝播させることでパフォーマンスの改善、処理のシンプル化を図ることができます。
更新箇所のみを保持する一時ビューsilverTable_latest_version
を作成します。
%sql
-- 国ごとの最新バージョンのみを収集
CREATE OR REPLACE TEMPORARY VIEW silverTable_latest_version as
SELECT *
FROM
(SELECT *, rank() over (partition by Country order by _commit_version desc) as rank
FROM table_changes('silverTable', 2, 5)
WHERE _change_type !='update_preimage')
WHERE rank=1
このビューを用いて、更新があった箇所のみをゴールドテーブルにUPSERTします。
%sql
-- ゴールドに変更をマージ
MERGE INTO goldTable t USING silverTable_latest_version s ON s.Country = t.Country
WHEN MATCHED AND s._change_type='update_postimage' THEN UPDATE SET VaccinationRate = s.NumVaccinated/s.AvailableDoses
WHEN NOT MATCHED THEN INSERT (Country, VaccinationRate) VALUES (s.Country, s.NumVaccinated/s.AvailableDoses)
結果を確認します。
%sql
SELECT * FROM goldTable
これで、変更点のみがゴールドテーブルに伝播されました。
テーブルのサイズが大きくなると、さらにデータ更新のパフォーマンスの改善が期待できます。ぜひ、データパイプラインでチェンジデータフィードを活用してみてください。