pandas function APIs | Databricks on AWS [2022/6/27時点]の翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
pandas function APIを用いることで、pandasインスタンスを入力として受け取り、pandasインスタンスを出力するネイティブのPython関数をPySparkデータフレームに直接適用できるようになります。function APIはpandasユーザー定義関数と同様に、データを転送し、pandasがデータを操作する際にApache Arrowを使用します。しかし、pandas function APIではPython型ヒントはオプションです。
pandas function APIには3つのタイプがあります。
- Grouped map
- Map
- Cogrouped map
pandas function APIは、pandas UDFの実行で使用されるのと同じ内部ロジックを活用します。このため、PyArrow、サポートされるSQLの対応、設定などはpandas UDFと同じ特性を共有します。
詳細に関しては、Spark3.0における新機能: Pandas UDFとPython型ヒントをご覧ください。
Grouped map
「split-apply-combine」パターンを実装するためにgroupBy().applyInPandas()を通じて、グループ分けされたデータの変換を行います。split-apply-combineは3つのステップから構成されます。
- 
DataFrame.groupByを用いてデータをグループに分割します。
- それぞれのグループに関数を適用します。関数の入力、出力は両方ともpandas.DataFrameです。入力データにはそれぞれのグループの全ての行と列が含まれます。
- 新たなDataFrameに結果を統合します。
groupBy().applyInPandas()を使用するには、以下を定義する必要があります。
- それぞれのグループに対する処理を定義するPython関数
- 出力DataFrameのスキーマを定義するStructTypeオブジェクトあるいは文字列
返却されるpandas.DataFrameのカラムのラベルは、出力スキーマが文字列として定義された場合はフィールド名が一致する必要があり、文字列でない場合には、integerのインデックスなどフィールのデータ型がマッチする必要があります。pandas.DataFrameを構成する際にどのようにカラムにラベルをつけるのかに関しては、pandas.DataFrameを参照ください。
関数を適用する前に、グループの全データがメモリーにロードされます。特にグループのサイズに偏りがある場合、アウトオブメモリー例外を引き起こすことがあります。設定maxRecordsPerBatchはグループには適用されないので、ご自身の手で、グループ分けされたデータがメモリーに収まるようにする必要があります。
以下の例では、グループのそれぞれの値から平均値を引き算するためにどのようにgroupby().applyInPandas()を使用するのかを示しています。
df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))
def subtract_mean(pdf):
    # pdf is a pandas.DataFrame
    v = pdf.v
    return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema="id long, v double").show()
# +---+----+
# | id|   v|
# +---+----+
# |  1|-0.5|
# |  1| 0.5|
# |  2|-3.0|
# |  2|-1.0|
# |  2| 4.0|
# +---+----+
詳細な使い方に関しては、pyspark.sql.GroupedData.applyInPandasをご覧ください。
Map
DataFrame.mapInPandas()を用いることでpandasのインスタンスを用いたmapオペレーションを実行し、pandas.DataFrameのイテレータを現在のPySparkデータフレームを表現するpandas.DataFrameの別のイテレータに変換し、結果をPySparkデータフレームとして返却します。
内部の関数は、pandas.DataFrameのイテレータを受け取り、pandas.DataFrameのイテレータを返却します。Series to Series pandas UDFのような幾つかのpandas UDFと異なり、任意の長さの出力を返却することができます。
以下の例ではmapInPandas()の使い方を説明しています。
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def filter_func(iterator):
    for pdf in iterator:
        yield pdf[pdf.id == 1]
df.mapInPandas(filter_func, schema=df.schema).show()
# +---+---+
# | id|age|
# +---+---+
# |  1| 21|
# +---+---+
詳細については、pyspark.sql.DataFrame.mapInPandasをご覧ください。
Cogrouped map
pandasインスタンスを用いたCogrouped mapオペレーションには、共通のキーでグループ結合される2つのPySparkDataframeに対してDataFrame.groupby().cogroup().applyInPandas()を使用し、それぞれの共通グループにPython関数が適用されます。以下のステップから構成されます。
- それぞれのデータフレームでキーを同じくするグループのデータがシャッフルされ、同じ共通グループにまとめられます。
- それぞれの共通グループに関数を適用します。関数の入力はpandas.DataFrameです(オプションとしてキーを表現するタプル)。関数の出力はpandas.DataFrameです。
- 全てのグループのpandas.DataFrameを新たなPySparkDataFrameで結合します。
groupBy().cogroup().applyInPandas()を使用するには、以下を定義する必要があります。
- それぞれの共通グループに対する計算処理を定義するPython関数
- 出力DataFrameのスキーマを定義するStructTypeオブジェクトあるいは文字列
返却されるpandas.DataFrameのカラムのラベルは、出力スキーマが文字列として定義された場合はフィールド名が一致する必要があり、文字列でない場合には、integerのインデックスなどフィールのデータ型がマッチする必要があります。pandas.DataFrameを構成する際にどのようにカラムにラベルをつけるのかに関しては、pandas.DataFrameを参照ください。
関数を適用する前に、グループの全データがメモリーにロードされます。特にグループのサイズに偏りがある場合、アウトオブメモリー例外を引き起こすことがあります。設定maxRecordsPerBatchはグループには適用されないので、ご自身の手で、グループ分けされたデータがメモリーに収まるようにする必要があります。
以下の例では、2つのデータセット間のasof joinを実行するために、どのようにgroupby().cogroup().applyInPandas()を使用するのかを示しています。
import pandas as pd
df1 = spark.createDataFrame(
    [(20000101, 1, 1.0), (20000101, 2, 2.0), (20000102, 1, 3.0), (20000102, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(20000101, 1, "x"), (20000101, 2, "y")],
    ("time", "id", "v2"))
def asof_join(l, r):
    return pd.merge_asof(l, r, on="time", by="id")
df1.groupby("id").cogroup(df2.groupby("id")).applyInPandas(
    asof_join, schema="time int, id int, v1 double, v2 string").show()
# +--------+---+---+---+
# |    time| id| v1| v2|
# +--------+---+---+---+
# |20000101|  1|1.0|  x|
# |20000102|  1|3.0|  x|
# |20000101|  2|2.0|  y|
# |20000102|  2|4.0|  y|
# +--------+---+---+---+
詳細な使い方については、pyspark.sql.PandasCogroupedOps.applyInPandasをご覧ください。
