Memory Profiling in PySpark - The Databricks Blogの翻訳です。
本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
PySparkプログラムのパフォーマンスには多数の要素が存在しています。PySparkでは、あなたのプログラム密なループを明らかにする様々なプロファイリングツールをサポートしており、パフォーマンス改善に関する意思決定を行うことが可能となります。詳細はこちらをご覧ください。しかし、プログラムのパフォーマンスのキーとなる要素の一つであるメモリーは長らくPySparkのプロファイリングではサポートされていませんでした。通常のPythonプロセスとしてメモリープロファイラーを用いて、Sparkドライバー上のPySparkプログラムをプロファイリングすることは可能でしたが、Sparkエグゼキューターのメモリーをプロファイルする容易な方法は存在していませんでした。
最も人気のあるPython APIの一つであるPySpark UDFは、SparkエグゼキューターによってスポーンされたPythonワーカーサブプロセスによって実行されます。これによってユーザーはApache Spark™エンジンの上でカスタムコードを実行できるのでパワフルなものとなっています。しかし、メモリー消費を理解することなしにUDFを最適化することは困難です。PySpark UDFの最適化を支援し、アウトオブメモリーエラーの可能性を削減するために、PySparkメモリープロファイラーは、合計メモリー使用量の情報を提供します。UDFのコードのどの行がもっともメモリーを使用しているのかを特定します。
エグゼキューターにおけるメモリープロファイリングの実装は困難なものでした。エグゼキューターはクラスター上に分散されているので、結果のメモリープロファイルをそれぞれのエグゼキューターから収集し、合計のメモリー消費を表示するために適切に集計する必要があります。また、デバッグと修正のために、メモリー消費とそれぞれのソースコード行を提供しなくてはなりません。Databricks Runtime 12.0において、PySparkはこれらすべての技術的課題を克服し、エグゼキューターでメモリープロファイリングが可能となりました。この記事では、ユーザー定義関数(UDF)の概要を説明し、UDFにどのようにメモリープロファイラーを使うのかをデモします。
ユーザー定義関数(UDF)の概要
PySparkでサポートされているUDFには大きく2つのカテゴリーが存在します: Python UDFとPandas UDFです。
- Python UDFは、Pickleによってシリアライズ/デシリアライズされるPythonオブジェクトを受け取り/返却するユーザー定義スカラー関数であり、一度に一行を処理します。
- Pandas UDF(ベクトライズドUDF)は、Apache Arrowによってシリアライズ/デシリアライズされるシリーズ、データフレームを受け取り/返却するUDFであり、ブロックごとに処理します。Pandas UDFは用途や入出力のタイプに応じてカテゴリ分けされるバリエーションが存在します:
Series to Series
,Series to Scalar
,Iterator to Iterator
です。
Pandas UDF実装をベースとしたPandas Functions APIが存在します: Map (mapInPandas
など)、(Co)Grouped Map (applyInPandasなど)、そして、Arrow Function APIのmapInArrow
もあります。関数が入出力のイテレーターを受け取らない限り、上述のすべてのUDFタイプにメモリープロファイラーは適用されます。
メモリープロファイリングの有効化
クラスターでメモリープロファイリングを有効化するには、以下の様にMemory Profilerライブラリをインストールし、Spark設定spark.python.profile.memory
をtrue
に設定する必要があります。
すると、UDFのメモリーをプロファイルできる様になります。GroupedData.applyInPandasを用いて、メモリープロファイラーを説明します。
最初に、以下のように4,000,000行のPySparkデータフレームを生成します。その後で、グループごとに1,000,000行の4グループが生成される様にid列でグルーピングします。
sdf = spark.range(0, 4 * 1000000).withColumn(
'id', (col('id') % 4).cast('integer')
).withColumn('v', rand())
以下の様に、関数arith_op
を定義してsdf
に適用します。
def arith_op(pdf: pd.DataFrame) -> pd.DataFrame:
new_v = []
for x in pdf.v:
new_v.append(x * 10 + 1)
pdf.v = pd.Series(new_v)
return pdf
res = sdf.groupby("id").applyInPandas(arith_op, schema=sdf.schema)
res.collect()
上のコードを実行し、sc.show_profiles()
を実行すると以下のプロファイル結果が表示されます。sc.dump_profiles(path)
を用いて、プロファイル結果をディスクにダンプすることもできます。
上のプロファイル結果のUDFのIDである245は、res.explain()
を呼び出すことで表示されるSparkの実行計画のrest
と一致します。
== Physical Plan ==
...
FlatMapGroupsInPandas [...], arith_op(...)#245, [...]
sc.show_profiles()
のプロファイル結果の本文には、以下のカラムが含まれます。
-
Line #
プロファイリングされたコードの行番号 -
Mem usage
行を実行した後のPythonインタプリタのメモリー使用量 -
Increment
前回の行と現在の行のメモリーの差 -
Occurrences
行が実行された回数 -
Line Contents
プロファイリングされたコード
プロファイル結果からLine 3 ("for x in pdf.v")
が~125 MiB
と最もメモリーを消費していることがわかります。そして、この関数のメモリー消費の総量は~185 MiB
となっています。
以下の様にpdf.v
のイテレーションを削除することで、関数のメモリー効率を改善することができます。
def optimized_arith_op(pdf: pd.DataFrame) -> pd.DataFrame:
pdf.v = pdf.v * 10 + 1
return pdf
res = sdf.groupby("id").applyInPandas(optimized_arith_op, schema=sdf.schema)
res.collect()
optimized_arith_op
のメモリー消費の合計は~61 MiB
に削減され、半分程度になりました。
上のサンプルでは、UDFのメモリー消費を深く理解し、メモリーのボトルネックを特定し、関数のメモリー効率を改善するために、どのようにメモリープロファイラーが役立つのかをデモしました。
まとめ
PySparkのメモリープロファイラーはMemory Profilerをベースとして実装されています。また、Pythonワーカーからプロファイル結果を収集する際に、Spark Accumulatorsが重要な役割を担っています。メモリープロファイラーはUDFの合計メモリー使用量を計算し、コードのどの行がもっともメモリーを消費するのかを特定します。Databricks Runtime 12.0以降で簡単に利用することができます。
さらに、我々はPySparkのメモリープロファイラーをApache Spark™コミュニティにオープンソース化しました。Spark 3.4以降でこのメモリープロファイラーを使用することができます。詳細については、SPARK-40281をご覧ください。