4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Spark3.0における新機能: Pandas UDFとPython型ヒント

Posted at

この記事はNew Pandas UDFs and Python Type Hints in the Upcoming Release of Apache Spark 3.0の2022/02/08時点での翻訳になります。

Pandasのユーザー定義関数(UDF)は、データサイエンス向けのApache Sparkの最も重要な機能強化の1つです。PandasのAPIを利用できるようになったり、パフォーマンスが向上したりと、多くのメリットをもたらしてくれます。

しかし、Pandas UDFは時間の経過とともに有機的に進化してきたため、いくつかの矛盾が生じ、ユーザーの間に混乱が生じています。近々リリースされる予定のApache Spark 3.0では、Pythonの型ヒントを活用したPandas UDFの新しいインターフェースが導入され、Pandas UDFの型の増殖に対処し、よりPythonicで自己記述的なものになるように支援する予定です。

このブログ記事では、Pythonのタイプヒントを利用した新しいPandas UDFと、グループ化マップ、マップ、コグループ化マップを含む新しいPandas Function APIを紹介します。

Pandas UDF

Pandas UDFはSpark 2.3で導入されましたが、Introducing Pandas UDF for PySparkの項もご覧ください。Pandasはデータサイエンティストによく知られており、NumPystatsmodelscikit-learnなどの多くのPythonライブラリやパッケージとシームレスに統合されており、Pandas UDFによってデータサイエンティストはワークロードをスケールアウトできるだけでなく、Apache SparkでPandas APIを利用することができるようになりました。

ユーザー定義関数は、以下のように実行されます。

  • Apache Arrowは、JVMとPythonのドライバ/エグゼキュータ間で、ほぼゼロの(デ)シリアライズコストで直接データを交換することができます。
  • 関数内部のPandasは、PandasインスタンスとAPIで動作します。

Pandas UDFは、関数内部のPandas APIやApache Arrowと連携してデータ交換を行います。行単位で実行するPython UDFと比較して、最大100倍まで性能を向上できるベクトル化された操作を可能にします。

以下の例では、単純に各値に1を足すPandas UDFが定義されており、pandas_plus_oneという関数がpandas_udfで装飾され、Pandas UDFTypeがPandasUDFType.SCALARと指定されています。

from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    # `v` is a pandas Series
    return v.add(1)  # outputs a pandas Series

spark.range(10).select(pandas_plus_one("id")).show()

Python関数はPandas Seriesを受け取り、出力します。この関数内でPandasの豊富なAPI群を利用することで、各値に1を足すベクトル化操作を実行することができます。(De)シリアライゼーションも、Apache Arrowをフードに活用することで自動的にベクトル化されます。

Python型ヒント

Pythonの型ヒント(Type Hints)はPython 3.5のPEP 484で正式に導入されました。型ヒントは、Pythonで値の型を静的に示す公式の方法です。以下の例を見てください。

def greeting(name: str) -> str:
    return 'Hello ' + name

name: strname引数がstr型であることを示し、 ->構文はgreeting()関数が文字列を返すことを示しています。

Pythonの型ヒントは、PySparkとPandasのUDFコンテキストに2つの大きな利点をもたらします。

  • その関数が何をすることになっているのかを明確に定義し、ユーザーがコードを理解しやすくなる。例えば、文書化されていない限り、型ヒントがなければgreetingNoneを取れるかどうかユーザーは知ることができません。このような微妙なケースを、たくさんのテストケースを用いてドキュメント化したり、ユーザが自分でテストして理解したりする必要がなくなります。
  • 静的解析が容易になる。PyCharmVisual Studio CodeなどのIDEは、型アノテーションを利用して、コード補完やエラーの表示、より優れたgo-to-definition機能をサポートすることができます。

Pandas UDF型の乱立

Apache Spark 2.3のリリース以降、多くの新しいPandas UDFが実装され、ユーザーは新しい仕様とその使い方を学ぶことが難しくなっています。例えば、ほぼ同じ結果を出力する3つのPandas UDFを紹介します。

from pyspark.sql.functions import pandas_udf, PandasUDFType                                                                                                                                                                                                                                                                                                                                                                                                                              

@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    # `v` is a pandas Series
    return v + 1  # outputs a pandas Series

spark.range(10).select(pandas_plus_one("id")).show()
from pyspark.sql.functions import pandas_udf, PandasUDFType


# New type of Pandas UDF in Spark 3.0.
@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(itr):
    # `iterator` is an iterator of pandas Series.
    return map(lambda v: v + 1, itr)  # outputs an iterator of pandas Series.

spark.range(10).select(pandas_plus_one("id")).show()
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf("id long", PandasUDFType.GROUPED_MAP)
def pandas_plus_one(pdf):
    # `pdf` is a pandas DataFrame
    return pdf + 1  # outputs a pandas DataFrame

# `pandas_plus_one` can _only_ be used with `groupby(...).apply(...)`
spark.range(10).groupby('id').apply(pandas_plus_one).show()

これらのUDFタイプはそれぞれ明確な目的を持っていますが、複数のUDFを適用することができます。この単純なケースでは、3つのうちどれでも使うことができます。しかし、Pandas UDFはそれぞれ異なる入出力型を期待し、明確なセマンティックと異なるパフォーマンスで動作します。どれを使って学習すればいいのか、それぞれがどのように動作するのか、ユーザーを混乱させることになります。

さらに、最初のケースと2番目のケースのpandas_plus_oneは、通常のPySparkのカラムが使用される場所で使用することができます。withColumnの引数や、pandas_plus_one("id") + 1のような他の式との組み合わせによる関数を考えてみてください。ただし、最後のpandas_plus_onegroupby(...).apply(pandas_plus_one)で使うしかありません。

この複雑さがSparkの開発者と多くの議論を引き起こし、公式の提案を通じてPythonの型ヒントを使った新しいPandas APIを導入する取り組みを推進することになりました。その目的は、上記の問題のようなケースで混乱することなく、ユーザーがPythonの型ヒントを用いて自然にpandas UDFを表現できるようにすることです。例えば、上記のケースは以下のように書くことができます。

def pandas_plus_one(v: pd.Series) -> pd.Series:
    return v + 1
def pandas_plus_one(itr: Iterator[pd.Series]) -> Iterator[pd.Series]:
    return map(lambda v: v + 1, itr)
def pandas_plus_one(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf + 1

Pythonの型ヒントを使った新しいPandas API

旧来のPandas UDFの複雑さに対応するため、Apache Spark 3.0とPython 3.6以降では、pandas.Seriespandas.DataFrameTupleIteratorなどのPython型ヒントで新しいPandas UDF型を表現できるようにしました。

また、旧来のPandas UDFは、2つのAPIカテゴリに分割されました。Pandas UDFとPandas Function APIsです。これらは内部的には似たような動きをしますが、明確な違いがあります。

Pandas UDFは、他のPySparkのカラムインスタンスを使うのと同じように扱うことができます。しかし、これらのカラムインスタンスでPandas Function APIを使用することはできません。以下はこの2つの例です。

# Pandas UDF
import pandas as pd
from pyspark.sql.functions import pandas_udf, log2, col

@pandas_udf('long')
def pandas_plus_one(s: pd.Series) -> pd.Series:
    return s + 1

# pandas_plus_one("id") is identically treated as _a SQL expression_ internally.
# Namely, you can combine with other columns, functions and expressions.
spark.range(10).select(
    pandas_plus_one(col("id") - 1) + log2("id") + 1).show()
# Pandas Function API
from typing import Iterator
import pandas as pd


def pandas_plus_one(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    return map(lambda v: v + 1, iterator)


# pandas_plus_one is just a regular Python function, and mapInPandas is
# logically treated as _a separate SQL query plan_ instead of a SQL expression. 
# Therefore, direct interactions with other expressions are impossible.
spark.range(10).mapInPandas(pandas_plus_one, schema="id long").show()

また、Pandas UDFはPythonの型ヒントを必要としますが、Pandas Function APIsの型ヒントは現在のところオプションであることに注意してください。型ヒントは Pandas Function API で計画されており、将来的には必須となるかもしれません。

新しいPandas UDF

新しいPandas UDFは、それぞれのPandas UDFの型を手動で定義して指定する代わりに、Python関数で与えられたPython型ヒントからPandas UDFの型を推論します。現在、Pandas UDFでサポートされているPython型ヒントのケースは4つあります。

  1. Series to Series
  2. Iterator of Series to Iterator of Series
  3. Iterator of Multiple Series to Iterator of Series
  4. Series to Scalar(一つの値)

それぞれのケースを深く掘り下げる前に、新しいPandas UDFを扱う上での3つのポイントを見てみましょう。

  • Pythonの世界では一般的にPython型ヒントはオプションですが、新しいPandas UDFを使うには入出力にPython型ヒントを指定する必要があります。
  • ユーザーは手動でPandas UDFの型を指定することで、まだ旧来の方法を使うことができます。しかし、Python型ヒントを使うことが推奨されます。
  • タイプヒントは全ての場合においてpandas.Seriesを使用する必要があります。しかし、入出力の型ヒントにpandas.DataFrameを使うべきバリエーションが1つあります。
import pandas as pd
from pyspark.sql.functions import pandas_udf


df = spark.createDataFrame(
    [[1, "a string", ("a nested string",)]],
    "long_col long, string_col string, struct_col struct<col1:string>")

@pandas_udf("col1 string, col2 long")
def pandas_plus_len(
        s1: pd.Series, s2: pd.Series, pdf: pd.DataFrame) -> pd.DataFrame:
    # Regular columns are series and the struct column is a DataFrame.
    pdf['col2'] = s1 + s2.str.len() 
    return pdf  # the struct column expects a DataFrame to return

df.select(pandas_plus_len("long_col", "string_col", "struct_col")).show()

Series to Series

Series to Seriesは、Apache Spark 2.3で導入されたスカラーPandas UDFにマッピングされています。型ヒントは pandas.Series, ...-> pandas.Series.のように表現できます。与えられた関数が1つ以上のpandas.Seriesを受け取り、1つのpandas.Seriesを出力することが期待されます。出力の長さは入力と同じであることが期待されます。

import pandas as pd
from pyspark.sql.functions import pandas_udf       

@pandas_udf('long')
def pandas_plus_one(s: pd.Series) -> pd.Series:
    return s + 1

spark.range(10).select(pandas_plus_one("id")).show()

上記の例は、以下のようにスカラーPandas UDFを用いた旧来のスタイルにマッピングすることができます。

from pyspark.sql.functions import pandas_udf, PandasUDFType                                                                                                                                                                                                                                                                                                                                                                                                                   

@pandas_udf('long', PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

spark.range(10).select(pandas_plus_one("id")).show()

Iterator of Series to Iterator of Series

Apache Spark 3.0で登場した新しいタイプのPandas UDFです。Series to Seriesの変形で、型ヒントはIterator[pd.Series] -> Iterator[pd.Series]のように表現できます。この関数はpandas.Seriesのイテレータを受け取り、出力します。

出力全体の長さは,入力全体の長さと同じでなければなりません.したがって,入力と出力の長さが同じであれば,入力イテレータからデータをプリフェッチすることができます.与えられた関数は,1つの列を入力として受け取る必要があります.

from typing import Iterator
import pandas as pd
from pyspark.sql.functions import pandas_udf       

@pandas_udf('long')
def pandas_plus_one(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    return map(lambda s: s + 1, iterator)

spark.range(10).select(pandas_plus_one("id")).show()

また、UDFの実行に際して、何らかの状態の初期化が高価になる場合にも有効である。以下の擬似コードはその場合を示している。

@pandas_udf("long")
def calculate(iterator: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Do some expensive initialization with a state
    state = very_expensive_initialization()
    for x in iterator:
        # Use that state for the whole iterator.
        yield calculate_with_state(x, state)

df.select(calculate("value")).show()

Iterator of SeriesからIterator of Seriesへのマッピングは、旧来のPandas UDFスタイルにも対応可能です。以下の例を見てください。

from pyspark.sql.functions import pandas_udf, PandasUDFType                                                                                                                                                                                                                   

@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def pandas_plus_one(iterator):
    return map(lambda s: s + 1, iterator)

spark.range(10).select(pandas_plus_one("id")).show()

Iterator of Multiple Series to Iterator of Series

このタイプのPandas UDFは、Iterator of SeriesからIterator of Seriesと共にApache Spark 3.0でも導入される予定です。型ヒントはIterator[Tuple[pandas.Series, ...]]-> Iterator[pandas.Series]と表現できます。

Iterator of SeriesとIterator of Seriesと同様の性質と制約があります。与えられた関数は、pandas.Seriesのタプルのイテレータを受け取り、pandas.Seriesのイテレータを出力します。また、いくつかの状態を利用する場合や、入力データをプリフェッチする場合にも有効です。また、出力全体の長さは入力全体の長さと同じであるべきです。ただし、与えられた関数は、Iterator of Series から Iterator of Series と異なり、複数の列を入力として受け取る必要があります。

from typing import Iterator, Tuple
import pandas as pd
from pyspark.sql.functions import pandas_udf       

@pandas_udf("long")
def multiply_two(
        iterator: Iterator[Tuple[pd.Series, pd.Series]]) -> Iterator[pd.Series]:
    return (a * b for a, b in iterator)

spark.range(10).select(multiply_two("id", "id")).show()

これは、以下のように古いPandas UDFのスタイルにマッピングすることもできます。

from pyspark.sql.functions import pandas_udf, PandasUDFType                                                                                                                                                                                                                                                                                                                                                                                                                              

@pandas_udf('long', PandasUDFType.SCALAR_ITER)
def multiply_two(iterator):
    return (a * b for a, b in iterator)

spark.range(10).select(multiply_two("id", "id")).show()

Series to Scalar

Series to Scalarは、Apache Spark 2.4で導入されたグループ化されたアグリゲートPandas UDFにマッピングされています。型ヒントはpandas.Series, ...-> Anyのように表現できます。。この関数は、1つ以上のpandas.Seriesを受け取り、プリミティブなデータ型を出力します。返されるスカラーは、Pythonのプリミティブ型、例えばint, float、またはNumPyのデータ型、例えば numpy.int64, numpy.float64, などのいずれかにすることが可能です。Any は理想的にはそれに応じて特定のスカラー型であるべきです。

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double")
def pandas_mean(v: pd.Series) -> float:
    return v.sum()

df.select(pandas_mean(df['v'])).show()
df.groupby("id").agg(pandas_mean(df['v'])).show()
df.select(pandas_mean(df['v']).over(Window.partitionBy('id'))).show()

上の例は、このようにグループ化された集約Pandas UDFを使った例に変換することができます。

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql import Window

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def pandas_mean(v):
    return v.sum()

df.select(pandas_mean(df['v'])).show()
df.groupby("id").agg(pandas_mean(df['v'])).show()
df.select(pandas_mean(df['v']).over(Window.partitionBy('id'))).show()

新しいPandas関数API

Apache Spark 3.0のこの新しいカテゴリでは、PySpark DataFrameに対してPandasインスタンスを取得・出力するPythonネイティブ関数を直接適用することができます。Apache Spark 3.0でサポートされているPandas Functions APIは、grouped map、map、co-grouped mapです。

なお、グループ化マップのPandas UDFは、グループマップのPandas Function APIに分類されるようになりました。前述の通り、Pandas Function APIのPython型ヒントは、現在オプションとなっています。

Grouped Map

Pandas Function APIにおけるGrouped Mapは、df.groupby(...)などのグループ化されたDataFrameでのapplyInPandasです。これは、旧来のPandas UDFの型では、grouped map Pandas UDFにマッピングされています。これは、関数内で各グループを各pandas.DataFrameにマッピングします。出力が入力と同じ長さである必要はないことに注意してください。

import pandas as pd

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

def subtract_mean(pdf: pd.DataFrame) -> pd.DataFrame:
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").applyInPandas(subtract_mean, schema=df.schema).show()

Grouped Map型は、Spark 2.3からサポートされたGrouped Map Pandas UDFに以下のようにマッピングされています。

import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)], ("id", "v"))

@pandas_udf(df.schema, PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    v = pdf.v
    return pdf.assign(v=v - v.mean())

df.groupby("id").apply(subtract_mean).show()

Map

Map Pandas Function APIは、DataFrame内のmapInPandasです。Apache Spark 3.0で新しく追加された。各パーティションの各バッチをマッピングし、それぞれを変換します。この関数は、pandas.DataFrameのイテレータを受け取り、pandas.DataFrameのイテレータを出力します。出力の長さは、入力のサイズと一致する必要はありません。

from typing import Iterator
import pandas as pd

df = spark.createDataFrame([(1, 21), (2, 30)], ("id", "age"))

def pandas_filter(iterator: Iterator[pd.DataFrame]) -> Iterator[pd.DataFrame]:
    for pdf in iterator:
        yield pdf[pdf.id == 1]

df.mapInPandas(pandas_filter, schema=df.schema).show()

Co-grouped Map

Apache Spark 3.0では、df.groupby(...).cogroup(...)のようなco-grouped DataFrameでapplyInPandasするco-grouped Mapも導入される予定だそうです。grouped mapと同様に、各グループと各pandas.DataFrameを関数でマッピングしますが、共通のキーで別のDataFrameとグループ化し、各cogroupに関数を適用することになります。同様に、出力の長さにも制限はありません。

import pandas as pd

df1 = spark.createDataFrame(
    [(1201, 1, 1.0), (1201, 2, 2.0), (1202, 1, 3.0), (1202, 2, 4.0)],
    ("time", "id", "v1"))
df2 = spark.createDataFrame(
    [(1201, 1, "x"), (1201, 2, "y")], ("time", "id", "v2"))

def asof_join(left: pd.DataFrame, right: pd.DataFrame) -> pd.DataFrame:
    return pd.merge_asof(left, right, on="time", by="id")

df1.groupby("id").cogroup(
    df2.groupby("id")
).applyInPandas(asof_join, "time int, id int, v1 double, v2 string").show()

まとめと今後

近日リリース予定のApache Spark 3.0(詳細はプレビューブログ)では、Pythonの型ヒントを提供し、ユーザーがPandas UDFとPandas Function APIをより簡単に表現できるようにします。将来的には、Pandas UDFとPandas Function APIの両方で、他の型ヒントの組み合わせのサポートを追加することを検討する必要があります。現在、サポートされているケースは、Pythonの型ヒントの多くの可能な組み合わせのうちの一部に過ぎません。また、Apache Sparkコミュニティでは他にも進行中の議論があります。詳しくはSide Discussions and Future Improvementをご覧ください。

Spark 3.0の詳細については、プレビューWebinarをご覧ください。 Databricks Runtime 7.0 Betaの一部として、これらの新機能をDatabricksで今すぐ無料でお試しください。

4
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?