0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

組み込み関数で済むのにUDFを書くと何倍遅いのか — Spark UDFの使い分けを実測で整理する

0
Last updated at Posted at 2026-05-23

はじめに

レビューで、こういうコードを見かけることがあります。

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

add_one = udf(lambda x: x + 1, LongType())
df = df.withColumn("count_up", add_one("value"))

やっていることは「ある列に 1 を足す」だけです。これは組み込みの式で書けます。

from pyspark.sql.functions import col

df = df.withColumn("count_up", col("value") + 1)

両者は同じ結果を返しますが、パフォーマンスはまったく違います。そして Spark には udf のほかに pandas_udf もあり、「結局どれを使えばいいのか」が分かりにくくなっています。

この記事では、ごく単純なカウントアップ処理を題材に、組み込み関数 / Pandas UDF / スカラー UDF の使い分けを整理します。あわせて、組み込み関数で済むところをスカラー UDF にしてしまった場合のパフォーマンスへの影響を実測します。ハンズオン用のノートブックも用意しました。Databricks のサーバーレスコンピュート (Free Edition を含む) でそのまま実行できます。

結論

先に結論です。検討は次の順序で行います。

  1. pyspark.sql.functions の組み込み関数や式で書けるなら、それを使う
  2. 組み込みでは書けないが pandas / numpy のベクトル演算で表現できるなら、Pandas UDF を使う
  3. LLM 推論など AI モデルの呼び出しなら、ai_query などの組み込み AI 関数を検討する
  4. 上記いずれにもあてはまらない Python 固有のロジックなら、スカラー UDF を使う

アンチパターンは「UDF を使うこと」そのものではなく、「組み込み関数で書ける処理を UDF にすること」と「種類を選ばずスカラー UDF を選ぶこと」です。

前提となる用語

本文で繰り返し登場する用語を先に整理します。すでにご存じであれば読み飛ばしてください。

用語 説明
UDF (ユーザー定義関数) 組み込み関数では表現できない処理を、ユーザーが Python などで定義して列に適用する仕組みです。udf() で定義するスカラー UDF と、pandas_udf で定義する Pandas UDF があります。
Catalyst Spark SQL のクエリ最適化エンジンです。書いた処理を解析し、より効率的な実行計画へ書き換えます。組み込み関数の中身は解析できますが、UDF の中身は解析できません。
WholeStageCodegen 複数の処理をまとめて 1 つのコードにコンパイルする仕組みです。中間データの受け渡しを省けるため高速になります。Python UDF は対象外です。
Photon Databricks のネイティブ実行エンジンです。組み込み関数を高速化しますが、Python UDF の処理は Photon の外で実行されます。
JVM Java 仮想マシン。Spark の実行エンジン本体が動作する環境です。Python UDF を使うと、この外側にある Python プロセスとのやり取りが発生します。
Apache Arrow 言語をまたいでデータを効率よく受け渡すための列指向のメモリ形式です。JVM と Python 間の転送に使われ、行ごとの変換コストを抑えます。
ベクトル化 1 件ずつのループではなく、配列やバッチ全体に対してまとめて演算することです。Pandas UDF が速くなる理由のひとつです。
BatchEvalPython / ArrowEvalPython 実行計画に現れる Python UDF の評価ノードです。pickle で 1 行ずつ処理する従来方式が BatchEvalPython、Apache Arrow でバッチ処理する方式が ArrowEvalPython です。

題材: カウントアップ処理

題材は冒頭の「列に 1 を足す」処理です。これを次の 4 通りで実装し、比較します。

パターン 実装 概要
A 組み込み関数 / 式 col("id") + 1
B スカラー Python UDF udf(...)
C Arrow 最適化 Python UDF udf(..., useArrow=True)
D Pandas UDF pandas_udf

サンプルデータは spark.range() で用意します。

ROW_COUNT = 5_000_000
base_df = spark.range(0, ROW_COUNT)

同じ処理を4通りで書く

パターンA: 組み込み関数

from pyspark.sql.functions import col

result_a = base_df.withColumn("result", col("id") + 1)

Catalyst オプティマイザが式の中身を理解でき、WholeStageCodegen によるネイティブコード生成の対象になります。Python のワーカープロセスは起動しません。書ける処理ならこれが最優先です。

パターンB: スカラー Python UDF

from pyspark.sql.functions import udf
from pyspark.sql.types import LongType

add_one_scalar = udf(lambda x: x + 1, LongType())
result_b = base_df.withColumn("result", add_one_scalar(col("id")))

UDF の本体は 1 行ずつ Python で評価されます。Catalyst からは中身の見えないブラックボックスになります。今回のように組み込みで書ける処理を UDF にするのは、典型的なアンチパターンです。

パターンC: Arrow 最適化 Python UDF

add_one_arrow = udf(lambda x: x + 1, LongType(), useArrow=True)
result_c = base_df.withColumn("result", add_one_arrow(col("id")))

useArrow=True を渡すと、JVM と Python 間のデータ転送に Apache Arrow が使われ、行単位のシリアライズのオーバーヘッドが軽減されます。関数の書き方も、本体が 1 行ずつ処理される点もスカラー UDF と同じで、変わるのは「データの運び方」だけです。

ただし重要な注意点があります。この Arrow 最適化は Spark 3.5 / Databricks Runtime 14.0 以降の機能で、最近のランタイムでは通常のスカラー UDF (パターン B) にも既定で適用されます。そのため、最近の Databricks 環境ではパターン B とパターン C は実質的に同じものになります。useArrow=True の明示が効くのは、Arrow 最適化が既定でない古い環境や、useArrow=False で無効化されている場合です。

パターンD: Pandas UDF

import pandas as pd
from pyspark.sql.functions import pandas_udf

@pandas_udf(LongType())
def add_one_pandas(s: pd.Series) -> pd.Series:
    return s + 1  # Series 全体に対するベクトル演算

result_d = base_df.withColumn("result", add_one_pandas(col("id")))

列をバッチに分割し、各バッチを pandas.Series として受け取ります。Arrow によるバッチ転送に加え、関数の中身を pandas / numpy のベクトル演算で書けるため、Python のループを避けられます。

名前に pandas と付いていますが、処理がドライバに集約されるわけではありません。各エグゼキューターのパーティション上で分散実行されます。この点は後述します。

ベンチマーク結果

4 つの実装の実行時間を計測しました。計測には noop シンクへの書き込みを使います。count() は行数しか必要としないため、Catalyst が result 列の計算を不要と判断して刈り取ってしまい、UDF が実行されないことがあるためです。noop は全列を消費しつつどこにも書き出さない計測用のシンクで、列の刈り取りを防げます。

def force(df):
    df.write.format("noop").mode("overwrite").save()

次の値は Databricks のサーバーレスコンピュート (Free Edition) で、500 万行に対して計測した結果です。絶対値は環境によって変わるため、見るべきは実装間の比率です。

実装 実行時間 組み込み比
A 組み込み関数 0.28 秒 1.0 倍
D Pandas UDF 1.41 秒 5.0 倍
C Arrow 最適化 UDF 1.85 秒 6.5 倍
B スカラー Python UDF 1.97 秒 7.0 倍

spark_udf_benchmark.png

ここから読み取れることは次のとおりです。

  • スカラー UDF は組み込み関数より 5〜10 倍ほど遅い
  • Pandas UDF は UDF の中では最速だが、それでも組み込み関数の約 5 倍かかる
  • パターン B (通常のスカラー UDF) とパターン C (useArrow=True) はほぼ同じ速度になった

最後の点は意外に思えるかもしれません。理由は実行計画を見ると分かります。後述するように、最近の Databricks ランタイムでは通常のスカラー UDF も既定で Arrow 最適化されるため、両者は実質的に同じ実装になります。

そして最も重要な点は、「組み込み関数で書ける処理」については、最速の UDF である Pandas UDF を使ったとしても組み込み関数に勝てない、ということです。Pandas UDF を持ち出すことは、組み込み関数を使わない理由にはなりません。

実行計画で確認する

なぜ差が出るのかは explain() で物理プランを見ると分かります。以下は Databricks サーバーレスでの実際の出力を抜粋・整形したものです。

組み込み関数の場合、+1 の計算は Photon のネイティブ演算子 PhotonProject として実行されます。

== Physical Plan ==
PhotonResultStage
+- PhotonColumnarToRow
   +- PhotonProject [id, (id + 1) AS r]
      +- PhotonRange Range (0, 5000000, step=1, splits=8)

== Photon Explanation ==
The query is fully supported by Photon.

スカラー UDF の場合、ArrowEvalPython というノードが現れます。

== Physical Plan ==
PhotonResultStage
+- PhotonColumnarToRow
   +- PhotonProject [id, pythonUDF0 AS r]
      +- PhotonArrowBatchSource
         +- ArrowEvalPython [<lambda>(id)], [pythonUDF0], 101
            +- PhotonArrowResultStage
               +- PhotonArrowBatchSink ...
                  +- PhotonRange Range (0, 5000000, step=1, splits=8)

ここで注目したい点が 2 つあります。

1 つ目は、ノードが BatchEvalPython ではなく ArrowEvalPython であることです。Python UDF の評価ノードには 2 種類あります。BatchEvalPython は pickle で 1 行ずつシリアライズする従来方式、ArrowEvalPython は Apache Arrow で列指向のバッチとしてやり取りする方式です。Arrow 最適化された Python UDF は Spark 3.5 / Databricks Runtime 14.0 で導入され、最近のランタイムでは通常のスカラー UDF も既定で Arrow 最適化されます。そのため useArrow=True を明示しなくても ArrowEvalPython になります。末尾の 101 は Arrow 最適化されたスカラー UDF を表す内部の種別コードです (Pandas UDF は 200 になります)。従来の pickle 方式に戻したい場合は udf(..., useArrow=False) と明示します。

これが、ベンチマークでパターン B (通常のスカラー UDF) とパターン C (useArrow=True) がほぼ同じ速度になった理由です。最近の Databricks ランタイムでは、両者は同じ Arrow 最適化された実装に解決されます。

2 つ目は、UDF の評価ノードが Photon の外にあることです。組み込み関数は PhotonProject として Photon が直接実行しますが、UDF の本体は ArrowEvalPython ノードの中、すなわち Python ワーカープロセスで実行されます。Photon は Arrow バッチを Python へ受け渡す部分 (PhotonArrowBatchSource) までは担当しますが、UDF の中身そのものは高速化できません。Photon の説明に The query is fully supported by Photon と表示されても、それは Photon が担当する範囲を指しているだけで、Python の処理自体が速くなるわけではありません。

そして、ArrowEvalPython であれ BatchEvalPython であれ、この Python 評価ノードは「最適化を分断する壁」になります。壁の内側、つまり UDF の中身は Catalyst から見えません。そのため述語のプッシュダウン (フィルタ条件をデータ読み込みに近い位置へ移動し、早い段階で行を絞り込む最適化) や式の融合といった最適化が、壁をまたいで効かなくなります。

なぜスカラー Python UDF は遅いのか

同じカウントアップ処理でも、実装によってデータの流れはまったく異なります。

spark_udf_ipc_diagram.png

整理すると、UDF が組み込み関数より遅い理由は次の 4 点です。

JVM と Python ワーカー間のシリアライズ

Spark の実行エンジンは JVM 上で動きます。Python UDF を使うと、データを JVM から別プロセスの Python ワーカーへ渡し、結果を受け取る必要があります。従来の pickle 方式では 1 行ずつ、Arrow 最適化では列指向のバッチ単位でやり取りされます。Arrow はこのコストを抑えますが、プロセスの境界をまたぐこと自体はなくなりません。

Catalyst によるブラックボックス化

explain() で見たとおり、UDF の中身は Catalyst から不透明です。最適化の対象にならず、Python 評価ノード (ArrowEvalPython または BatchEvalPython) の壁の前後で最適化が分断されます。

WholeStageCodegen が効かない

組み込みの式は複数の処理をまとめて 1 つのネイティブな関数にコンパイルできます (WholeStageCodegen)。UDF はこの対象外で、生成されたコードの恩恵を受けられません。

Databricks では Photon が効かない

Databricks の Photon エンジンは組み込み関数を高速化しますが、Python UDF の本体は Photon の外、Python ワーカープロセスで実行されます。実行計画でも、組み込み関数は PhotonProject になるのに対し、UDF は Photon の外の ArrowEvalPython ノードになります。Arrow 最適化によって UDF の転送は速くなりますが、UDF の中身が Photon で高速化されることはありません。

「Pandas UDF」という名前の罠

ここで Pandas UDF の名前について触れておきます。「pandas」という単語から「単一マシンの pandas を持ち込むもの」「分散しなくなるのでは」と身構えてしまう人がいます。

実態は逆です。Pandas UDF は Apache Arrow でベクトル化された、各エグゼキューターのパーティション上で分散実行される正統な Spark の機能です。むしろ積極的に使うべき側です。

誤解を生む一因は、隣に本物の罠があることです。toPandas()spark.createDataFrame(pandas_df) のように「データ全体をドライバに集約して素の pandas にする」操作は、たしかに警戒すべきものです。「pandas という単語が出てきたら危険」という雑な見分け方が、ここでは当たってしまいます。その巻き添えで Pandas UDF まで警戒されてしまいます。

Pandas UDF の pandas は、「各パーティションを Arrow 経由で pandas.Series として受け取る」という入出力形式を指しているだけで、処理が分散しなくなるという意味ではありません。説明する際は「ベクトル化 UDF」と呼び替えると、この誤解を避けやすくなります。

それでも UDF が必要なときの選択肢

組み込み関数で素直に書けない処理もあります。その場合の選択肢を整理します。

Pandas UDF を優先する

組み込みでは書けないが pandas / numpy のベクトル演算で表現できるなら、Pandas UDF が第一候補です。例として、バイト数を 1.5 GB のような文字列に変換する処理を考えます。単位を段階的に繰り上げるロジックは組み込み関数だけでは現実的に書けません。

import numpy as np

@pandas_udf(StringType())
def humanize_pandas(s: pd.Series) -> pd.Series:
    values = s.astype("float64").to_numpy()
    unit_idx = np.zeros(len(values), dtype="int64")
    for _ in range(4):  # 4 段階の繰り上げを配列全体に対する演算で行う
        mask = values >= 1024
        values = np.where(mask, values / 1024.0, values)
        unit_idx = np.where(mask, unit_idx + 1, unit_idx)
    units = np.array(["B", "KB", "MB", "GB", "TB"])
    return pd.Series([f"{v:.1f} {units[i]}" for v, i in zip(values, unit_idx)])

注意: 中身がベクトル化されている範囲でしか速くならない

ただし注意点があります。この humanize の例では、Pandas UDF とスカラー UDF の差は +1 のときほど大きく開きません。今回の計測では約 1.1 倍にとどまりました。最後の文字列整形が要素ごとの Python ループとして残っているうえ、最近のランタイムではスカラー UDF も Arrow 最適化されており、転送方式に差がないためです。

Pandas UDF が速い理由は 2 つあります。Arrow による効率的なバッチ転送と、関数の中身をベクトル演算で書けることです。後者を活かせず中身が結局 Python のループのままだと、得られるのは転送効率の改善だけになります。Pandas UDF にしただけで速くなるのではなく、中身をベクトル化して初めて本来の性能が出る、と理解しておくとよいです。

LLM 呼び出しは ai_query を検討する

UDF の中で LLM を呼び出したくなる場面では、Databricks には ai_query という組み込み AI 関数が用意されています。UDF を自前で書かずに、SQL や DataFrame から直接モデルを呼び出せます。並列処理や再試行、スケーリングも自動で扱われます。

スカラー UDF は最後の手段

行ごとに独立した Python 固有のロジックで、ベクトル化もできない場合がスカラー UDF の出番です。最近の Databricks ランタイムではスカラー UDF も既定で Arrow 最適化されるため、useArrow を意識する必要はほとんどありません。古いランタイムや Arrow 最適化が無効な環境では、useArrow=True を付けるか、実行計画に BatchEvalPython が出ていないかを確認するとよいです。

使い分けの判断フロー

最終的な判断の順序です。上から順に検討します。

順序 確認すること あてはまる場合の選択
1 組み込み関数や式で書けるか 組み込み関数 / 式
2 pandas / numpy のベクトル演算で表現できるか Pandas UDF
3 LLM 推論など AI モデルの呼び出しか ai_query などの組み込み AI 関数
4 ベクトル化できない Python 固有のロジックか スカラー UDF

まとめ

  • 同じカウントアップ処理でも、組み込み関数とスカラー UDF では実行時間に 7 倍前後の差が出ます (Databricks サーバーレスでの実測)
  • 差の正体は explain() に現れる Python 評価ノード (ArrowEvalPython / BatchEvalPython)、すなわち JVM と Python ワーカー間の境界をまたぐコストと、Catalyst 最適化の分断です
  • 最近の Databricks ランタイムではスカラー UDF も既定で Arrow 最適化されるため、転送は速くなります。それでも組み込み関数 (PhotonProject) には届きません
  • UDF が必要な場合は Pandas UDF を優先します。ただし Pandas UDF は中身をベクトル化して初めて本領を発揮します
  • 判断の出発点は常に「組み込み関数で書けないか」です

「Python だから UDF」ではなく「組み込みで書けないから UDF」と考えること。これが要点です。

参考リンク

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?