この記事はNew Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0の2022/02/08時点での翻訳になります。
Pandasのユーザー定義関数(UDF)は、データサイエンス向けのApache Sparkの最も重要な機能強化の1つです。PandasのAPIを利用できるようになったり、パフォーマンスが向上したりと、多くのメリットをもたらしてくれます。
しかし、Pandas UDFは時間の経過とともに有機的に進化してきたため、いくつかの矛盾が生じ、ユーザーの間に混乱が生じています。近々リリースされる予定のApache Spark 3.0では、Pythonの型ヒントを活用したPandas UDFの新しいインターフェースが導入され、Pandas UDFの型の増殖に対処し、よりPythonicで自己記述的なものになるように支援する予定です。
このブログ記事では、Pythonのタイプヒントを利用した新しいPandas UDFと、グループ化マップ、マップ、コグループ化マップを含む新しいPandas Function APIを紹介します。
Pandas UDF
Pandas UDFはSpark 2.3で導入されましたが、Introducing Pandas UDF for PySparkの項もご覧ください。Pandasはデータサイエンティストによく知られており、NumPy、statsmodel、scikit-learnなどの多くのPythonライブラリやパッケージとシームレスに統合されており、Pandas UDFによってデータサイエンティストはワークロードをスケールアウトできるだけでなく、Apache SparkでPandas APIを利用することができるようになりました。
ユーザー定義関数は、以下のように実行されます。
- Apache Arrowは、JVMとPythonのドライバ/エグゼキュータ間で、ほぼゼロの(デ)シリアライズコストで直接データを交換することができます。
- 関数内部のPandasは、PandasインスタンスとAPIで動作します。
Pandas UDFは、関数内部のPandas APIやApache Arrowと連携してデータ交換を行います。行単位で実行するPython UDFと比較して、最大100倍まで性能を向上できるベクトル化された操作を可能にします。
以下の例では、単純に各値に1を足すPandas UDFが定義されており、pandas_plus_one
という関数がpandas_udfで装飾され、Pandas UDFTypeがPandasUDFType.SCALAR
と指定されています。
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_plus_one(v):
# `v` is a pandas Series
return v.add(1) # outputs a pandas Series
spark.range(10).select(pandas_plus_one("id")).show()
Python関数はPandas Seriesを受け取り、出力します。この関数内でPandasの豊富なAPI群を利用することで、各値に1を足すベクトル化操作を実行することができます。(De)シリアライゼーションも、Apache Arrowをフードに活用することで自動的にベクトル化されます。
Python型ヒント
Pythonの型ヒント(Type Hints)はPython 3.5のPEP 484で正式に導入されました。型ヒントは、Pythonで値の型を静的に示す公式の方法です。以下の例を見てください。
def greeting(name: str) -> str:
return 'Hello ' + name
name: str
はname
引数がstr
型であることを示し、 ->
構文はgreeting()
関数が文字列を返すことを示しています。
Pythonの型ヒントは、PySparkとPandasのUDFコンテキストに2つの大きな利点をもたらします。
- その関数が何をすることになっているのかを明確に定義し、ユーザーがコードを理解しやすくなる。例えば、文書化されていない限り、型ヒントがなければ
greeting
がNone
を取れるかどうかユーザーは知ることができません。このような微妙なケースを、たくさんのテストケースを用いてドキュメント化したり、ユーザが自分でテストして理解したりする必要がなくなります。 - 静的解析が容易になる。PyCharmやVisual Studio CodeなどのIDEは、型アノテーションを利用して、コード補完やエラーの表示、より優れたgo-to-definition機能をサポートすることができます。
Pandas UDF型の乱立
Apache Spark 2.3のリリース以降、多くの新しいPandas UDFが実装され、ユーザーは新しい仕様とその使い方を学ぶことが難しくなっています。例えば、ほぼ同じ結果を出力する3つのPandas UDFを紹介します。
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
# `v` is a pandas Series
return v + 1 # outputs a pandas Series
spark.range(10).select(pandas_plus_one("id")).show()
from pyspark.sql.functions import pandas_udf, PandasUDFType
# New type of Pandas UDF in Spark 3.0.
@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(itr):
# `iterator` is an iterator of pandas Series.
return map(lambda v: v + 1, itr) # outputs an iterator of pandas Series.
spark.range(10).select(pandas_plus_one("id")).show()
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf("id long", PandasUDFType.GROUPED_MAP)
def pandas_plus_one(pdf):
# `pdf` is a pandas DataFrame
return pdf + 1 # outputs a pandas DataFrame
# `pandas_plus_one` can _only_ be used with `groupby(...).apply(...)`
spark.range(10).groupby('id').apply(pandas_plus_one).show()
これらのUDFタイプはそれぞれ明確な目的を持っていますが、複数のUDFを適用することができます。この単純なケースでは、3つのうちどれでも使うことができます。しかし、Pandas UDFはそれぞれ異なる入出力型を期待し、明確なセマンティックと異なるパフォーマンスで動作します。どれを使って学習すればいいのか、それぞれがどのように動作するのか、ユーザーを混乱させることになります。
さらに、最初のケースと2番目のケースのpandas_plus_one
は、通常のPySparkのカラムが使用される場所で使用することができます。withColumn
の引数や、pandas_plus_one("id") + 1
のような他の式との組み合わせによる関数を考えてみてください。ただし、最後のpandas_plus_one
はgroupby(...).apply(pandas_plus_one)
で使うしかありません。
この複雑さがSparkの開発者と多くの議論を引き起こし、公式の提案を通じてPythonの型ヒントを使った新しいPandas APIを導入する取り組みを推進することになりました。その目的は、上記の問題のようなケースで混乱することなく、ユーザーがPythonの型ヒントを用いて自然にpandas UDFを表現できるようにすることです。例えば、上記のケースは以下のように書くことができます。
def pandas_plus_one(v: pd.Series) -> pd.Series:
return v + 1
def pandas_plus_one(itr: Iterator[pd.Series]) -> Iterator[pd.Series]:
return map(lambda v: v + 1, itr)
def pandas_plus_one(pdf: pd.DataFrame) -> pd.DataFrame:
return pdf + 1
Pythonの型ヒントを使った新しいPandas API
旧来のPandas UDFの複雑さに対応するため、Apache Spark 3.0とPython 3.6以降では、pandas.Series
、pandas.DataFrame
、Tuple
、Iterator
などのPython型ヒントで新しいPandas UDF型を表現できるようにしました。
また、旧来のPandas UDFは、2つのAPIカテゴリに分割されました。Pandas UDFとPandas Function APIsです。これらは内部的には似たような動きをしますが、明確な違いがあります。
Pandas UDFは、他のPySparkのカラムインスタンスを使うのと同じように扱うことができます。しかし、これらのカラムインスタンスでPandas Function APIを使用することはできません。以下はこの2つの例です。
# Pandas UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf, log2, col
@pandas_udf('long')
def pandas_plus_one(s: pd.Series) -> pd.Series:
return s + 1
# pandas_plus_one("id") is identically treated as _a SQL expression_ internally.
# Namely, you can combine with other columns, functions and expressions.
spark.range(10).select(
pandas_plus_one(col("id") - 1) + log2("id") + 1).show()
# Pandas Function API
from typing import Iterator
import pandas as pd
def pandas_plus_one(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
return map(lambda v: v + 1, iterator)
# pandas_plus_one is just a regular Python function, and mapInPandas is
# logically treated as _a separate SQL query plan_ instead of a SQL expression.
# Therefore, direct interactions with other expressions are impossible.
spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()
また、Pandas UDFはPythonの型ヒントを必要としますが、Pandas Function APIsの型ヒントは現在のところオプションであることに注意してください。型ヒントは Pandas Function API で計画されており、将来的には必須となるかもしれません。
新しいPandas UDF
新しいPandas UDFは、それぞれのPandas UDFの型を手動で定義して指定する代わりに、Python関数で与えられたPython型ヒントからPandas UDFの型を推論します。現在、Pandas UDFでサポートされているPython型ヒントのケースは4つあります。
- Series to Series
- Iterator of Series to Iterator of Series
- Iterator of Multiple Series to Iterator of Series
- Series to Scalar(一つの値)
それぞれのケースを深く掘り下げる前に、新しいPandas UDFを扱う上での3つのポイントを見てみましょう。
- Pythonの世界では一般的にPython型ヒントはオプションですが、新しいPandas UDFを使うには入出力にPython型ヒントを指定する必要があります。
- ユーザーは手動でPandas UDFの型を指定することで、まだ旧来の方法を使うことができます。しかし、Python型ヒントを使うことが推奨されます。
- タイプヒントは全ての場合においてpandas.Seriesを使用する必要があります。しかし、入出力の型ヒントに
pandas.DataFrame
を使うべきバリエーションが1つあります。
import pandas as pd
from pyspark.sql.functions import pandas_udf
df = spark.createDataFrame(
[[1, "a string", ("a nested string",)]],
"long_col long, string_col string, struct_col struct<col1:string>")
@pandas_udf("col1 string, col2 long")
def pandas_plus_len(
s1: pd.Series, s2: pd.Series, pdf: pd.DataFrame) -> pd.DataFrame:
# Regular columns are series and the struct column is a DataFrame.
pdf['col2'] = s1 + s2.str.len()
return pdf # the struct column expects a DataFrame to return
df.select(pandas_plus_len("long_col", "string_col", "struct_col")).show()
Series to Series
Series to Seriesは、Apache Spark 2.3で導入されたスカラーPandas UDFにマッピングされています。型ヒントは pandas.Series, ...-> pandas.Series.
のように表現できます。与えられた関数が1つ以上のpandas.Seriesを受け取り、1つのpandas.Series
を出力することが期待されます。出力の長さは入力と同じであることが期待されます。
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('long')
def pandas_plus_one(s: pd.Series) -> pd.Series:
return s + 1
spark.range(10).select(pandas_plus_one("id")).show()
上記の例は、以下のようにスカラーPandas UDFを用いた旧来のスタイルにマッピングすることができます。
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
return v + 1
spark.range(10).select(pandas_plus_one("id")).show()
Iterator of Series to Iterator of Series
Apache Spark 3.0で登場した新しいタイプのPandas UDFです。Series to Seriesの変形で、型ヒントはIterator[pd.Series] -> Iterator[pd.Series]
のように表現できます。この関数はpandas.Series
のイテレータを受け取り、出力します。
出力全体の長さは,入力全体の長さと同じでなければなりません.したがって,入力と出力の長さが同じであれば,入力イテレータからデータをプリフェッチすることができます.与えられた関数は,1つの列を入力として受け取る必要があります.
from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf('long')
def pandas_plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
return map(lambda s: s + 1, iterator)
spark.range(10).select(pandas_plus_one("id")).show()
また、UDFの実行に際して、何らかの状態の初期化が高価になる場合にも有効である。以下の擬似コードはその場合を示している。
@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
# Do some expensive initialization with a state
state = very_expensive_initialization()
for x in iterator:
# Use that state for the whole iterator.
yield calculate_with_state(x, state)
df.select(calculate("value")).show()
Iterator of SeriesからIterator of Seriesへのマッピングは、旧来のPandas UDFスタイルにも対応可能です。以下の例を見てください。
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(iterator):
return map(lambda s: s + 1, iterator)
spark.range(10).select(pandas_plus_one("id")).show()
Iterator of Multiple Series to Iterator of Series
このタイプのPandas UDFは、Iterator of SeriesからIterator of Seriesと共にApache Spark 3.0でも導入される予定です。型ヒントはIterator[Tuple[pandas.Series, ...]]-> Iterator[pandas.Series]
と表現できます。
Iterator of SeriesとIterator of Seriesと同様の性質と制約があります。与えられた関数は、pandas.Series
のタプルのイテレータを受け取り、pandas.Series
のイテレータを出力します。また、いくつかの状態を利用する場合や、入力データをプリフェッチする場合にも有効です。また、出力全体の長さは入力全体の長さと同じであるべきです。ただし、与えられた関数は、Iterator of Series から Iterator of Series と異なり、複数の列を入力として受け取る必要があります。
from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf
@pandas_udf("long")
def multiply_two(
iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
return (a * b for a, b in iterator)
spark.range(10).select(multiply_two("id", "id")).show()
これは、以下のように古いPandas UDFのスタイルにマッピングすることもできます。
from pyspark.sql.functions import pandas_udf, PandasUDFType
@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def multiply_two(iterator):
return (a * b for a, b in iterator)
spark.range(10).select(multiply_two("id", "id")).show()
Series to Scalar
Series to Scalarは、Apache Spark 2.4で導入されたグループ化されたアグリゲートPandas UDFにマッピングされています。型ヒントはpandas.Series, ...-> Any
のように表現できます。。この関数は、1つ以上のpandas.Series
を受け取り、プリミティブなデータ型を出力します。返されるスカラーは、Pythonのプリミティブ型、例えばint
, float
、またはNumPyのデータ型、例えば numpy.int64
, numpy.float64
, などのいずれかにすることが可能です。Any は理想的にはそれに応じて特定のスカラー型であるべきです。
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
@pandas_udf("double")
def pandas_mean(v: pd.Series) -> float:
return v.sum()
df.select(pandas_mean(df['v'])).show()
df.groupby("id").agg(pandas_mean(df['v'])).show()
df.select(pandas_mean(df['v']).over(Window.partitionBy('id'))).show()
上の例は、このようにグループ化された集約Pandas UDFを使った例に変換することができます。
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def pandas_mean(v):
return v.sum()
df.select(pandas_mean(df['v'])).show()
df.groupby("id").agg(pandas_mean(df['v'])).show()
df.select(pandas_mean(df['v']).over(Window.partitionBy('id'))).show()
新しいPandas関数API
Apache Spark 3.0のこの新しいカテゴリでは、PySpark DataFrameに対してPandasインスタンスを取得・出力するPythonネイティブ関数を直接適用することができます。Apache Spark 3.0でサポートされているPandas Functions APIは、grouped map、map、co-grouped mapです。
なお、グループ化マップのPandas UDFは、グループマップのPandas Function APIに分類されるようになりました。前述の通り、Pandas Function APIのPython型ヒントは、現在オプションとなっています。
Grouped Map
Pandas Function APIにおけるGrouped Mapは、df.groupby(...)
などのグループ化されたDataFrameでのapplyInPandas
です。これは、旧来のPandas UDFの型では、grouped map Pandas UDFにマッピングされています。これは、関数内で各グループを各pandas.DataFrame
にマッピングします。出力が入力と同じ長さである必要はないことに注意してください。
import pandas as pd
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").applyInPandas(subtract_mean, schema=df.schema).show()
Grouped Map型は、Spark 2.3からサポートされたGrouped Map Pandas UDFに以下のようにマッピングされています。
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))
@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
v = pdf.v
return pdf.assign(v=v - v.mean())
df.groupby("id").apply(subtract_mean).show()
Map
Map Pandas Function APIは、DataFrame内のmapInPandas
です。Apache Spark 3.0で新しく追加された。各パーティションの各バッチをマッピングし、それぞれを変換します。この関数は、pandas.DataFrame
のイテレータを受け取り、pandas.DataFrame
のイテレータを出力します。出力の長さは、入力のサイズと一致する必要はありません。
from typing import Iterator
import pandas as pd
df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))
def pandas_filter(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
for pdf in iterator:
yield pdf[pdf.id == 1]
df.mapInPandas(pandas_filter, schema=df.schema).show()
Co-grouped Map
Apache Spark 3.0では、df.groupby(...).cogroup(...)
のようなco-grouped DataFrameでapplyInPandas
するco-grouped Mapも導入される予定だそうです。grouped mapと同様に、各グループと各pandas.DataFrame
を関数でマッピングしますが、共通のキーで別のDataFrameとグループ化し、各cogroupに関数を適用することになります。同様に、出力の長さにも制限はありません。
import pandas as pd
df1 = spark.createDataFrame(
[(1201, 1, 1.0), (1201, 2, 2.0), (1202, 1, 3.0), (1202, 2, 4.0)],
("time", "id", "v1"))
df2 = spark.createDataFrame(
[(1201, 1, "x"), (1201, 2, "y")], ("time", "id", "v2"))
def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
return pd.merge_asof(left, right, on="time", by="id")
df1.groupby("id").cogroup(
df2.groupby("id")
).applyInPandas(asof_join, "time int, id int, v1 double, v2 string").show()
まとめと今後
近日リリース予定のApache Spark 3.0(詳細はプレビューブログ)では、Pythonの型ヒントを提供し、ユーザーがPandas UDFとPandas Function APIをより簡単に表現できるようにします。将来的には、Pandas UDFとPandas Function APIの両方で、他の型ヒントの組み合わせのサポートを追加することを検討する必要があります。現在、サポートされているケースは、Pythonの型ヒントの多くの可能な組み合わせのうちの一部に過ぎません。また、Apache Sparkコミュニティでは他にも進行中の議論があります。詳しくはSide Discussions and Future Improvementをご覧ください。
Spark 3.0の詳細については、プレビューWebinarをご覧ください。 Databricks Runtime 7.0 Betaの一部として、これらの新機能をDatabricksで今すぐ無料でお試しください。