1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

How to Profile PySpark - The Databricks Blogの翻訳です。

本書は抄訳であり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。

Apache Spark™においては、ビッグデータワークロードのために宣言型のPython APIがサポートされています。これらは大部分のユースケースを取り扱えるほどにパワフルなものです。さらに、PySparkのUDF(ユーザー定義関数)を用いることで、ユーザーはApache Spark™エンジンで任意のPythonコードを実行できるので、更なる柔軟性を提供します。ユーザーは「何をするのか」を宣言するだけで良いのです。PySparkはサンドボックスとして「どのようにおこなうのか」をカプセル化します。これによって、PySparkは使いやすいものになっていますが、パフォーマンスのボトルネックを特定し、カスタムの最適化を適用することが困難になることがあります。

上述した困難さに取り組むために、PySparkでは標準的なPythonプロファイラー実装の一つであるcProfileをベースとした様々なプロファイリングツールをサポートしています。PySparkプロファイラーは関数呼び出し回数、特定の関数に費やされた時間の合計、ファイル名、ナビゲーションをサポートするための行番号のような情報を提供します。これらの情報は、PySparkプログラムの緊密なループを露出させ、パフォーマンス改善の意思決定を行うためには重要なものとなります。

ドライバーのプロファイリング

PySparkアプリケーションは、ドライバープログラムのSparkContextオブジェクトによって調整されるクラスターの独立した一連のプロセスとして実行されます。ドライバー側では、PySparkは通常のPythonプロセスとなります。このため、以下に示す様にcProfileを用いて通常のPythonプログラムと同じ様にプロファイリングすることができます。

Python
import cProfile

with cProfile.Profile() as pr:
    # Your code

pr.print_stats()

ワーカーのプロファイリング

エグゼキューターはこのクラスターのワーカーノードに分散され、我々はプロファイルを集約する必要があるため複雑性が増します。さらに、Pythonワーカープロセスは、PySpark UDF実行においてはエグゼキューターごとにスポーンされるので、プロファイリングがさらに難解なものになります。

Spark 3.3で導入されるUDFプロファイラーは、これらのすべての障害を乗り越え、PySparkアプリケーションのワーカーのプロファイリングを行うためのメジャーなツールとなります。シンプルなPandas UDFサンプルを用いてUDFプロファイラーの使い方を説明します。

最初に、以下の様に8000行を持つPySparkデータフレームを生成します。

Python
sdf = spark.range(0, 8 * 1000).withColumn(
  'id', (col('id') % 8).cast('integer') # 1000 rows x 8 groups (if group by 'id')
).withColumn('v', rand())

この後で、グループあたり1000行となる8つのグループになるように、idカラムを用いてグルーピングを行います。

以下の様にPandas UDF plus_oneを作成して適用します。

Python
import pandas as pd

def plus_one(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf.apply(lambda x: x + 1, axis=1)

res = sdf.groupby("id").applyInPandas(plus_one, schema=sdf.schema)
res.collect()

plus_oneはpandasデータフレームを受け取り、別のpandasデータフレームを返却することに注意してください。それぞれのグループに対して、すべてのカラムがpandasデータフレームとしてUDF plus_oneに引き渡され、返却されたpandasデータフレームはPySparkデータフレームに結合されます。

上のサンプルを実行し、sc.show_profiles()を実行すると以下のプロファイルが表示されます。sc.dump_profiles(path)を用いることで、以下のプロファイルをディスクにダンプすることもできます。

プロファイルのUDF ID(上でハイライトされた271)は、resのSparkの実行計画のものとマッチします。

プロファイルの本体の最初の行は、モニタリングされた呼び出しの総数を示しています。カラムのヘッダーには以下が含まれます。

  • ncalls: 呼び出し回数
  • tottime: 特定の関数に費やされた合計時間(サブ関数の呼び出しに要した時間を除く)
  • percall: tottimencallsで割った割合
  • cumtime: この関数とサブ関数で費やされた累積時間(呼び出しからexitまで)
  • percall: cumtimeをプリミティブの呼び出しで割った割合
  • filename:lineno(function): それぞれの関数ごとの情報

カラムの詳細を見ていきましょう。plus_oneはグループごとに1回呼び出されており、合計8回となっています。pandasのSeriesの_arith_methodが行ごとに1回呼び出されており、合計8000回となります。pandas.DataFrame.applyが関数lambda x: x + 1を行ごとに適用しており、呼び出しの高いオーバーヘッドに苦しんでいます。

pandas.DataFrame.applyをpandasでベクトル化されるpdf + 1で置き換えることで、このようなオーバーヘッドを削減することができます。最適化されたPandas UDFは以下の様になります。

Python
import pandas as pd

def plus_one_optimized(pdf: pd.DataFrame) -> pd.DataFrame:
    return pdf + 1
  
res = sdf.groupby("id").applyInPandas(plus_one_optimized, schema=sdf.schema)
res.collect()

アップデートされたプロファイルは以下の様になります。

この最適化を以下の様にまとめることができます。

  • 計算オペレーションを8,000コールから8コールに
  • トータルの関数呼び出し回数を2,898,160コールから2,384コールに
  • 合計実行時間を2.300秒から0.004秒に

上述した短いサンプルでは、処理を深く理解し、パフォーマンスのボトルネックを特定し、ユーザー定義関数の全体的なパフォーマンスを強化する際に、UDFプロファイラーがどのように役立つのかをデモンストレーションしました。

UDFプロファイラーは、エグゼキューターサイドのプロファイラーをベースにして実装されており、PySpark RDD API向けに設計されています。エグゼキューターサイドのプロファイラーは、すべてのアクティブなDatabrikcsランタイムバージョンで利用することができます。

UDFプロファイラーとエグゼキューターサイドプロファイラーの両方は、Pythonワーカーで稼働します。これらはSpark設定spark.python.profileで制御することができ、これはデフォルトではfalseになっています。以下の様に、DatabricksランタイムクラスターのSpark設定で有効化することができます。

まとめ

PySparkプロファイラーは、cProfileをベースとして実装されています。このため、プロファイルのレポーティングはStatsクラスに依存しています。また、Pythonワーカーからのプロファイルレポートを収集する際には、Spark Accumulatorsも重要な役割を担っています。

ホットなループを特定し、潜在的な改善点を提案するためにPySparkによってパワフルなプロファイラーが提供されます。これらは使いやすく、PySparkプログラムのパフォーマンスを強化するためには重要なものとなります。Databricksランタイム11.0(Spark 3.3)から利用できるUDFプロファイラーは、すべての技術的課題を克服し、ユーザー定義関数に対する洞察をもたらします。

さらに、現在Apache Spark™のオープンソースコミュニティでは、エグゼキューターのメモリープロファイリングを導入するための努力が続けられています。詳細はSPARK-40281をご覧ください。

Databricks 無料トライアル

Databricks 無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?