2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Delta live tableを使って1つのjobで複数テーブルに分割する

Posted at

Delta Live Tableって?

Databricksが開発したPipelineツールです。
Notebook上に宣言的にSQL/Pythonで記述することができて

  • pipelineのvisualize、Linageが自動で描ける
  • ユーザ側でエラーハンドリングのコードなどは書く必要がない(Databricks側でよしなにやる)
  • expectation(不正なデータを防ぐための条件が書ける。例えばageカラムのvalueが20以下のデータが入ってきたら、pipelineを停止する、エラーとして別カラムに書くなど)
  • SQLがかければ、pipelineを簡単に作れる

といった感じです。
今回、1つのjobの中で、条件に応じて複数のテーブルを生成してみようと思います。

全体像

Screenshot 2022-07-19 at 16.37.01.jpg

全体像はこんな感じで、S3にあるデータセットからデータ読み込んで、必要なカラムを抽出して、最終的にCustID(顧客番号)ごとにテーブルを分割します。

コードの説明

コードの全体像はこんな感じです。

Screenshot 2022-07-19 at 16.48.13.jpg

Brozeテーブルの生成(cmd1 / cmd2の箇所)

必要なlibraryをimportしたのち、関数(retail_raw)としてspark.readを呼び出して、csvファイルをloadしています。

使ったデータはdatabricksにあるサンプルデータセットで、onlineショップの販売データです。

Screenshot 2022-07-19 at 16.54.49.jpg

import dlt
from pyspark.sql.functions import *
from pyspark.sql.types import *

csv_path = "/databricks-datasets/online_retail/data-001/data.csv"

@dlt.table(
  comment="オンラインショップの注文データ, ingested from /databricks-datasets."
)

def retail_raw():
  return (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(csv_path)
  )

siliverテーブルの生成 (cmd3)

cmd2で定義したretail_rawからデータを読み込み、関数retail_preparedを定義します。
withColumnRenamedで適度にcolumn名を変更して、最後にselectで必要なデータだけを読み出しています。

@dlt.table(
  comment="分析用にテーブルの加工をします"
)
def retail_prepared():
  return (
    dlt.read("retail_raw")
      .withColumnRenamed("StockCode", "zaikoNO")
      .withColumnRenamed("CustomerID", "CustID")
      .select("InvoiceNo", "InvoiceDate", "zaikoNO", "CustID")
  )

goldテーブルの生成(cmd4以降)

こんな感じで、関数(CustID14,15)を定義して、filterメソッドとつかってCustIDを分割しています。

@dlt.table(
  comment="CustID14のみのテーブル"
)
def CustID14():
  return (
    dlt.read("retail_prepared")
      .filter(expr("CustID LIKE '14%'"))
      .select("CustID", "InvoiceDate")
  )

@dlt.table(
  comment="CustID15のみのテーブル"
)
def CustID15():
  return (
    dlt.read("retail_prepared")
      .filter(expr("CustID LIKE '15%'"))
      .select("CustID", "InvoiceDate")
  )

これでとりあえず、種となるnotebookは完了です。

Delta live tableのjobを作る

上の手順で作成したNotebookをDelta live tableに登録します。
Workflowの画面から

  • 先ほど作成したnotebookを指定
  • Targetに書き込み先のDatabase名を指定

Screenshot_2022-07-19_at_16_59_45.jpg

保存後、startボタンを押すとjobが実行され、pipelineが描画されます。

Screenshot_2022-07-19_at_17_04_18.jpg

画面右側にはそれぞれの関数(retail_rawなど)のスキーマ情報、処理件数、エラーがどのくらいあったかどうかなどが表示されます。
画面下側にあるAll/info/warning/errorの箇所は、pipelineのエラーが記録され、なにかあればそこからdebugをする形になります。
画面右上の箇所から、スケジュール実行や、設定変更が可能です。

書き込みの結果を見てみる

jobを実行した結果、Deltalake形式のテーブルが自動で生成されるので、確認してみます。

DBSQL(Redashベース)から見てみたいと思います。
各関数で定義した名前で、deltalakeのテーブルが生成されています。

Screenshot 2022-07-19 at 17.13.35.jpg

SQLを書いて中身を見てみたいと思います。

Screenshot 2022-07-19 at 17.15.35.jpg

Screenshot 2022-07-19 at 17.15.19.jpg

最後に

簡単ではありますが、こう言ったふうにpython/sqlでpipelineを簡単に定義できるのと、1つのjobで複数テーブルに書き込めるのはなかなか便利かなとおもうので、ぜひ興味あるひとは使ってみてください。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?