こちらで紹介したように、Delta Live Tables(DLT)ではUDF(User Defined Function:ユーザー定義関数)を活用することで、処理をモジュール化することができます。
前回紹介したのは、integerを引数として渡すシンプルなUDFでしたが、ケースによっては以下のようにテーブルの全てのカラムを渡して処理を行いたいというケースがあるかと思います。
@dlt.table
def silver_data():
return (
spark.sql("SELECT *, makeColSum(*) AS sumResult FROM live.bronze_data")
)
現状、私が調べた範囲ではこちらのUDFに配列を渡すことができないので、以降で説明する、ある意味泥臭いやり方を取る必要があります。ただ、列数が変動しても柔軟に処理を行えるというメリットがあります。
こちらのアプローチは、列のすべての値を文字列結合してUDFに渡して、UDF側で値に分離して処理を行う
というものです。
呼び出し側の準備
まず、ダミーデータを準備します。
@dlt.table
def bronze_data():
return spark.sql("SELECT id AS id, rand() * 100 AS col1, rand() * 100 AS col2 FROM RANGE(10)")
こちらのデータの全カラムの値の総和を取るUDFを呼び出します。
@dlt.table
def silver_data():
return (
spark.sql("SELECT *, makeColSum(CONCAT_WS('_', *)) AS sumResult FROM live.bronze_data")
)
ここでの肝はCONCAT_WS
です。セパレーターを指定して全カラムを文字列結合します。これをUDFmakeColSum
に渡します。
呼び出されるUDFの準備
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math
def colSum(x) -> float:
"""
指定された行の総和を取るUDF
:param x: Pyspark/SQLのカラムを`_`で連結したもの
:return: 総和
"""
# separatorで分割
str_list = x.split('_')
# floatのlistに変換
float_list = list(map(float, str_list))
# 総和の計算
sum_result = math.fsum(float_list)
return sum_result
spark.udf.register("makeColSum", colSum, FloatType())
パイプラインの実行
上記のノートブックを用いてパイプラインを作成して実行します。2つノートブックをノートブックライブラリとして追加します。
パイプラインの設定(JSON)は以下のようになります。
{
"id": "432c0f50-99be-45e3-8860-02a8d4061dfa",
"clusters": [
{
"label": "default",
"autoscale": {
"min_workers": 1,
"max_workers": 5
}
}
],
"development": true,
"continuous": false,
"edition": "advanced",
"photon": false,
"libraries": [
{
"notebook": {
"path": "/Users/takaaki.yayoi@databricks.com/20220610_dlt_udf_process_all_columns/dlt-python-caller"
}
},
{
"notebook": {
"path": "/Users/takaaki.yayoi@databricks.com/20220610_dlt_udf_process_all_columns/dlt-python-callee"
}
}
],
"name": "dlt_custom_udf",
"storage": "dbfs:/user/takaaki.yayoi@databricks.com/dlt",
"target": "dlt_takaaki_yayoi_databricks_com_db"
}
結果の確認
このように全カラムの総和がsumResult
に格納されています。
今回は基本的な処理のみをカバーしました。列名含めた文字列として渡して、特定の列を除外するといった実装を行うことも可能です。