0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Delta Live Tablesにおけるメタプログラミング

Posted at

こちらをウォークスルーします。

Delta Live Tables(DLT)では、データパイプラインのそれぞれのステップ(テーブル)自体をプログラムで作成することができます。いわゆるメタプログラミングが可能です。

ループを用いたメタプログラミング

で、ウォークスルーしていたのですが、冒頭のこちらの注意書きが気になりました。

重要
Delta Live Tables デコレータを使用する Python 関数は遅延して呼び出されるため、ループでデータセットを作成する場合は、別の関数を呼び出してデータセットを作成し、正しいパラメーター値が使用されるようにする必要があります。 別の関数でデータセットを作成しないと、ループの最終実行のパラメーターを使用する複数のテーブルが作成されます。

def create_table(name):
  @dlt.table(name=name)
  def t():
    return spark.read.table(name)

tables = ["t1", "t2"]
for t in tables:
  create_table(t)

どういうことなのでしょう。実際に試してみます。dlt_sourceというスキーマ配下にダミーテーブルを作成します。

CREATE TABLE takaakiyayoi_catalog.dlt_source.t1
(
  deptcode  INT,
  deptname  STRING,
  location  STRING
);

INSERT INTO takaakiyayoi_catalog.dlt_source.t1 VALUES
  (10, 'FINANCE', 'EDINBURGH'),
  (20, 'SOFTWARE', 'PADDINGTON'),
  (30, 'SALES', 'MAIDSTONE'),
  (40, 'MARKETING', 'DARLINGTON'),
  (50, 'ADMIN', 'BIRMINGHAM');

CREATE TABLE takaakiyayoi_catalog.dlt_source.t2
(
  deptcode  INT,
  deptname  STRING,
  location  STRING
);

INSERT INTO takaakiyayoi_catalog.dlt_source.t2 VALUES
  (60, 'FINANCE', 'TOKYO'),
  (70, 'SOFTWARE', 'OSAKA'),
  (80, 'SALES', 'NAGOYA'),
  (90, 'MARKETING', 'FUKUOKA'),
  (100, 'ADMIN', 'SAPPORO');

以下のようなパイプラインの実装を行います。関数を用いず、ループ内でテーブルを定義しています。

dlt_raw_meta
import dlt
from pyspark.sql.functions import *

tables = ["t1", "t2"]
for table in tables:
  @dlt.table(name=table)
  def t():
    return spark.read.table("takaakiyayoi_catalog.dlt_source." + table)

こちらのパイプラインのターゲットスキーマはdlt_metaとして、テーブル名が重複しないようにします。
Screenshot 2024-05-18 at 9.59.37.png

これで、dlt_sourceのt1とt2テーブルを読み込んで、dlt_meta配下に同名のテーブルが作られるはずです。

しかし、テーブルを確認してみますと、どちらもt2の内容になっています。
Screenshot 2024-05-18 at 10.00.30.png
Screenshot 2024-05-18 at 10.01.02.png

これが遅延評価の影響ということですね。ループはt2で終わっていますので、遅延してテーブルを定義する際にどちらもt2のテーブルを読み込んでしまうという訳ですね。

関数化します。

dlt_function_meta
import dlt
from pyspark.sql.functions import *

def create_table(name):
  @dlt.table(name=name)
  def t():
    return spark.read.table("takaakiyayoi_catalog.dlt_source." + name)

tables = ["t1", "t2"]
for t in tables:
  create_table(t)

これで期待した通りの挙動になりました。
Screenshot 2024-05-18 at 10.37.41.png
Screenshot 2024-05-18 at 10.37.50.png

サンプルパイプライン

それでは本来のサンプルパイプラインを実行します。ただ、こちらも以下のような制約があるので、ソースとなるファイルをUnity Catalogのボリューム配下にコピーします。

この例では、 Databricks データセットに含まれるサンプル データを読み取ります。 Databricks データセットは Unity Catalog に発行するパイプラインではサポートされていないため、この例は、 Hive metastoreに発行するように構成されたパイプラインでのみ機能します。

dbutils.fs.cp(
    "/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv",
    "/Volumes/takaakiyayoi_catalog/dlt_meta/data",
)

これでこちらのパイプラインを実行します。

import dlt
from pyspark.sql.functions import *

@dlt.table(
  name="raw_fire_department",
  comment="消防署レスポンスの生テーブル"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
  return (
    spark.read.format('csv')
      .option('header', 'true')
      .option('multiline', 'true')
      .load('/Volumes/takaakiyayoi_catalog/dlt_meta/data/Fire_Department_Calls_for_Service.csv')
      .withColumnRenamed('Call Type', 'call_type')
      .withColumnRenamed('Received DtTm', 'received')
      .withColumnRenamed('Response DtTm', 'responded')
      .withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
    .select('call_type', 'received', 'responded', 'neighborhood')
  )

all_tables = []

def generate_tables(call_table, response_table, filter):
  @dlt.table(
    name=call_table,
    comment="呼び出しタイプごとのトップレベルテーブル"
  )
  def create_call_table():
    return (
      spark.sql("""
        SELECT
          unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
          unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
          neighborhood
        FROM LIVE.raw_fire_department
        WHERE call_type = '{filter}'
      """.format(filter=filter))
    )

  @dlt.table(
    name=response_table,
    comment="最もレスポンス時間が早いトップ10の地域"
  )
  def create_response_table():
    return (
      spark.sql("""
        SELECT
          neighborhood,
          AVG((ts_received - ts_responded)) as response_time
        FROM LIVE.{call_table}
        GROUP BY 1
        ORDER BY response_time
        LIMIT 10
      """.format(call_table=call_table))
    )

  all_tables.append(response_table)

generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")

@dlt.table(
  name="best_neighborhoods",
  comment="ベストなレスポンスタイムリストに最も出現する地域"
)
def summary():
  target_tables = [dlt.read(t) for t in all_tables]
  unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
  return (
    unioned.groupBy(col("neighborhood"))
      .agg(count("*").alias("score"))
      .orderBy(desc("score"))
  )

動きました!
Screenshot 2024-05-18 at 10.53.53.png

生データのテーブル。
Screenshot 2024-05-18 at 10.55.45.png

呼び出しタイプがAlarmsのテーブル。これ以外にもStructure FireMedical Incidentを格納するテーブルが作られています。
Screenshot 2024-05-18 at 10.56.12.png

呼び出しタイプがAlarmsの地域ごとの平均レスポンス時間を格納するテーブル。
Screenshot 2024-05-18 at 10.58.22.png

呼び出しタイプごとの平均レスポンス時間のテーブルを読み込んで地域ごとに集計した結果を格納するテーブル。
Screenshot 2024-05-18 at 10.55.06.png

メタプログラミングを活用することで柔軟にロジックが組めますね。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?