はじめに
本記事では、PythonやScalaで実装されたETL処理において、Photonエンジンを効果的に活用する方法について、筆者が実施したHadoopからDatabricksへの移行検証PoCを基に得られた知見をご紹介します。
Databricksでは、Photonエンジンを利用することで容易にパフォーマンスを向上させることが可能です。ただし、PythonやScalaでの実装にはいくつかの注意点が伴います。本記事では、PythonやScalaで構築されたバッチ処理パイプラインにおいて、Photonエンジンを最大限に活用するためのコード最適化手法に焦点を当て、解説していきます。
Photonは、Databricksのネイティブベクトル化クエリエンジンです。SQLワークロードやDataFrame API呼び出しを高速化し、ワークロードごとの総コストを削減します。既存のコードを変更することなく使用でき、データの入出力、ETL、ストリーミング、データサイエンス、インタラクティブクエリなど、さまざまなユースケースに対応しています。
チューニング対象のバッチ処理の概要
PoCで移行対象となったバッチ処理は、Hadoop環境で稼働していたScalaのDataset APIを用いて実装されていました。
処理対象データ量は多いものの、入力ファイルを加工し、頻繁にアクセスされるデータをメモリにキャッシュした後、後続の処理で必要なデータセットをファイルに出力するというシンプルな処理内容です。
1.インプットファイルからRead
Parquetファイル(マスター、ディテール)を読み込む。
2.データの結合
マスターとディテールを、特定のIDを基に結合する。
3.データ加工(UDF:UserDefinedFunctionで実装)
UDFを使用して、特定の条件に合致するデータを計算し、新しいカラムとして追加する。
4.加工データをSparkメモリにキャッシング
最終的なデータをSparkのメモリにキャッシュし、複数回のアクセスに備える。
5.アウトプットファイルにWrite
キャッシュされたデータにフィルタ条件を適用し、複数回アクセスして、最終的にParquetファイルとして出力する。
コード最適化手法
上記のようなETLバッチ処理をDatabricksに移行し、Photonエンジンを有効利用するための代表的なコード最適化手法は以下のようになります。
方法1:DataFrameの使用
PhotonエンジンはDataFrame APIをサポートしていますが、Dataset APIはサポートしていないため、Datasetを使用する場合にはPhotonの最適化を利用することはできません。そのためDatabricksでは、DataFrameの利用が一般的に推奨されます。
方法2:ベクトル化(Pandas)UDFの使用
Python UDFとベクトル化(Pandas)UDFには重要な違いがあり、Python UDFは1行ずつデータを処理するのに対し、ベクトル化UDFは複数行をバッチで処理します。
通常のPython UDFの処理は、PythonとJVM間でのデータのシリアライゼーションとデータ転送が必要であり、特に大量データの処理では、このオーバーヘッドが顕著となりますが、ベクトル化(Pandas)UDFではApache ArrowによるPythonとJVMの間のデータ転送量を大幅に削減することで高速化可能です。
# pandas_udfデコレータを使用し、関数の引数としてPandasのSeriesを受け取り、PandasのSeriesを返す
@pandas_udf(DoubleType())
def adjust_amount1(sales_date: pd.Series, amount: pd.Series) -> pd.Series:
sales_date = pd.to_datetime(sales_date, format='%m-%d-%Y %H:%M:%S')
return amount * sales_date.apply(lambda x: 1.2 if x.weekday() == 4 else 1.1)
UDFではPython UDFとベクトル化(Pandas)UDF問わず、Photonエンジンは利用できません。
方法3:PySparkネイティブ関数の使用
Sparkで最適化済みの組み込み関数は、Photonエンジンを利用可能です。
また、Sparkネイティブ関数は各エグゼキューターのJVM上で直接実行されるため、PythonとJVM間でのデータのシリアライゼーションとデータ転送の必要はありません。
方法4:SparkSQL(SQL関数)の使用
SparkSQL関数はPhotonエンジンを利用できます。
Photonは、Apache Spark APIと互換性があるように設計されており、SparkSQLコードを変更せずに使用でき、特にDeltaテーブルとParquetテーブルに対する操作で高いパフォーマンスを発揮します。
方法5:Deltaテーブルの使用
一時データの保存ではPhotonエンジンを利用できないメモリキャッシュの代わりにDeltaテーブルを使用します。
Deltaテーブルは圧縮Parquet形式で保存することでメモリ上のデータと比較して約10倍の圧縮率に加え、データスキッピングなどの自動最適化技術により、頻繁にアクセスされるデータの保持にSparkメモリキャッシュの代わりにDeltaテーブルに保存することでクエリパフォーマンスを向上させます。
メモリキャッシュへのデータ保存速度は、Deltaテーブルとしてストレージに書き込む場合より高速ですが、その後のデータアクセスにおいては、Photonエンジンが利用可能なDeltaテーブルの方が数倍速いことが多く見られます。
PoCで検証した実装方式
上記のコード最適化手法を用いて、コードの最適化とPhotonエンジンによるパフォーマンス向上を検証するため、以下の7種類のパターンで実装を検証しました。
方式 | 実装方法 | |
---|---|---|
1 | Scala Dataset+UDF+メモリキャッシュ | 移行対象のHadoop環境と同じくScalaのDatasetとUDFで実装しコード最適化は実施していない |
2 | Scala DataFrame+UDF+メモリキャッシュ | ScalaのDataFrameを使用したUDFを実装 ※方法1:Dataframeの使用 |
3 | Python+(PySpark)UDF+メモリキャッシュ | PythonのDataFrameを使用したUDFによる実装 ※方法1:DataFrameの使用 |
4 | Python+ベクトル化UDF+メモリキャッシュ | Apache Arrowを活用するベクトル化UDFによる実装 ※方法1:DataFrameの使用 ※方法2:ベクトル化(Pandas)UDFの使用 |
5 | Scala Dataset+UDF+Deltaテーブル | 最小コード修正でパフォーマンス最適化(変更点はDeltaテーブルを使用するのみ) ※方法5:Deltaテーブルの使用 |
6 | PySparkネイティブ関数+Deltaテーブル | UDFを使用せず全て組み込みのPySpark関数で実装 ※方法1:DataFrameの使用 ※方法3:PySparkネイティブ関数の使用 ※方法5:Deltaテーブルの使用 |
7 | SparkSQL+Deltaテーブル | UDFを使用せず全てSparkSQL関数で実装 ※方法1:DataFrameの使用 ※方法4:SparkSQL(SQL関数)の使用 ※方法5:Deltaテーブルの使用 |
処理時間
各ケースについて、Photonエンジンを利用しない場合と利用する場合の処理時間を計測した結果は以下の通りです。適切なコード最適化を施した上でPhotonエンジンを有効にすることで、期待通りパフォーマンスが向上するケースがある一方で、処理が大幅に遅くなるケースも確認されました。ただし、さらに多くのコード最適化手法を適用した場合、Photonエンジンによりほぼ期待通りのパフォーマンスを得ることができました。
実装の方式 | Photon オフ |
Photon オン |
パフォーマンス | |
---|---|---|---|---|
1 | Scala Dataset+UDF+メモリキャッシュ | 3446秒 | 4183秒 | 約1.21倍劣化 |
2 | Scala DataFrame+UDF+メモリキャッシュ | 3297秒 | 3992秒 | 約1.21倍劣化 |
3 | Python+(PySpark)UDF+メモリキャッシュ | 4149秒 | 4824秒 | 約1.16倍劣化 |
4 | Python+ベクトル化UDF+メモリキャッシュ | 3235秒 | 4532秒 | 約1.40倍劣化 |
5 | Scala Dataset+UDF+Deltaテーブル | 1548秒 | 939秒 | 約1.65倍向上 |
6 | PySparkネイティブ関数+Deltaテーブル | 1041秒 | 318秒 | 約3.27倍向上 |
7 | SparkSQL+Deltaテーブル | 1058秒 | 358秒 | 約2.95倍向上 |
方式1〜4では、Photonエンジンを有効にすることで逆にパフォーマンスが低下する結果となりましたが、方式5〜7ではパフォーマンスの向上が確認されました。
コード最適化前(方式1:3448秒)とコード最適化後のベストケース(方式6:318秒)では約10.84倍のパフォーマンス向上を実現できました。
ボトルネックの考察
Photonエンジンを有効にした結果、逆にパフォーマンスが低下した方式1〜4のケースでは、共通のボトルネックとして、PhotonエンジンからSparkエンジンへの切り替えが頻繁に発生していることが確認されました。
PhotonエンジンからSparkエンジンへの切り替えのオーバーヘッド
PhotonエンジンからSparkエンジンへの切り替えはジョブ単位で発生します。具体的には、クエリの実行プランにPhotonエンジンがサポートしていない処理が含まれている場合、その部分はSparkエンジンで実行されます。
この切り替えは、Spark UIのクエリプランを確認することで把握できます。PhotonエンジンからSparkエンジンに切り替わる際にColumnarToRowオペレーターが実行される理由は、現在のPhotonエンジンがカラム形式のデータのみを出力可能であるためです。その結果、SparkエンジンのColumnarToRowオペレーターを挿入する必要が生じます。
さらに、次のジョブでPhotonエンジンが実行可能な場合には、再びSparkエンジンからPhotonエンジンへの切り替えが発生します。この際、PhotonRowToColumnarオペレーターを使用してデータをカラム形式に変換し、Photonエンジンで処理できる状態にします。ただし、これらのデータ形式の変換には一定のオーバーヘッドが伴います。
また、以下のようにSpark UIのクエリプランの詳細 (Details) に出力される情報から、Photonエンジンの利用を妨げる処理を特定することができます。
これらの情報を基にコードを見直し、特定の処理を調整することで、ETLプロセス中のエンジン切り替えを最小化し、全体の処理速度と効率を改善できます。
PhotonエンジンとSparkエンジンの切り替えが頻繁に発生すると、Photonエンジンによる高速化効果を上回るオーバーヘッドが生じます。
処理エンジン間の切り替えオーバーヘッドの対応方法
処理エンジン間の切り替えによるオーバーヘッドを解消し、パフォーマンスを最適化するためには、PhotonエンジンとSparkエンジン間でデータ変換が発生しないようにすることが重要です。そのため、可能な限りPySparkネイティブ関数やSparkSQLを使用した実装が推奨されます。
しかしながら、実際のETLのビジネスロジックではUDFの利用が不可避なケースもあります。UDFが必要なケースではPhotonエンジンが利用できないため、以下の対応を検討してください。
処理ロジック実装方式の選定
大規模データセットにおいて複雑な処理ロジックのためにUDFの使用が必要となる場合は、ベクトル化(Pandas)UDFやScala UDFでの実装を検討してください。
Photonエンジンが利用できるPySparkネイティブ関数やSparkSQLほどには処理速度は劣りますが、Sparkエンジンでも高速に動作します。
以下の図は、UDFの種類と、SparkSQLおよびPySparkネイティブ関数を使用した場合のパフォーマンスの違いを示しています。
処理コードの分割と適切なクラスタの割り当て
コスト効率を考慮し、Photonエンジンに適した処理とSparkエンジンで実行すべき処理を分離し、必要に応じてそれぞれに専用のクラスタを割り当てることも検討してください。
Photon有効クラスタの利用を効率化することで、コストの最適化も実現可能です。
また、処理単位でクラスタを切り替える際には、処理の前後で対象データをDeltaテーブルに保存することをお勧めします。
Deltaテーブルによりデータの永続性と一貫性を保証し、Spotインスタンス利用時などのインスタンス停止タイミングでの影響も受けにくくなります。
Photonエンジンに適した処理とSparkエンジンで実行すべき処理を分離し、それぞれに専用のクラスタ(前者はPhoton有効化、後者はPhotonオフ)を割り当てるというDatabricksワークフローの設計を検討してください。
さいごに
適切なコード最適化を行うことで、PythonやScalaなどで記述されたETLパイプラインにおいて、Photonエンジンを効果的に活用することが可能になります。皆様の環境でも、コード修正のコストを最小限に抑えながら、パフォーマンスを最大化するための参考になれば幸いです。