How to Profile PySpark - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
Apache Spark™においては、ビッグデータワークロードのために宣言型のPython APIがサポートされています。これらは大部分のユースケースを取り扱えるほどにパワフルなものです。さらに、PySparkのUDF(ユーザー定義関数)を用いることで、ユーザーはApache Spark™エンジンで任意のPythonコードを実行できるので、更なる柔軟性を提供します。ユーザーは「何をするのか」を宣言するだけで良いのです。PySparkはサンドボックスとして「どのようにおこなうのか」をカプセル化します。これによって、PySparkは使いやすいものになっていますが、パフォーマンスのボトルネックを特定し、カスタムの最適化を適用することが困難になることがあります。
上述した困難さに取り組むために、PySparkでは標準的なPythonプロファイラー実装の一つであるcProfileをベースとした様々なプロファイリングツールをサポートしています。PySparkプロファイラーは関数呼び出し回数、特定の関数に費やされた時間の合計、ファイル名、ナビゲーションをサポートするための行番号のような情報を提供します。これらの情報は、PySparkプログラムの緊密なループを露出させ、パフォーマンス改善の意思決定を行うためには重要なものとなります。
ドライバーのプロファイリング
PySparkアプリケーションは、ドライバープログラムのSparkContextオブジェクトによって調整されるクラスターの独立した一連のプロセスとして実行されます。ドライバー側では、PySparkは通常のPythonプロセスとなります。このため、以下に示す様にcProfileを用いて通常のPythonプログラムと同じ様にプロファイリングすることができます。
import cProfile
with cProfile.Profile() as pr:
# Your code
pr.print_stats()
ワーカーのプロファイリング
エグゼキューターはこのクラスターのワーカーノードに分散され、我々はプロファイルを集約する必要があるため複雑性が増します。さらに、Pythonワーカープロセスは、PySpark UDF実行においてはエグゼキューターごとにスポーンされるので、プロファイリングがさらに難解なものになります。
Spark 3.3で導入されるUDFプロファイラーは、これらのすべての障害を乗り越え、PySparkアプリケーションのワーカーのプロファイリングを行うためのメジャーなツールとなります。シンプルなPandas UDFサンプルを用いてUDFプロファイラーの使い方を説明します。
最初に、以下の様に8000行を持つPySparkデータフレームを生成します。
sdf = spark.range(0, 8 * 1000).withColumn(
'id', (col('id') % 8).cast('integer') # 1000 rows x 8 groups (if group by 'id')
).withColumn('v', rand())
この後で、グループあたり1000行となる8つのグループになるように、idカラムを用いてグルーピングを行います。
以下の様にPandas UDF plus_one
を作成して適用します。
import pandas as pd
def plus_one(pdf: pd.DataFrame) -> pd.DataFrame:
return pdf.apply(lambda x: x + 1, axis=1)
res = sdf.groupby("id").applyInPandas(plus_one, schema=sdf.schema)
res.collect()
plus_one
はpandasデータフレームを受け取り、別のpandasデータフレームを返却することに注意してください。それぞれのグループに対して、すべてのカラムがpandasデータフレームとしてUDF plus_one
に引き渡され、返却されたpandasデータフレームはPySparkデータフレームに結合されます。
上のサンプルを実行し、sc.show_profiles()
を実行すると以下のプロファイルが表示されます。sc.dump_profiles(path)
を用いることで、以下のプロファイルをディスクにダンプすることもできます。
プロファイルのUDF ID(上でハイライトされた271)は、res
のSparkの実行計画のものとマッチします。
プロファイルの本体の最初の行は、モニタリングされた呼び出しの総数を示しています。カラムのヘッダーには以下が含まれます。
-
ncalls
: 呼び出し回数 -
tottime
: 特定の関数に費やされた合計時間(サブ関数の呼び出しに要した時間を除く) -
percall
:tottime
をncalls
で割った割合 -
cumtime
: この関数とサブ関数で費やされた累積時間(呼び出しからexitまで) -
percall
:cumtime
をプリミティブの呼び出しで割った割合 -
filename:lineno(function)
: それぞれの関数ごとの情報
カラムの詳細を見ていきましょう。plus_one
はグループごとに1回呼び出されており、合計8回となっています。pandasのSeriesの_arith_method
が行ごとに1回呼び出されており、合計8000回となります。pandas.DataFrame.apply
が関数lambda x: x + 1
を行ごとに適用しており、呼び出しの高いオーバーヘッドに苦しんでいます。
pandas.DataFrame.apply
をpandasでベクトル化されるpdf + 1
で置き換えることで、このようなオーバーヘッドを削減することができます。最適化されたPandas UDFは以下の様になります。
import pandas as pd
def plus_one_optimized(pdf: pd.DataFrame) -> pd.DataFrame:
return pdf + 1
res = sdf.groupby("id").applyInPandas(plus_one_optimized, schema=sdf.schema)
res.collect()
この最適化を以下の様にまとめることができます。
- 計算オペレーションを8,000コールから8コールに
- トータルの関数呼び出し回数を2,898,160コールから2,384コールに
- 合計実行時間を2.300秒から0.004秒に
上述した短いサンプルでは、処理を深く理解し、パフォーマンスのボトルネックを特定し、ユーザー定義関数の全体的なパフォーマンスを強化する際に、UDFプロファイラーがどのように役立つのかをデモンストレーションしました。
UDFプロファイラーは、エグゼキューターサイドのプロファイラーをベースにして実装されており、PySpark RDD API向けに設計されています。エグゼキューターサイドのプロファイラーは、すべてのアクティブなDatabrikcsランタイムバージョンで利用することができます。
UDFプロファイラーとエグゼキューターサイドプロファイラーの両方は、Pythonワーカーで稼働します。これらはSpark設定spark.python.profile
で制御することができ、これはデフォルトではfalse
になっています。以下の様に、DatabricksランタイムクラスターのSpark設定で有効化することができます。
まとめ
PySparkプロファイラーは、cProfileをベースとして実装されています。このため、プロファイルのレポーティングはStatsクラスに依存しています。また、Pythonワーカーからのプロファイルレポートを収集する際には、Spark Accumulatorsも重要な役割を担っています。
ホットなループを特定し、潜在的な改善点を提案するためにPySparkによってパワフルなプロファイラーが提供されます。これらは使いやすく、PySparkプログラムのパフォーマンスを強化するためには重要なものとなります。Databricksランタイム11.0(Spark 3.3)から利用できるUDFプロファイラーは、すべての技術的課題を克服し、ユーザー定義関数に対する洞察をもたらします。
さらに、現在Apache Spark™のオープンソースコミュニティでは、エグゼキューターのメモリープロファイリングを導入するための努力が続けられています。詳細はSPARK-40281をご覧ください。