はじめに
PySparkで機械学習を行う際、MLライブラリでは機能が不十分であることもあり、scikit-learnなど他のライブラリを利用したいと思うことがある。
その際の学習は、そもそもSparkのDataFrameが対応していないので別途行う必要があるが、推論についてはUDFを使えばスムーズにできるので、備忘として掲載。
※ここでは推論についてのみ扱い、学習自体は扱わない
やり方
学習済みのモデル(model
: scikit-learnのイメージ)がある時、以下のように行えばよい。
data
が推論データのDataFrameで、features
は説明変数のリストである。
ここでは、model.predict(x)
で予測した結果を返しており、作成したモデルの予測関数に適宜入れ替える必要がある。
同様に、返り値が連続値になる場合は、DoubleType()
などに変更する。
pyspark上で学習済モデルを使った推論
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import IntegerType
@pandas_udf(returnType=IntegerType())
def predict_udf(*cols):
X = pd.concat(cols, axis=1)
return pd.Series(model.predict(X))
data.withColumn('predict', predict_udf(*features))