DatabricksでSparkを取り扱う際、ロジックをモジュール化や共通化のために ユーザー定義関数(User-defined function: UDF) は必須の機能と言えます。
ここ最近でもUDFに関する機能が追加されているので、今時点でどのような選択肢があるのかをチートシートにまとめます。執筆時点では以下の選択肢があります。
- User-defined scalar functions
- pandas user-defined functions
- User-defined table functions
- UDF in Unity Catalog
注意
ここでは、PythonのUDFを対象とします。SQLやScalaは対象外とします。
ベースとしたマニュアルはこちらです。また、効率の観点でのUDFの違いはどの UDF が最も効率的ですか?にまとめられています。こちらも留意ください。例えば、組み込み関数が使える場合はそちらを使った方が高速です。
チートシート
UDF | 説明 | ユースケース | コンピュート | API | スコープ |
---|---|---|---|---|---|
User-defined scalar functions(ユーザー定義スカラー関数) | スカラー値を返却する関数 | Sparkで入出力がスカラー値の関数を作成してロジックをモジュール化したい | 共有クラスターを使っている場合: DBR 13.2以降 | PySpark、Spark SQL | SparkSession |
pandas user-defined functions(pandasユーザー定義関数) | pandasのSeriesあるいはpandasのSeriesに対するIteratorを入力とし、SeriesあるいはIteratorを返却する関数 | Sparkで入出力がSeriesである関数を作成し、ロジックをモジュール化したい。ArrowでUDFの処理を高速化したい | 共有クラスターを使っている場合: DBR 13.2以降 | PySpark、Spark SQL | SparkSession |
User-defined table functions(ユーザー定義テーブル関数) | テーブルを返却する関数 | スカラー値を渡してテーブルを返却する関数を利用したい(注1) | DBR 14.0以降 | PySpark、Spark SQL | SparkSession |
UDF in Unity Catalog(Unity CatalogのUDF) | Unity Catalogで管理されるUDF | 適切なアクセス権を設定して、他のユーザー、ワークスペースとUDFを共有したい(注2) | DBR 13.2以降あるいはサーバレスSQL | Spark SQL | Databricksアカウント(注3) |
- APIとは当該UDFをPySparkから呼び出せるか、Spark SQLから呼び出せるのかを示しています。
- 注1: ユーザー定義テーブル関数の制限事項
- パブリックプレビュー
- 共有クラスターやSQLウェアハウスでは利用不可
- 注2: Unity CatalogのUDFの制限事項
- パブリックプレビュー
- Gravitonクラスターでは未サポート
- UDFはスカラー値を返却する必要がある
- ユーザー定義テーブル関数は登録不可
- ファイルシステムや内部サービスへのアクセス不可
- 標準のPythonライブラリはインポートできるが、カスタムライブラリや外部の依存関係を持ち込むことはできない
- 注3: Unity Catalogのアクセス管理モデルに従います。
ユーザー定義スカラー関数
UDFとして関数を登録
from pyspark.sql.types import LongType
def squared_typed(s):
return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())
Spark SQLでのUDF呼び出し
spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test
PySparkでのUDFの利用
from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
こちらでも同じ結果が得られます。こちらはアノテーションを使用しています。
from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))
pandasユーザー定義関数
ベクトル化UDFとしても知られるpandasユーザー定義関数(UDF)は、データの転送にApache Arrowを使用し、データの操作にpandasを使用するユーザー定義関数です。pandas UDFによって、一度に一行を処理するPython(スカラー)UDFと比べて、最大100倍パフォーマンスを改善できるベクトル化オペレーションを実現します。
背景となる情報については、ブログ記事Spark3.0における新機能: Pandas UDFとPython型ヒントをご覧ください。
デコレーターとしてキーワードpandas_udf
を用い、Python型ヒントを持つ関数をラッピングすることで、pandas UDFを定義します。
Series to Series UDF
スカラーオペレーションをベクトル化するためにSeries to Series pandas UDFを活用します。select
やwithColumn
のようなAPIで使用することができます。
Python関数は、入力としてpandasのSeriesを受け取り、同じ長さのpandas Seriesを返却しなくてはならず、Python型ヒントでそれらを指定しなくてはなりません。Sparkは、列をバッチに分割し、データのサブセットとしてそれぞれのバッチに対して関数を呼び出し、結果を結合することで、pandas UDFを実行します。
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# 関数を宣言し、UDFを作成
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# pandas_udfに渡す関数は、ローカルのpandasデータを処理できなくてはなりません
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Sparkデータフレームの作成、`spark`は既存のSparkSessionです
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Sparkベクトル化UDFとして関数を実行
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
0 1
1 4
2 9
dtype: int64
+-------------------+
|multiply_func(x, x)|
+-------------------+
| 1|
| 4|
| 9|
+-------------------+
Iterator of Series to Iterator of Series UDF
イテレーターUDF(Seriesに対するイテレーターからSeriesに対するイテレーターに変換するUDF)は、以下の点を除いてスカラーpandas UDFと同じです:
- Python関数
- 単一の入力バッチではなく、バッチのイテレーターを受け取ります。
- 単一の出力バッチではなく、出力バッチのイテレーターを返却します。
- イテレーターにおける全体的な出力の長さは、全体的な入力の長さに一致しなくてはなりません。
- ラッピングされたpandas UDFは入力として単一のSparkカラムを受け取ります。
Python型タイプヒントをIterator[pandas.Series] -> Iterator[pandas.Series]
として指定しなくてはなりません。
このpandas UDFは、それぞれの入力バッチに対して推論を適用するための機械学習モデルファイルをロードするような、何かしらの状態の初期化を必要とするUDF実行の際に有用です。
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# 列を指定してUDFが呼び出された際、
# 背後の関数に対する入力はpd.Seriesのイテレーターとなります。
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# このUDFでは、バッチを処理する前に幾つかの状態を初期化することができます。
# ご自身のコードをtry/finallyでラッピングしたり、最後で確実にリソースを解放するように
# コンテキストマネージャを使用することができます
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # 状態の初期化
try:
for x in batch_iter:
yield x + y
finally:
pass # 必要に応じてここでリソースを解放
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
+-----------+
|plus_one(x)|
+-----------+
| 2|
| 3|
| 4|
+-----------+
+---------+
|plus_y(x)|
+---------+
| 2|
| 3|
| 4|
+---------+
Iterator of multiple Series to Iterator of Series UDF
Iterator of multiple Series to Iterator of Series UDF(複数のシリーズに対するイテレーターからシリーズに対するイテレーターに変換するUDF)はIterator of Series to Iterator of Series UDFと似た特性と制約を有しています。指定された関数は、バッチに対するイテレーターを受け取り、バッチに対するイテレーターを出力します。これもまた、何かしらの状態を初期化が必要なUDFの実行で有用です。
違いは:
- 背後のPython関数はpandas Seriesのタプルに対するイテレーターを受け取ります。
- ラッピングされたpandas UDFは入力として、複数のSparkカラムを受け取ります。
Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series]
の型ヒントを指定します。
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
+-----------------------+
|multiply_two_cols(x, x)|
+-----------------------+
| 1|
| 4|
| 9|
+-----------------------+
Series to scalar UDF
Series to scalar UDF(Seriesからスカラーに変換するUDF)は、Sparkの集計関数と似ています。Series to scalar UDFは一つ以上のpandas Seriesからスカラー値に対する集計処理を定義し、それぞれのpandas SeriesはSparkカラムを表現します。select
、withColumn
、groupBy.agg
、pyspark.sql.WindowのようなAPIでSeries to scalar UDFを使うことができます。
pandas.Series, ... -> Any
として型ヒントを表現します。戻り値の型はプリミティブデータ型である必要があり、返却されるスカラーはint
やfloat
のようなPythonのプリミティブ型、あるいはnumpy.int64
やnumpy.float64
のようなNumPyのデータ型にすることができます。理想的にはAny
は特定のスカラー型であるべきです。
このタイプのUDFは部分的な集計処理をサポートしておらず、それぞれのグループのすべてのデータがメモリーにロードされます。
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# 関数を宣言し、UDFを作成
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
+-----------+
|mean_udf(v)|
+-----------+
| 4.2|
+-----------+
+---+-----------+
| id|mean_udf(v)|
+---+-----------+
| 1| 1.5|
| 2| 6.0|
+---+-----------+
+---+----+------+
| id| v|mean_v|
+---+----+------+
| 1| 1.0| 1.5|
| 1| 2.0| 1.5|
| 2| 3.0| 6.0|
| 2| 5.0| 6.0|
| 2|10.0| 6.0|
+---+----+------+
ユーザー定義テーブル関数
こちらをご覧ください。
Unity CatalogのUDF
こちらご覧ください。