2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

DatabricksでPandasをスケール:Pandas UDFへのパラメーターの引き渡し

Last updated at Posted at 2024-11-11

Scaling Pandas with Databricks: Passing Parameters... - Databricks Community - 65123の翻訳です。

本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

データサイエンスの世界では、レガシーコードを最適化したり移行すると言うことが頻繁にあります。この記事では、applyInPandasやPandas UDFのようなアプローチを用いることで、既存のPandasのコードベースをよりスケーラブルで動的にすることで、多くのデータサイエンティストやデータエンジニアが直面する一般的な技術的な課題に取り組みます。クロージャを用いてapplyInPandasやPandas UDFにパラメーターを引き渡す方法を説明する前に、基本的なPandas UDFのユースケースをウォークスルーします。このアプローチは、あなたのコードをチューニングする助けとなり、Databricksによって提供されるパワフルな機能の大部分を構成しています。

Pandas UDFとは?

Pandas UDF(User Defined Function: ユーザー定義関数)は、ベクトル化された方法でPandasデータフレームやシリーズにカスタム関数を適用できるようにしてくれるパワフルな機能です。PyArrowの助けを借りることで、従来のforループを用いた場合と比較して、劇的にパフォーマンスを改善することができます。applyInPandas関数は、データフレームやシリーズのデータに対するオペレーションを実行するためにどのようにPandas UDFを活用できるのかを示す優れた例となっています。また、関数の入出力の方を指定できるようにしてくれるpandas_udfデコレーターを用いてPandas UDFを定義することもできます。定義すると、Sparkdーえたフレームに対して並列でUDFを適用することができるので、forループの直列的なオペレーションよりもはるかに高速になります。

Pandas UDFのユースケース

Pandas UDFはデータクリーニング、特徴量エンジニアリング、データ分析のように様々なタスクで活用することができます。多くの場合、シングルノードの環境にある既存のPandasコードを、使用されているロジックやライブラリを変更することなしに、分散Spark環境に移行するために用いられます。以下に、それぞれのエンジンのタイプを持つSparkデータフレームの値を正規化するためにapplyInPandasを用いる例を示します:

import pandas as pd

df = spark.createDataFrame(pd.DataFrame({'type': ['turbine', 'turbine', 'propeller', 'turbine', 'propeller', 'propeller'], 'sensor_reading': [10, 7, 25, 12, 29, 36]}))

def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
   reading = pdf.sensor_reading
   pdf['normalized'] = reading.mean() / reading.std()
   return pdf

expected_schema = 'type string, sensor_reading long, normalized long'
df.groupBy('type').applyInPandas(normalize, expected_schema).show()
Output:

+---------+--------------+----------+
|     type|sensor_reading|normalized|
+---------+--------------+----------+
|propeller|            25|         5|
|propeller|            29|         5|
|propeller|            36|         5|
|  turbine|            10|         3|
|  turbine|             7|         3|
|  turbine|            12|         3|
+---------+--------------+----------+

課題:applyInPandasへのパラメーターの引き渡し

Pandas UDFで何かしらのハイパーパラメーターチューニングを実施たり、関数に対する入力として動的な変数を使いたくなったらどうしましょうか?残念ながら、applyInPandasへのパラメーターの引き渡しは直接的にはサポートされていません。applyInPandasは、関数が適用されるグルーピングされたデータフレームである単一の引数しか受け付けません。別のパラメーターを追加するとエラーになります:

# We don't have a way to pass a value like the mean of the whole dataframe - this throws an error

def normalize_plus_value(pdf: pd.DataFrame, value: int) -> pd.DataFrame:
   reading = pdf.sensor_reading
   pdf['normalized'] = value + (reading.mean() / reading.std())
   return pdf

df.groupBy('type').applyInPandas(normalize_plus_value, 'type string, sensor_reading long, normalized long').show()
AttributeError: 'tuple' object has no attribute 'sensor_reading'

ソリューション:パラメーターを引き渡すためにクロージャを使う

この問題に対するソリューションの一つが、クロージャを使うことです。クロージャは、(包含している)外部の関数スコープにある変数にアクセスできる関数です。クロージャ内に関数を定義することで、applyInPandasに引き渡したいパラメーターをキャプチャする動的な関数を作成することができます。

以下に、applyInPandasへのパラメーターの引き渡しにクロージャを用いる例を示します:

def normalize_with_value(value: int):

   # Returning this function "injects" the value into the function we'll use for applyInPandas
   def normalize(pdf: pd.DataFrame) -> pd.DataFrame:
       reading = pdf.sensor_reading
       pdf['normalized'] = value - (reading.mean() / reading.std())
       return pdf

   return normalize

# Now we can initialize the function with a value inserted
average = df.selectExpr('avg(sensor_reading) as average').collect()[0][0]

dynamic_normalize = normalize_with_value(average)

df.groupBy('type').applyInPandas(dynamic_normalize, 'type string, sensor_reading long, normalized long').show()
Output:

+---------+--------------+----------+
|     type|sensor_reading|normalized|
+---------+--------------+----------+
|propeller|            25|        14|
|propeller|            29|        14|
|propeller|            36|        14|
|  turbine|            10|        15|
|  turbine|             7|        15|
|  turbine|            12|        15|
+---------+--------------+----------+

Pandas UDFでも同じことが可能です。このデモでは、Pandas UDFにARIMAモデルのハイパーパラメーターを引き渡しています。

from pyspark.sql.functions import pandas_udf
from statsmodels.tsa.arima.model import ARIMA

# Fit and run an ARIMA model using a Pandas UDF with the hyperparameters passed in
def create_arima_forecaster(order):

   @pandas_udf("double")
   def forecast_arima(value: pd.Series) -> pd.Series:
       model = ARIMA(value, order=order)
       model_fit = model.fit()
       return model_fit.predict()

   return forecast_arima

# Minimal Spark code - just select one column and add another. We can still use Pandas for our logic
forecast_arima = create_arima_forecaster((1, 2, 3))

df.withColumn('predicted_reading', forecast_arima('sensor_reading')).show()

Pnadas UDFのパラメーターのアプリケーション

以下のような様々なシナリオにおいて、Pandas UDFへのパラメーターの引き渡しは有用なものとなります:

  • ハイパーパラメーターチューニング
  • applyInPandasによるオブジェクト指向プログラミング - MLflow Pyfunc modelにおける動的な特徴量エンジニアリング関数など
  • 複雑で動的なデータパイプラインの構築

例えば、Pandas UDFから呼び出されるARIMAモデルのハイパーパラメーターのチューニングを行いたい場合、様々なハイパーパラメーターの選択肢を引き渡すために同じアプローチを活用することができます:

from hyperopt import hp, fmin, tpe, Trials
from pyspark.ml.evaluation import RegressionEvaluator

# Define the hyperparameter search space
search_space = {'p': 1, 'd': hp.quniform('d', 2, 3, 1), 'q': hp.quniform('q', 2, 4, 1)}

# Define the objective function to be minimized
def objective(params):
   order = (params['p'], params['d'], params['q'])
   forecast_arima = create_arima_forecaster(order)
   arima_output = df.withColumn('predicted_reading', forecast_arima('sensor_reading'))

   evaluator = RegressionEvaluator(predictionCol="predicted_reading",
                                   labelCol="sensor_reading", 
                                   metricName="rmse")

   rmse = evaluator.evaluate(arima_output)
   return rmse

# Run the hyperparameter optimization
trials = Trials()
best = fmin(fn=objective, space=search_space, algo=tpe.suggest, max_evals=6, trials=trials)

print('Best hyperparameters: ', best)

この例では、モデルに対する効果的な設定を自動で計測するために、Hyperoptを活用しています。objective関数は入力としてハイパーパラメーターのセットを受け取り、create_arima_forecaster関数を用いてそれらのハイパーパラメーターを持つARIMA予測器を作成し、Pandas UDFのforecast_arimaを用いて入力データフレームに予測器を適用します。そして、root mean squared error (RMSE)が目的関数の値として返却されるように、結果として得られるデータフレームが評価されます。Hyperoptのhp関数を用いて、ハイパーパラメーターの探索空間が定義されるので、それぞれのハイパーパラメーターに対して検索する値の範囲を指定することができます。最後に、目的関数、探索空間、最適化アルゴリズム、実行する評価の回数を指定したHyperoptのfmin関数を使用します。結果として得られる最適化ハイパーパラメーターが表示され、予測をさらに前進させるために活用することができます。

Databricksランタイム上で並列ハイパーパラメーターチューニングを実行する際には、標準的なHyperoptのトライアルではなくSparkTrialsによって実行されることにも触れておきます。SparkTrialsは自動的にSparkクラスターに目的関数のそれぞれの処理を分散させ、期待する並列度を容易に設定することができます。実際のところ、SparkTrialsDatabricks AutoMLの内部で活用されています!

まとめ

この記事では、applyInPandasやPandas UDFにパラメーターを引き渡すためのちょっとしたプログラミング上のトリックを学びました。また、戦略的なパラメーターチューニングにおける重要なユースケースも見てきました。このトピックの理論をさらに探索するには、若干異なる方法で同じゴールを多声するためのPythonのfunctools partialsの活用について調査することをお勧めします。最終的には、レガシーあるいはカスタムのPandasコードをより動的かつスケーラブルにするためのクロージャの使い方をさらに理解していただけたらと考えています。

はじめてのDatabricks

はじめてのDatabricks

Databricks無料トライアル

Databricks無料トライアル

2
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?