What are user-defined functions (UDFs)? | Databricks on AWS [2022/12/21時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
ユーザー定義関数(user-defined-function:UDF)はユーザーによって定義される関数であり、ユーザー環境でカスタムロジックを再利用できるようになります。拡張可能なロジックを分散処理できるようにするために、Databricksでは様々な種類のUDFをサポートしています。本書では、UDFの一般的な強みと制限のいくつかをご説明します。
UDFに関する詳細については以下のドキュメントを参照ください。
- DatabricksにおけるSpark pandasユーザー定義関数
- DatabricksにおけるPythonユーザー定義関数(UDF)
- User-defined scalar functions - Scala
- User-defined aggregate functions - Scala
カスタムロジックはどのような場合にUDFではないのか?
厳密にはすべてのカスタム関数がUDFという訳ではありません。SQLやSparkデータフレームを用いて一連のSparkビルトインメソッドを安全に定義することができ、完全に最適された挙動をさせることができます。例えば、以下のSQL、Python関数は再利用可能な関数として単位変換を定義するためにSparkのビルトインメソッドを組み合わせています。
CREATE FUNCTION convert_f_to_c(unit STRING, temp DOUBLE)
RETURNS DOUBLE
RETURN CASE
WHEN unit = "F" THEN (temp - 32) * (5/9)
ELSE temp
END;
SELECT convert_f_to_c(unit, temp) AS c_temp
FROM tv_temp;
def convertFtoC(unitCol, tempCol):
from pyspark.sql.functions import when
return when(unitCol == "F", (tempCol - 32) * (5/9)).otherwise(tempCol)
from pyspark.sql.functions import col
df_query = df.select(convertFtoC(col("unit"), col("temp"))).toDF("c_temp")
display(df_query)
上のUDFを実行するために、サンプルデータを作成することができます。
どのUDFが最も効率的なのか?
UDFはコード実行において多大なる処理ボトルネックを持ち込む場合があります。Databricksでは、Apache Spark、SQL、Delta Lakeの構文を用いて記述されたコードに対して自動で様々なオプティマイザを適用します。UDFによってカスタムロジックが導入されると、これらのオプティマイザはこのカスタムロジックに対してタスクを効率的に計画する能力を発揮することができません。さらに、JVM外で実行されるロジックにおいては、データのシリアライズにおける追加のコストが発生します。
いくつかのUDFは他のものより効率的です。パフォーマンスの観点では:
- Databricksのオプティマイザのおかげでビルトイン関数が最も高速です。
- JVMで実行されるコード(Scala、Java、Hive UDF)はPythonのUDFよりも高速です。
- Pandas UDFはPython UDFに関連づけられるシリアライズ残すとを削減するためにArrowを活用します。
- 通常はPython UDFは避けるべきですが、パフォーマンス劣化なしに、コードを結びつけるためにPythonを活用することができます。
タイプ | 最適化 | 実行環境 |
---|---|---|
Hive UDF | No | JVM |
Python UDF | No | Python |
Pandas UDF | No | Python (Arrow) |
Scala UDF | No | JVM |
Spark SQL | Yes | JVM |
Spark DataFrame | Yes | JVM |
いつUDFを使うべきか?
UDFの主要なメリットは、慣れ親しんだ言語でロジックを表現し、コードのリファクタリングに伴う人間のコストを削減できるというものです。アドホックなクエリー、主導のデータクレンジング、探索的データ分析、小中規模のデータセットに対する大部分のオペレーションにおいては、UDFによるレーテンシーのオーバーヘッドがコードのリファクタリングに関連するコストを上回ることはほとんどありません。
ETLジョブ、ストリーミングオペレーション、超大規模なデータセットに対するオペレーション、その他の定常的、連続的に実行されるワークロードにおいては、ネイティブなApache Sparkのメソッドを用いるためにロジックをリファクタリングするコストはすぐに取り戻すことができます。
サンプルUDF向けのサンプルデータ
本書におけるコードサンプルでは、摂氏と華氏で温度を変換するUDFを使用しています。これらの関数を実行したいのであれば、以下のPythonコードでサンプルデータセットを作成することができます。
import numpy as np
import pandas as pd
Fdf = pd.DataFrame(np.random.normal(55, 25, 10000000), columns=["temp"])
Fdf["unit"] = "F"
Cdf = pd.DataFrame(np.random.normal(10, 10, 10000000), columns=["temp"])
Cdf["unit"] = "C"
df = spark.createDataFrame(pd.concat([Fdf, Cdf]).sample(frac=1))
df.cache().count()
df.createOrReplaceTempView("tv_temp")