こちらをウォークスルーします。
Delta Live Tables(DLT)では、データパイプラインのそれぞれのステップ(テーブル)自体をプログラムで作成することができます。いわゆるメタプログラミングが可能です。
ループを用いたメタプログラミング
で、ウォークスルーしていたのですが、冒頭のこちらの注意書きが気になりました。
重要
Delta Live Tables デコレータを使用する Python 関数は遅延して呼び出されるため、ループでデータセットを作成する場合は、別の関数を呼び出してデータセットを作成し、正しいパラメーター値が使用されるようにする必要があります。 別の関数でデータセットを作成しないと、ループの最終実行のパラメーターを使用する複数のテーブルが作成されます。
def create_table(name):
@dlt.table(name=name)
def t():
return spark.read.table(name)
tables = ["t1", "t2"]
for t in tables:
create_table(t)
どういうことなのでしょう。実際に試してみます。dlt_source
というスキーマ配下にダミーテーブルを作成します。
CREATE TABLE takaakiyayoi_catalog.dlt_source.t1
(
deptcode INT,
deptname STRING,
location STRING
);
INSERT INTO takaakiyayoi_catalog.dlt_source.t1 VALUES
(10, 'FINANCE', 'EDINBURGH'),
(20, 'SOFTWARE', 'PADDINGTON'),
(30, 'SALES', 'MAIDSTONE'),
(40, 'MARKETING', 'DARLINGTON'),
(50, 'ADMIN', 'BIRMINGHAM');
CREATE TABLE takaakiyayoi_catalog.dlt_source.t2
(
deptcode INT,
deptname STRING,
location STRING
);
INSERT INTO takaakiyayoi_catalog.dlt_source.t2 VALUES
(60, 'FINANCE', 'TOKYO'),
(70, 'SOFTWARE', 'OSAKA'),
(80, 'SALES', 'NAGOYA'),
(90, 'MARKETING', 'FUKUOKA'),
(100, 'ADMIN', 'SAPPORO');
以下のようなパイプラインの実装を行います。関数を用いず、ループ内でテーブルを定義しています。
import dlt
from pyspark.sql.functions import *
tables = ["t1", "t2"]
for table in tables:
@dlt.table(name=table)
def t():
return spark.read.table("takaakiyayoi_catalog.dlt_source." + table)
こちらのパイプラインのターゲットスキーマはdlt_meta
として、テーブル名が重複しないようにします。
これで、dlt_source
のt1とt2テーブルを読み込んで、dlt_meta
配下に同名のテーブルが作られるはずです。
しかし、テーブルを確認してみますと、どちらもt2の内容になっています。
これが遅延評価の影響ということですね。ループはt2で終わっていますので、遅延してテーブルを定義する際にどちらもt2のテーブルを読み込んでしまうという訳ですね。
関数化します。
import dlt
from pyspark.sql.functions import *
def create_table(name):
@dlt.table(name=name)
def t():
return spark.read.table("takaakiyayoi_catalog.dlt_source." + name)
tables = ["t1", "t2"]
for t in tables:
create_table(t)
サンプルパイプライン
それでは本来のサンプルパイプラインを実行します。ただ、こちらも以下のような制約があるので、ソースとなるファイルをUnity Catalogのボリューム配下にコピーします。
この例では、 Databricks データセットに含まれるサンプル データを読み取ります。 Databricks データセットは Unity Catalog に発行するパイプラインではサポートされていないため、この例は、 Hive metastoreに発行するように構成されたパイプラインでのみ機能します。
dbutils.fs.cp(
"/databricks-datasets/timeseries/Fires/Fire_Department_Calls_for_Service.csv",
"/Volumes/takaakiyayoi_catalog/dlt_meta/data",
)
これでこちらのパイプラインを実行します。
import dlt
from pyspark.sql.functions import *
@dlt.table(
name="raw_fire_department",
comment="消防署レスポンスの生テーブル"
)
@dlt.expect_or_drop("valid_received", "received IS NOT NULL")
@dlt.expect_or_drop("valid_response", "responded IS NOT NULL")
@dlt.expect_or_drop("valid_neighborhood", "neighborhood != 'None'")
def get_raw_fire_department():
return (
spark.read.format('csv')
.option('header', 'true')
.option('multiline', 'true')
.load('/Volumes/takaakiyayoi_catalog/dlt_meta/data/Fire_Department_Calls_for_Service.csv')
.withColumnRenamed('Call Type', 'call_type')
.withColumnRenamed('Received DtTm', 'received')
.withColumnRenamed('Response DtTm', 'responded')
.withColumnRenamed('Neighborhooods - Analysis Boundaries', 'neighborhood')
.select('call_type', 'received', 'responded', 'neighborhood')
)
all_tables = []
def generate_tables(call_table, response_table, filter):
@dlt.table(
name=call_table,
comment="呼び出しタイプごとのトップレベルテーブル"
)
def create_call_table():
return (
spark.sql("""
SELECT
unix_timestamp(received,'M/d/yyyy h:m:s a') as ts_received,
unix_timestamp(responded,'M/d/yyyy h:m:s a') as ts_responded,
neighborhood
FROM LIVE.raw_fire_department
WHERE call_type = '{filter}'
""".format(filter=filter))
)
@dlt.table(
name=response_table,
comment="最もレスポンス時間が早いトップ10の地域"
)
def create_response_table():
return (
spark.sql("""
SELECT
neighborhood,
AVG((ts_received - ts_responded)) as response_time
FROM LIVE.{call_table}
GROUP BY 1
ORDER BY response_time
LIMIT 10
""".format(call_table=call_table))
)
all_tables.append(response_table)
generate_tables("alarms_table", "alarms_response", "Alarms")
generate_tables("fire_table", "fire_response", "Structure Fire")
generate_tables("medical_table", "medical_response", "Medical Incident")
@dlt.table(
name="best_neighborhoods",
comment="ベストなレスポンスタイムリストに最も出現する地域"
)
def summary():
target_tables = [dlt.read(t) for t in all_tables]
unioned = functools.reduce(lambda x,y: x.union(y), target_tables)
return (
unioned.groupBy(col("neighborhood"))
.agg(count("*").alias("score"))
.orderBy(desc("score"))
)
呼び出しタイプがAlarms
のテーブル。これ以外にもStructure Fire
とMedical Incident
を格納するテーブルが作られています。
呼び出しタイプがAlarms
の地域ごとの平均レスポンス時間を格納するテーブル。
呼び出しタイプごとの平均レスポンス時間のテーブルを読み込んで地域ごとに集計した結果を格納するテーブル。
メタプログラミングを活用することで柔軟にロジックが組めますね。