この記事について
Sparkは大規模データを高速に処理できるメリットがある一方で、pandasに比べるとまだまだ柔軟な処理ができるとは言い難い現状です。そこで、Sparkに実装されていない関数については、UDFを利用することがありますが、パフォーマンスが決して良いとは言えない状況です。
そこで、spark 2.3.0から登場したpandas UDFを使うと、高速かつ柔軟にデータを処理することができます。UDFを含めた概要についてはこちらの記事も見てみてください。
pandas UDFの概要
pandasUDFの基本的な構成は pandas.Series
もしくは pandas.DataFrame
を受け付け、出力値として同様にpandas.Series
もしくは pandas.DataFrame
を返すような形で記述することができます。
pandasUDFには、 SCALER
, GROUPED_AGG
, GROUPED_MAP
の3種類があり、それぞれ受け付ける入力と出力が異なります。
-
SCALER
- 一言でいうと:
pandas.Series
に対する単純なデータ処理が行える。 - 入力:
pandas.Series
- 出力:
pandas.Series
- ポイント: 入力と出力のデータが同一の形式である必要がある。
- 一言でいうと:
-
GROUPED_AGG
- 一言でいうと: 特定の一つまたは複数のデータに対する処理を行い、集計値を返すような操作ができる。
- 入力:
pandas.Series
- 出力:
pandas.Series
- ポイント: 入力と出力のデータは必ずしも同一の形式でなくても良い。
-
GROUPED_MAP
- 一言でいうと: 集計を含めた特定の複数のデータに対する処理を行い、元のデータの個別の値に対する操作ができる。文字通りmapですね。(具体的な例だと例えばwindow関数のようなイメージ)
- 入力:
pandas.DataFrame
- 出力:
pandas.DataFrame
- ポイント: 入力と出力のデータは必ずしも同一の形式でなくても良い。
SCALER
SCALERの記法は非常にシンプルです。逆にいうと複雑な処理はできないのですが…。
Databricksのブログからコード例を拝借しています(一部コメントとdocstringを追加)。
from pyspark.sql.functions import pandas_udf, PandasUDFType
#pandas_udfに、入力値の型(double)と、PandasUDFType(SCALAR)を指定
@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_plus_one(v):
"""
1を足して返すだけの関数
"""
return v + 1
df = df.withColumn('v2', pandas_plus_one(df.v))
これなら、pandasUDFではない標準のUDFでも書けますが、公式のベンチマークによると、データを一つ一つ処理する標準のUDFに比べ、vectrizeにより速度が出るとのこと。
ただし、手元で、いくつかシンプルな処理を通常のUDFで書いた場合と、pandasUDFで書いた場合でそこまでの速度差は出ないよう…。
ちなみに、SCALARを使ったもうちょっと実用的な例だと、その値の平均からの差分を取ったりもできます。
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
# pandas_udfに、入力値の型(double)と、PandasUDFType(SCALAR)を指定
@pandas_udf('double', PandasUDFType.SCALAR)
def pandas_mean_diff(pds):
"""
平均値との差分を返す関数
"""
return pds - pds.mean()
df = df.withColumn('mean_diff', pandas_mean_diff(col("v")))
df.show(10, False)
GROUPED_AGG
GROUPED_AGGは複数のSeriesを扱えることがSCALERとの違いです。
~~実用性はわかりませんが、~~例えば複数のSeriesを使うことでこんなこともできます。
pandas_UDF内に渡せるのはあくまでもpd.Seriesであることに注意しましょう。
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField
from pyspark.sql.functions import pandas_udf, PandasUDFType, struct, col
@pandas_udf("double", PandasUDFType.GROUPED_AGG)
def filtered_mean(pds1, pds2):
"""
カラム:vとカラム:v2の差分の平均をとる関数
"""
pdf = pds1 - pds2
return pdf.mean()
df.groupBy("group").agg(filtered_mean("v", "v2")).show(10, False)
GROUPED_MAP
GROUPED_MAPはこれまでの2種類と違い、pandas_UDF内部にpandas.DataFrameを渡すことができます。例えば、こんな感じで、2つの特徴量によるKmeansをgroupカラムごとに実行することができます。
K-meansならすでにpyspark.mlにもありますが、pyspark.mlに実装されている機械学習アルゴリズムの方が少ないですし、こうした処理を書けるということはひとつメリットかなと思います。
from pyspark.sql.functions import col, pandas_udf, PandasUDFType
schema = StructType([
StructField("group", StringType(), False),
StructField("v", DoubleType(), False),
StructField("v2", DoubleType(), False),
StructField("cluster", IntegerType(), False)
])
@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def grouped_km(pdf):
"""
groupごとにKMeansによるクラスタリングを実施しデータを5つに分ける。
"""
from sklearn.cluster import KMeans
km = KMeans(n_clusters=5, random_state=71)
pdf.loc[:, "cluster"] = km.fit_predict(pdf.loc[:, ["v", "v2"]])
return pdf
df.groupBy("group").apply(hml_rank).show(10,False)