2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksにおけるPythonユーザー定義関数チートシート

Last updated at Posted at 2024-04-17

DatabricksでSparkを取り扱う際、ロジックをモジュール化や共通化のために ユーザー定義関数(User-defined function: UDF) は必須の機能と言えます。

ここ最近でもUDFに関する機能が追加されているので、今時点でどのような選択肢があるのかをチートシートにまとめます。執筆時点では以下の選択肢があります。

注意
ここでは、PythonのUDFを対象とします。SQLやScalaは対象外とします。

ベースとしたマニュアルはこちらです。また、効率の観点でのUDFの違いはどの UDF が最も効率的ですか?にまとめられています。こちらも留意ください。例えば、組み込み関数が使える場合はそちらを使った方が高速です。

チートシート

UDF 説明 ユースケース コンピュート API スコープ
User-defined scalar functions(ユーザー定義スカラー関数) スカラー値を返却する関数 Sparkで入出力がスカラー値の関数を作成してロジックをモジュール化したい 共有クラスターを使っている場合: DBR 13.2以降 PySpark、Spark SQL SparkSession
pandas user-defined functions(pandasユーザー定義関数) pandasのSeriesあるいはpandasのSeriesに対するIteratorを入力とし、SeriesあるいはIteratorを返却する関数 Sparkで入出力がSeriesである関数を作成し、ロジックをモジュール化したい。ArrowでUDFの処理を高速化したい 共有クラスターを使っている場合: DBR 13.2以降 PySpark、Spark SQL SparkSession
User-defined table functions(ユーザー定義テーブル関数) テーブルを返却する関数 スカラー値を渡してテーブルを返却する関数を利用したい(注1) DBR 14.0以降 PySpark、Spark SQL SparkSession
UDF in Unity Catalog(Unity CatalogのUDF) Unity Catalogで管理されるUDF 適切なアクセス権を設定して、他のユーザー、ワークスペースとUDFを共有したい(注2) DBR 13.2以降あるいはサーバレスSQL Spark SQL Databricksアカウント(注3)
  • APIとは当該UDFをPySparkから呼び出せるか、Spark SQLから呼び出せるのかを示しています。
  • 注1: ユーザー定義テーブル関数の制限事項
    • パブリックプレビュー
    • 共有クラスターやSQLウェアハウスでは利用不可
  • 注2: Unity CatalogのUDFの制限事項
    • パブリックプレビュー
    • Gravitonクラスターでは未サポート
    • UDFはスカラー値を返却する必要がある
    • ユーザー定義テーブル関数は登録不可
    • ファイルシステムや内部サービスへのアクセス不可
    • 標準のPythonライブラリはインポートできるが、カスタムライブラリや外部の依存関係を持ち込むことはできない
  • 注3: Unity Catalogのアクセス管理モデルに従います。

チートシートの画像はこちらです。
Screenshot 2024-04-17 at 9.17.20.png

ユーザー定義スカラー関数

UDFとして関数を登録

from pyspark.sql.types import LongType
def squared_typed(s):
  return s * s
spark.udf.register("squaredWithPython", squared_typed, LongType())

Spark SQLでのUDF呼び出し

spark.range(1, 20).createOrReplaceTempView("test")
%sql select id, squaredWithPython(id) as id_squared from test

Screenshot 2024-04-17 at 8.58.17.png

PySparkでのUDFの利用

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType
squared_udf = udf(squared, LongType())
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

Screenshot 2024-04-17 at 8.58.17.png

こちらでも同じ結果が得られます。こちらはアノテーションを使用しています。

from pyspark.sql.functions import udf
@udf("long")
def squared_udf(s):
  return s * s
df = spark.table("test")
display(df.select("id", squared_udf("id").alias("id_squared")))

pandasユーザー定義関数

ベクトル化UDFとしても知られるpandasユーザー定義関数(UDF)は、データの転送にApache Arrowを使用し、データの操作にpandasを使用するユーザー定義関数です。pandas UDFによって、一度に一行を処理するPython(スカラー)UDFと比べて、最大100倍パフォーマンスを改善できるベクトル化オペレーションを実現します。

背景となる情報については、ブログ記事Spark3.0における新機能: Pandas UDFとPython型ヒントをご覧ください。

デコレーターとしてキーワードpandas_udfを用い、Python型ヒントを持つ関数をラッピングすることで、pandas UDFを定義します。

Series to Series UDF

スカラーオペレーションをベクトル化するためにSeries to Series pandas UDFを活用します。selectwithColumnのようなAPIで使用することができます。

Python関数は、入力としてpandasのSeriesを受け取り、同じ長さのpandas Seriesを返却しなくてはならず、Python型ヒントでそれらを指定しなくてはなりません。Sparkは、列をバッチに分割し、データのサブセットとしてそれぞれのバッチに対して関数を呼び出し、結果を結合することで、pandas UDFを実行します。

import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType

# 関数を宣言し、UDFを作成
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
    return a * b

multiply = pandas_udf(multiply_func, returnType=LongType())

# pandas_udfに渡す関数は、ローカルのpandasデータを処理できなくてはなりません
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0    1
# 1    4
# 2    9
# dtype: int64

# Sparkデータフレームの作成、`spark`は既存のSparkSessionです
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))

# Sparkベクトル化UDFとして関数を実行
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# |                  1|
# |                  4|
# |                  9|
# +-------------------+
0    1
1    4
2    9
dtype: int64
+-------------------+
|multiply_func(x, x)|
+-------------------+
|                  1|
|                  4|
|                  9|
+-------------------+

Iterator of Series to Iterator of Series UDF

イテレーターUDF(Seriesに対するイテレーターからSeriesに対するイテレーターに変換するUDF)は、以下の点を除いてスカラーpandas UDFと同じです:

  • Python関数
    • 単一の入力バッチではなく、バッチのイテレーターを受け取ります。
    • 単一の出力バッチではなく、出力バッチのイテレーターを返却します。
  • イテレーターにおける全体的な出力の長さは、全体的な入力の長さに一致しなくてはなりません。
  • ラッピングされたpandas UDFは入力として単一のSparkカラムを受け取ります。

Python型タイプヒントをIterator[pandas.Series] -> Iterator[pandas.Series]として指定しなくてはなりません。

このpandas UDFは、それぞれの入力バッチに対して推論を適用するための機械学習モデルファイルをロードするような、何かしらの状態の初期化を必要とするUDF実行の際に有用です。

import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

# 列を指定してUDFが呼び出された際、
# 背後の関数に対する入力はpd.Seriesのイテレーターとなります。
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    for x in batch_iter:
        yield x + 1

df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# |          2|
# |          3|
# |          4|
# +-----------+

# このUDFでは、バッチを処理する前に幾つかの状態を初期化することができます。
# ご自身のコードをtry/finallyでラッピングしたり、最後で確実にリソースを解放するように
# コンテキストマネージャを使用することができます
y_bc = spark.sparkContext.broadcast(1)

@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    y = y_bc.value  # 状態の初期化
    try:
        for x in batch_iter:
            yield x + y
    finally:
        pass  # 必要に応じてここでリソースを解放

df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# |        2|
# |        3|
# |        4|
# +---------+
+-----------+
|plus_one(x)|
+-----------+
|          2|
|          3|
|          4|
+-----------+

+---------+
|plus_y(x)|
+---------+
|        2|
|        3|
|        4|
+---------+

Iterator of multiple Series to Iterator of Series UDF

Iterator of multiple Series to Iterator of Series UDF(複数のシリーズに対するイテレーターからシリーズに対するイテレーターに変換するUDF)はIterator of Series to Iterator of Series UDFと似た特性と制約を有しています。指定された関数は、バッチに対するイテレーターを受け取り、バッチに対するイテレーターを出力します。これもまた、何かしらの状態を初期化が必要なUDFの実行で有用です。

違いは:

  • 背後のPython関数はpandas Seriesのタプルに対するイテレーターを受け取ります。
  • ラッピングされたpandas UDFは入力として、複数のSparkカラムを受け取ります。

Iterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series]の型ヒントを指定します。

from typing import Iterator, Tuple
import pandas as pd

from pyspark.sql.functions import col, pandas_udf, struct

pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)

@pandas_udf("long")
def multiply_two_cols(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    for a, b in iterator:
        yield a * b

df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# |                      1|
# |                      4|
# |                      9|
# +-----------------------+
+-----------------------+
|multiply_two_cols(x, x)|
+-----------------------+
|                      1|
|                      4|
|                      9|
+-----------------------+

Series to scalar UDF

Series to scalar UDF(Seriesからスカラーに変換するUDF)は、Sparkの集計関数と似ています。Series to scalar UDFは一つ以上のpandas Seriesからスカラー値に対する集計処理を定義し、それぞれのpandas SeriesはSparkカラムを表現します。selectwithColumngroupBy.aggpyspark.sql.WindowのようなAPIでSeries to scalar UDFを使うことができます。

pandas.Series, ... -> Anyとして型ヒントを表現します。戻り値の型はプリミティブデータ型である必要があり、返却されるスカラーはintfloatのようなPythonのプリミティブ型、あるいはnumpy.int64numpy.float64のようなNumPyのデータ型にすることができます。理想的にはAnyは特定のスカラー型であるべきです。

このタイプのUDFは部分的な集計処理をサポートしておらず、それぞれのグループのすべてのデータがメモリーにロードされます。

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# 関数を宣言し、UDFを作成
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
    return v.mean()

df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# |        4.2|
# +-----------+

df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# |  1|        1.5|
# |  2|        6.0|
# +---+-----------+

w = Window \
    .partitionBy('id') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id|   v|mean_v|
# +---+----+------+
# |  1| 1.0|   1.5|
# |  1| 2.0|   1.5|
# |  2| 3.0|   6.0|
# |  2| 5.0|   6.0|
# |  2|10.0|   6.0|
# +---+----+------+
+-----------+
|mean_udf(v)|
+-----------+
|        4.2|
+-----------+

+---+-----------+
| id|mean_udf(v)|
+---+-----------+
|  1|        1.5|
|  2|        6.0|
+---+-----------+

+---+----+------+
| id|   v|mean_v|
+---+----+------+
|  1| 1.0|   1.5|
|  1| 2.0|   1.5|
|  2| 3.0|   6.0|
|  2| 5.0|   6.0|
|  2|10.0|   6.0|
+---+----+------+

ユーザー定義テーブル関数

こちらをご覧ください。

Unity CatalogのUDF

こちらご覧ください。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?