Delta Live Tableって?
Databricksが開発したPipelineツールです。
Notebook上に宣言的にSQL/Pythonで記述することができて
- pipelineのvisualize、Linageが自動で描ける
- ユーザ側でエラーハンドリングのコードなどは書く必要がない(Databricks側でよしなにやる)
- expectation(不正なデータを防ぐための条件が書ける。例えばageカラムのvalueが20以下のデータが入ってきたら、pipelineを停止する、エラーとして別カラムに書くなど)
- SQLがかければ、pipelineを簡単に作れる
といった感じです。
今回、1つのjobの中で、条件に応じて複数のテーブルを生成してみようと思います。
全体像
全体像はこんな感じで、S3にあるデータセットからデータ読み込んで、必要なカラムを抽出して、最終的にCustID(顧客番号)ごとにテーブルを分割します。
コードの説明
コードの全体像はこんな感じです。
Brozeテーブルの生成(cmd1 / cmd2の箇所)
必要なlibraryをimportしたのち、関数(retail_raw)としてspark.readを呼び出して、csvファイルをloadしています。
使ったデータはdatabricksにあるサンプルデータセットで、onlineショップの販売データです。
import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *
csv_path = "/databricks-datasets/online_retail/data-001/data.csv"
@dlt.table(
comment="オンラインショップの注文データ, ingested from /databricks-datasets."
)
def retail_raw():
return (
spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load(csv_path)
)
siliverテーブルの生成 (cmd3)
cmd2で定義したretail_rawからデータを読み込み、関数retail_preparedを定義します。
withColumnRenamedで適度にcolumn名を変更して、最後にselectで必要なデータだけを読み出しています。
@dlt.table(
comment="分析用にテーブルの加工をします"
)
def retail_prepared():
return (
dlt.read("retail_raw")
.withColumnRenamed("StockCode", "zaikoNO")
.withColumnRenamed("CustomerID", "CustID")
.select("InvoiceNo", "InvoiceDate", "zaikoNO", "CustID")
)
goldテーブルの生成(cmd4以降)
こんな感じで、関数(CustID14,15)を定義して、filterメソッドとつかってCustIDを分割しています。
@dlt.table(
comment="CustID14のみのテーブル"
)
def CustID14():
return (
dlt.read("retail_prepared")
.filter(expr("CustID LIKE '14%'"))
.select("CustID", "InvoiceDate")
)
@dlt.table(
comment="CustID15のみのテーブル"
)
def CustID15():
return (
dlt.read("retail_prepared")
.filter(expr("CustID LIKE '15%'"))
.select("CustID", "InvoiceDate")
)
これでとりあえず、種となるnotebookは完了です。
Delta live tableのjobを作る
上の手順で作成したNotebookをDelta live tableに登録します。
Workflowの画面から
- 先ほど作成したnotebookを指定
- Targetに書き込み先のDatabase名を指定
保存後、startボタンを押すとjobが実行され、pipelineが描画されます。
画面右側にはそれぞれの関数(retail_rawなど)のスキーマ情報、処理件数、エラーがどのくらいあったかどうかなどが表示されます。
画面下側にあるAll/info/warning/errorの箇所は、pipelineのエラーが記録され、なにかあればそこからdebugをする形になります。
画面右上の箇所から、スケジュール実行や、設定変更が可能です。
書き込みの結果を見てみる
jobを実行した結果、Deltalake形式のテーブルが自動で生成されるので、確認してみます。
DBSQL(Redashベース)から見てみたいと思います。
各関数で定義した名前で、deltalakeのテーブルが生成されています。
SQLを書いて中身を見てみたいと思います。
最後に
簡単ではありますが、こう言ったふうにpython/sqlでpipelineを簡単に定義できるのと、1つのjobで複数テーブルに書き込めるのはなかなか便利かなとおもうので、ぜひ興味あるひとは使ってみてください。