0
0

More than 1 year has passed since last update.

Delta Live TablesのUDFに複数カラムを渡して処理を行う

Last updated at Posted at 2022-06-12

こちらで紹介したように、Delta Live Tables(DLT)ではUDF(User Defined Function:ユーザー定義関数)を活用することで、処理をモジュール化することができます。

前回紹介したのは、integerを引数として渡すシンプルなUDFでしたが、ケースによっては以下のようにテーブルの全てのカラムを渡して処理を行いたいというケースがあるかと思います。

Python
@dlt.table
def silver_data():
  return (
    spark.sql("SELECT *, makeColSum(*) AS sumResult FROM live.bronze_data")
  )

現状、私が調べた範囲ではこちらのUDFに配列を渡すことができないので、以降で説明する、ある意味泥臭いやり方を取る必要があります。ただ、列数が変動しても柔軟に処理を行えるというメリットがあります。

こちらのアプローチは、列のすべての値を文字列結合してUDFに渡して、UDF側で値に分離して処理を行うというものです。

呼び出し側の準備

まず、ダミーデータを準備します。

dlt-python-caller
@dlt.table
def bronze_data():
  return spark.sql("SELECT id AS id, rand() * 100 AS col1, rand() * 100 AS col2 FROM RANGE(10)")

こちらのデータの全カラムの値の総和を取るUDFを呼び出します。

dlt-python-caller
@dlt.table
def silver_data():
  return (
    spark.sql("SELECT *, makeColSum(CONCAT_WS('_', *)) AS sumResult FROM live.bronze_data")
  )

ここでの肝はCONCAT_WSです。セパレーターを指定して全カラムを文字列結合します。これをUDFmakeColSumに渡します。

呼び出されるUDFの準備

dlt-python-callee
from pyspark.sql.functions import *
from pyspark.sql.types import *
import math

def colSum(x) -> float:
    """
    指定された行の総和を取るUDF
    :param x: Pyspark/SQLのカラムを`_`で連結したもの
    :return: 総和
    """
    
    # separatorで分割
    str_list = x.split('_')
    # floatのlistに変換
    float_list = list(map(float, str_list))
    # 総和の計算
    sum_result = math.fsum(float_list)

    return sum_result

spark.udf.register("makeColSum", colSum, FloatType())

パイプラインの実行

上記のノートブックを用いてパイプラインを作成して実行します。2つノートブックをノートブックライブラリとして追加します。
Screen Shot 2022-06-12 at 11.43.35.png

パイプラインの設定(JSON)は以下のようになります。

JSON
{
    "id": "432c0f50-99be-45e3-8860-02a8d4061dfa",
    "clusters": [
        {
            "label": "default",
            "autoscale": {
                "min_workers": 1,
                "max_workers": 5
            }
        }
    ],
    "development": true,
    "continuous": false,
    "edition": "advanced",
    "photon": false,
    "libraries": [
        {
            "notebook": {
                "path": "/Users/takaaki.yayoi@databricks.com/20220610_dlt_udf_process_all_columns/dlt-python-caller"
            }
        },
        {
            "notebook": {
                "path": "/Users/takaaki.yayoi@databricks.com/20220610_dlt_udf_process_all_columns/dlt-python-callee"
            }
        }
    ],
    "name": "dlt_custom_udf",
    "storage": "dbfs:/user/takaaki.yayoi@databricks.com/dlt",
    "target": "dlt_takaaki_yayoi_databricks_com_db"
}

結果の確認

このように全カラムの総和がsumResultに格納されています。
Screen Shot 2022-06-12 at 11.44.23.png

今回は基本的な処理のみをカバーしました。列名含めた文字列として渡して、特定の列を除外するといった実装を行うことも可能です。

Databricks 無料トライアル

Databricks 無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0