pandas user-defined functions | Databricks on AWS [2021/6/11時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
ベクトライズドUDFとしても知られるpandasのユーザー定義関数(UDF)は、データを操作するためのpandasにデータを転送するためにApache Arrowを用いるユーザー定義関数です。pandasのUDFを用いることで、行単位で処理を行うPython UDFと比べて100倍もの性能改善を実現することができます。
背景となる情報に関しては、ブログ記事New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0、New Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0を参照ください。
デコレーターとしてpandas_udf
キーワードを用いてpandas UDFを定義し、Python型ヒントで関数をラップします。本書では、いろいろなタイプのpandas UDFを説明し、型ヒントを用いたpandas UDFの使い方を説明します。
シリーズ to シリーズUDF
スカラーオペレーションをベクトル化するために、シリーズ to シリーズpandas UDFを使用します。select
、withColumn
のようなAPIと共に使用します。
Python関数はpandasのSeriesを入力として受け取り、同じ長さのpandas Seriesを返却する必要があり、これらをPythonの型ヒントで指定する必要があります。Sparkはカラムをバッチに分割し、データのサブセットとしてそれぞれのバッチに対して関数を呼び出し、結果を結合することでpandas UDFを実行します。
以下の例では、2つのカラムの積を計算するpandas UDFの作成方法を示しています。
import pandas as pd
from pyspark.sql.functions import col, pandas_udf
from pyspark.sql.types import LongType
# Declare the function and create the UDF
def multiply_func(a: pd.Series, b: pd.Series) -> pd.Series:
return a * b
multiply = pandas_udf(multiply_func, returnType=LongType())
# The function for a pandas_udf should be able to execute with local pandas data
x = pd.Series([1, 2, 3])
print(multiply_func(x, x))
# 0 1
# 1 4
# 2 9
# dtype: int64
# Create a Spark DataFrame, 'spark' is an existing SparkSession
df = spark.createDataFrame(pd.DataFrame(x, columns=["x"]))
# Execute function as a Spark vectorized UDF
df.select(multiply(col("x"), col("x"))).show()
# +-------------------+
# |multiply_func(x, x)|
# +-------------------+
# | 1|
# | 4|
# | 9|
# +-------------------+
シリーズイテレーター to シリーズイテレーターUDF
イテレーターUDFは以下の点を除いてスカラーのpandas UDFと同じです。
- Python関数
- 入力として単一のバッチを受け取るのではなく、バッチのイテレーターを受け取ります。
- 単一のアウトプットバッチを返却するのではなく、アウトプットバッチのイテレーターを返却します。
- イテレーターにおけるアウトプット全体の長さは、インプット全体の長さと同じである必要があります。
- ラップされたpandas UDFは、入力として単一のSparkカラムを受け取ります。
Pythonの型ヒントとしてIterator[pandas.Series] -> Iterator[pandas.Series]
を指定する必要があります。
このpandas UDFは、UDFの実行で何かしらの状態の初期化、例えば、それぞれの入力バッチに対して推論を適用するために機械学習モデルファイルをロードするような場合には有用です。
以下のサンプルでは、イテレーターのサポートを用いてpandas UDFを作成する方法を説明しています。
import pandas as pd
from typing import Iterator
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
# When the UDF is called with the column,
# the input to the underlying function is an iterator of pd.Series.
@pandas_udf("long")
def plus_one(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
for x in batch_iter:
yield x + 1
df.select(plus_one(col("x"))).show()
# +-----------+
# |plus_one(x)|
# +-----------+
# | 2|
# | 3|
# | 4|
# +-----------+
# In the UDF, you can initialize some state before processing batches.
# Wrap your code with try/finally or use context managers to ensure
# the release of resources at the end.
y_bc = spark.sparkContext.broadcast(1)
@pandas_udf("long")
def plus_y(batch_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
y = y_bc.value # initialize states
try:
for x in batch_iter:
yield x + y
finally:
pass # release resources here, if any
df.select(plus_y(col("x"))).show()
# +---------+
# |plus_y(x)|
# +---------+
# | 2|
# | 3|
# | 4|
# +---------+
複数シリーズイテレーター to シリーズイテレーターUDF
複数シリーズイテレーター to シリーズイテレーターUDFはシリーズイテレーター to シリーズイテレーターUDFと同様の特定、制限を持っています。指定された関数は、バッチのイテレーターを受け取り、バッチのイテレーターを出力します。この関数も、UDFの実行で何かしらの状態初期化が必要な場合に有用です。
相違点は以下の通りです。
- 内部のPython関数はpandasシリーズのタプルのイテレーターを受け取ります。
- ラップされたpandas UDFは入力として複数のSparkカラムを受け取ります。
型ヒントとしてIterator[Tuple[pandas.Series, ...]] -> Iterator[pandas.Series]
を指定します。
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import col, pandas_udf, struct
pdf = pd.DataFrame([1, 2, 3], columns=["x"])
df = spark.createDataFrame(pdf)
@pandas_udf("long")
def multiply_two_cols(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
for a, b in iterator:
yield a * b
df.select(multiply_two_cols("x", "x")).show()
# +-----------------------+
# |multiply_two_cols(x, x)|
# +-----------------------+
# | 1|
# | 4|
# | 9|
# +-----------------------+
シリーズ to スカラーUDF
シリーズ to スカラーpandas UDFはSparkの集計関数と類似のものです。シリーズ to スカラーpandas UDFでは、1つ以上のpandasシリーズからスカラー値への集計処理を定義します。ここでは、それぞれのpandasシリーズはSparkカラムを表現します。select
、withColumn
、groupBy.agg
やpyspark.sql.WindowのようなAPIと共にシリーズ to スカラーUDFを使用します。
型ヒントはpandas.Series, ... -> Any
と表現します。戻り値の型は、プリミティブなデータ型である必要があり、返却されるスカラー値は、int
、float
のようなPythonのプリミティブ型、numpy.int64
、numpy.float64
のようなNumpyデータ型となります。Any
は理想的には特定のスカラー型となります。
このタイプのUDFでは、部分的集計をサポートしておらず、それぞれのグループの全データはメモリーにロードされます。
以下のサンプルでは、select
、groupBy
、window
オペレーションを用いて平均値を計算する流れを通じて、このタイプのUDFの使い方を示しています。
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
# Declare the function and create the UDF
@pandas_udf("double")
def mean_udf(v: pd.Series) -> float:
return v.mean()
df.select(mean_udf(df['v'])).show()
# +-----------+
# |mean_udf(v)|
# +-----------+
# | 4.2|
# +-----------+
df.groupby("id").agg(mean_udf(df['v'])).show()
# +---+-----------+
# | id|mean_udf(v)|
# +---+-----------+
# | 1| 1.5|
# | 2| 6.0|
# +---+-----------+
w = Window \
.partitionBy('id') \
.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
df.withColumn('mean_v', mean_udf(df['v']).over(w)).show()
# +---+----+------+
# | id| v|mean_v|
# +---+----+------+
# | 1| 1.0| 1.5|
# | 1| 2.0| 1.5|
# | 2| 3.0| 6.0|
# | 2| 5.0| 6.0|
# | 2|10.0| 6.0|
# +---+----+------+
詳細な使い方に関しては、pyspark.sql.functions.pandas_udfをご覧ください。
使い方
Arrowのバッチサイズの設定
Sparkにおけるデータのパーティションは、Arrowのレコードバッチに変換され、JVMにおけるメモリー使用量の増大を引き起こします。アウトオブメモリー例外を回避するために、spark.sql.execution.arrow.maxRecordsPerBatch
設定を、それぞれのバッチにおける最大レコード数を決定する整数値に設定することで、Arrowのレコードバッチのサイズを調整することができます。デフォルト値はバッチあたり10,000レコードとなっています。カラム数が大きい場合には、それに応じて値を調整する必要があります。この制限を用いることで、データのパーティションは、処理を行うための1つ以上のレコードバッチに分割されます。
タイムゾーンを伴うタイムスタンプのセマンティクス
Sparkは内部でタイムスタンプをUTCの値として保持し、特定のタイムゾーンの指定がない場合、取り込まれるタイムスタンプデータは、ローカル時間として、マイクロ秒の解像度でUTCに変換されます。
タイムスタンプデータがエクスポート、あるいはSparkで表示される際、タイムスタンプの値をローカライズするためにセッションのタイムゾーンが使用されます。セッションのタイムゾーンはspark.sql.session.timeZone
設定で定義され、デフォルトはJVMのシステムローカルタームゾーンとなります。pandasでは、ナノ秒の解像度によるdatetime64
型を使用し、カラムごとにオプションでタイムゾーンを定義できるdatetime64[ns]
型となります。
タイムスタンプデータがsparkからpandasに転送されると、ナノ秒に変換され、それぞれのカラムはSparkセッションのタイムゾーンに変換され、指定されたタイムゾーンでローカライズされます。これによって、タイムゾーンは削除され、ローカル時間として値が表示されます。これは、タイムスタンプカラムでtoPandas()
やpandas_udf
が呼び出された際に実行されます。
タイムスタンプデータがpandasからSparkに転送されると、マイクロ秒のUTCに変換されます。これは、pandasデータフレームに対してcreateDataFrame
が呼び出された際、あるいは、pandas UDFからタイムスタンプが返却される際に実行されます。これらの変換は自動で行われ、Sparkが期待通りのフォーマットでデータを保持することを確実にし、ご自身で変換処理を行う必要はありません。あらゆるナノ秒の値は切り捨てられます。
標準的なUDFでは、タイムスタンプデータをPythonのdatetimeオブジェクトとしてロードし、これはpandasのtimestampとは異なります。ベストなパフォーマンスを得るためには、pandas UDFでタイムスタンプを取り扱う際にはpandasの時系列機能を使用することをお勧めします。詳細はTime Series / Date functionalityを参照ください。
サンプルノートブック
以下のノートブックでは、pandas UDFを用いることで達成できるパフォーマンス改善を説明しています。
pandas UDFベンチマークノートブック