目的
2020年の6月にリリースされたApache Spark 3.0から正式にGPU対応となり、MLタスクを中心に活用が進んできている。しかし、学習や推論以外ではGPUがフルに使われることはなかなかない。そこで、せっかくGPUがあるならML以外のタスクをにも活用し、GPUの恩恵を最大限に受けられないかと考えた。
今回は、Azure Databricksと小売業のIDPOSの実データを使って、よく行うデータ操作とETLのバッチクエリをGPU環境で実行し、その効果を検証してみた。
まだまだ理解が足りていないところがあり、間違いや不明点などがあれば、ぜひ皆様のお知恵をお借りしたく、ご指摘お待ちしております。
RAPIDS Accelerator For Apache Sparkとは
NVIDIAが提供しているライブラリであり、GPU DataFramesのcuDFをはじめとしたGPUアクセラレーションがSpark上で実行可能となる。このライブラリをSpark起動時に読み込むことで、既存のSQLやDataframe処理がGPUに移されるので、クエリを書き換えることなく、GPUのメリットを享受できる優れものである。
環境
Azure DatabricksのCompute画面からクラスターを作成し、インスタンスは以下の通り設定。
- Runtime:8.2 ML GPU (Spark 3.1.1)
- Driver: Standard_NC6s_v3 (6 Core, 112GB Memory 1 GPU V100)
- Worker: Standard_NC6s_v3 (6 Core, 112GB Memory 1 GPU V100) x 2
WARN:
RumtimeにML GPUをセットすると、DriverにはCPUインスタンスをセットできない。
DriverのGPUが少しもったいない気もするが、Azure Databricksではこの制約がある。
RAPIDS設定方法
公式設定ガイドからほぼその通りに設定。大まかな流れとしては、spark-rapidsとcudfのjarをダウンロードしてくるInit ScriptをNotebookから生成し、そのスクリプトをAdvanced OptionsのInit Scriptsに設定する。そうすると、Spark起動時に実行されて上記2つのjarが読み込まれる。
公式設定ガイドでも触れられているが、実際にやってみて詰まった点を以下にまとめた。
GPUクラスター使用時の注意事項
WARN:
デフォルト設定ではタスクが並列実行されない。
RAPIDSとは関係なく、Sparkの設定としてここが最初に理解できてなかった。デフォルトでなにも設定しないと、spark.task.resource.gpu.amountは1にセットされいる。したがって、一つのExecuterにCPUコアが複数あっても、この設定が優先されるため、複数タスクを並列に実行できず、データの読み込み時にとてつもない時間がかかってしまう。
一方、spark.task.resource.gpu.amountには1以下の数値もセットできるため、今回の場合は、コア数が6なので0.166 x 6 = 0.996となるように、0.166をセットした。公式設定ガイドにも記載があるので、こちらも参考に。
spark.task.resource.gpu.amount 0.166
ただし、一般的なMLタスクではGPUをフルに使い切るために、spark.task.resource.gpu.amountを1(デフォルト)で運用する場合も多いと思う。しかしながら、spark.task.resource.gpu.amountはクラスター起動後には変更できないため、この設定を有効にするためにSpark Clusterの再起動が必要になってしまう点はとても残念だ。
WARN:
GPUインスタンスは、圧倒的にCPUのコア数が少ないため、データ読み込みが遅い。
これはDatabricksやRAPIDSとは関係なく、AzureやAWSで用意されているGPUインスタンスに問題がある。ストレージからデータを取得する際に並列読み込みを行うが、ここではCPUのコア数がそのまま並列数になる。つまり、CPUコア数が少なければデータ読み込みの並列度が下がるため、データの読み込みに大きな時間がかかってしまう。
今回使用したStandard_NC6s_v3 (6 Core, 112GB Memory 1 GPU V100)はCPUが6コアしかないため、同じような価格帯のCPUインスタンスと比べるとCPUコア数が極端に少なく、ここがボトルネックとなってしまいクエリ全体でのパフォーマンスがでない。同じ価格でよりコア数の多いCPUインスタンスを使った方が、大きなテーブルの読み込みは高速である。例えば、Spot価格が近いD32s v3インスタンスはCPUが32コアあるので、NC6s_v3と比べて5倍以上の並列実行が可能だ。
DatabricksでRAPIDSを動かすときの制限事項
WARN:
Databricks上でのAdaptive Query Execution(AQE)はサポートされていない。
Apache Sparkの標準的なAQEはRAPIDSと併用できるが、Databricksのクエリオプティマイザはカスタマイズされており、RAPIDSとは併用できないため、Spark Configで無効にする必要がある。またdeltaのoptimizeWriteも同様である。
spark.sql.adaptive.enabled false
spark.databricks.delta.optimizeWrite.enabled false
WARN:
parquetの圧縮アルゴリズムにlz4を使用すると、読み込み時にNULLになってしまう。
parquetはsnappy以外の圧縮アルゴリズムを使用可能で、RAPIDSでもサポートされているlz4を使用してみたが、RAPIDSで読み込むとすべてのカラムがNULLで読み込まれてしまう。同じlz4で圧縮されたparquetファイルをRAPIDSなしで読み込んだ場合は正常に表示されるので、おそらくRAPIDSのアクセラレータ側での問題だと考えられる。
Databricks環境におけるSpark Config設定例
上記の制限事項を考慮したうえで、以下の通りのSpark Configとなった。公式ガイドを参考に、大き目なparquetファイルを読みやすいようmaxPartitionBytesを512MBに設定。
spark.plugins com.nvidia.spark.SQLPlugin
spark.task.resource.gpu.amount 0.166
spark.rapids.memory.pinnedPool.size 2g
spark.locality.wait 0
spark.databricks.delta.optimizeWrite.enabled false
spark.sql.adaptive.enabled false
spark.sql.files.maxPartitionBytes 536870912
spark.rapids.sql.concurrentGpuTasks 2
テスト方法
- Databricksでも%%timeitのマジックコマンドが使えるので、これで計測を行った。デフォルトでは7回実施にセットされている。
- データはIDPOSの実データをサンプリングして、40店舗1年分のデータセットを生成。ファイルフォーマットはParquet、圧縮アルゴリズムはデフォルトのsnappyを使用し、約4億7千万行で約17GB(展開時は約100GB)。
- 全く同じSQLクエリを実行。特にチューニングなどは実施せず、ブロードキャストヒントなども使用しない。
- 同じ構成のGPUインスタンスで、CPUを利用しAQEを効かせるデフォルトの設定と、**GPUを利用するRAPIDS(ただしAQEは無効)**とを比較し、どちらがGPUインスタンス利用時の設定としてよいパフォーマンスを出せるかを検証した。
WARN:
CPUインスタンスとGPUインスタンスで比較を行ったわけではない。
上記の注意事項でも述べているが、同じ価格帯のCPUインスタンスで確保できるコア数が大きく異なるため、データ読み込み時の並列化がパフォーマンスに大きく影響し比較にならない。今回は、すでに確保済みのGPUインスタンスをいかに活用できるかの視点から、GPUインスタンスにおいてCPUコア数が少ないという弱点は前提条件とみなした。
EXPLAINで実行プランを確認
単純なユニークな顧客IDの件数を取得するクエリをEXPLAINして、実行プランを表示させると、同じクエリでも、RAPIDS利用時にはGPUで計算されるように、実行プランが変更されていることが確認できる。
%sql
EXPLAIN SELECT COUNT(DISTINCT `顧客ID`) FROM all_data
CPU利用時(RAPIDSなし、AQEあり)
== Physical Plan == AdaptiveSparkPlan isFinalPlan=false +- == Current Plan == HashAggregate(keys=[], functions=[finalmerge_count(distinct merge count#439L) AS count(顧客ID#42)#436L]) +- Exchange SinglePartition, true, [id=#146] +- HashAggregate(keys=[], functions=[partial_count(distinct 顧客ID#42) AS count#439L]) +- HashAggregate(keys=[顧客ID#42], functions=[]) +- Exchange hashpartitioning(顧客ID#42, 200), true, [id=#142] +- HashAggregate(keys=[顧客ID#42], f...
GPU利用時(RAPIDSあり、AQEなし)
== Physical Plan == GpuColumnarToRow false +- GpuHashAggregate(keys=[], functions=[gpucount(distinct 顧客ID#42)]), filters=List(None)) +- GpuShuffleCoalesce 2147483647 +- GpuColumnarExchange gpusinglepartitioning$(), false, [id=#5297] +- GpuHashAggregate(keys=[], functions=[partial_gpucount(distinct 顧客ID#42)]), filters=List(None)) +- GpuHashAggregate(keys=[顧客ID#42], functions=[]), filters=List()) +- GpuShuffleCoalesce 2147483647 +- Gpu...
テスト結果
単純なDISTINCTクエリ
一意な顧客IDの数を集計する単純なクエリについてテストを行った。特にWHEREなどで条件をつけず、JOINなどもない。結果は以下の通り。
RAPIDS | AQE | Result |
---|---|---|
False | False | 53.5 s ± 1.19 s per loop (mean ± std. dev. of 7 runs, 1 loop each) |
False | True | 53.4 s ± 1.39 s per loop (mean ± std. dev. of 7 runs, 1 loop each) |
True | False | 43.5 s ± 799 ms per loop (mean ± std. dev. of 7 runs, 1 loop each) |
AQEと比較して、RAPIDS利用で約2割速くなった!このクエリでは、単純にデータをロードしている時間が一番かかっていて、集計自体は単純なため、CPU利用ではAQEの有無にかかわらず実行時間に差がないことがわかる。つまりデータの読み込み時に圧縮されたParquetの展開が行われるが、この時にGPUが利用されることで、処理時間の短縮につながっていることがわかる。
少し複雑なリフト値
併売を見るためによく使われる統計値である、リフト値を算出するクエリについてテストを行った。ビールの購入者と全体の購入者を比較して、それぞれのカテゴリー単位における購入率を算出して比較を行う。データは3回読み込まれ、顧客IDごとにカテゴリー単位での集計と、JOINも複数回発生する。Window関数やArrayなどは使用していないが、少し複雑なクエリだ。結果は以下の通り。
RAPIDS | AQE | Result |
---|---|---|
False | False | 2min 34s ± 3.18 s per loop (mean ± std. dev. of 7 runs, 1 loop each) |
False | True | 2min 25s ± 1.32 s per loop (mean ± std. dev. of 7 runs, 1 loop each) |
True | False | 2min 14s ± 2.26 s per loop (mean ± std. dev. of 7 runs, 1 loop each) |
CPU利用時のAQEでも処理時間の短縮に成功しているが、AQEが無効のRAPIDSでも処理の短縮に成功しており、GPUインスタンスでは、AQEを無効にしてもGPUを利用したRAPIDS使った方が、パフォーマンスは有利のようだ。
ETLのバッチクエリ
GZ圧縮されたCSVファイルを読み込み、データ型の変換や商品マスタをJOINするなどの加工を行って、Parquetファイルとして書き出してテーブルに挿入する、典型的なETLのバッチ処理をテストした。一つの処理でそこそこの時間がかかってしまうため、今回は%%timeitを使用せず、手動で3回実行し、セルに表示されるCommand tookの時間を参照した。結果は以下の通り。
RAPIDS | AQE | Result |
---|---|---|
False | False | 9.52 min / 9.22 min / 9.38 min |
False | True | 9.07 min / 9.26 min / 9.37 min |
True | False | 4.11 min / 4.48 min / 4.1 min |
驚くべきことに、GPUを利用したRAPIDSのパフォーマンスは非常によく、2倍以上高速のようだ。ETLのようなバッチジョブでは、読み込まれるデータと書き出されるデータがほぼ同じ件数になることが多く、基本全件に対して処理が行われるので、Stageごとにデータの偏りが発生する状況が生まれにくい。このため、AQEによるベネフィットを受けにくく、かつ書き出し時には大きな圧縮タスクが発生するため、GPUにとって有利な条件であることが想像できる。実際SparkのJobをリアルタイムで見ていると、GPUのタスクはほぼ一瞬で終わっており、ひたすらCPUでのデータ読み込みを待ち続けていた。
DBUとSpot価格で似たような構成になるD32s v3のCPUインスタンスを使い、同じ条件でテストを行ったところ、以下の通りの結果となった。
Instance | CPU Core | Result |
---|---|---|
D32s v3 (CPU) | 32 x 2 = 64 | 10.09 min / 10.77 min / 10.72 min |
NC6s v3 (GPU RAPIDS) | 6 x 2 = 12 | 4.11 min / 4.48 min / 4.10 min |
やはり、CPUコア数が増えても、並列化の恩恵を受けにくいファイルの書き出しや圧縮タスクについては、GPUが圧倒的に有利であり、コストパフォーマンスでもCPUを上回る可能性が十分にありそうだ。
結論
今回得られた知見
GPUはETLなどIOコストが高く、特に圧縮や展開のタスクに向いている。
ファイルの圧縮と展開は、Sparkの中でもボトルネックの一つであり、これがGPUの利用によって大きく解決されることは、ML以外のタスクにおけるGPUの活用方法として、十分期待できる内容だと思う。
留意事項
RAPIDS使用時には、Delta CacheなどのDeltaがもつ最適化機能は併用できなさそうなので、同じデータに繰り返しアクセスするようなパターンでは、Delta Cacheを有効にすればパフォーマンスは逆転する可能性も高い。今回は、検証を行わなかったが、事前のパフォーマンステストも必要だと感じている。また、Delta CacheはParquetフォーマットのみ有効なので、CSVやORCなど他のフォーマットを利用している場合は注意が必要だ。
GPUはSpot Instanceの活用を
CPUと比べるとコストはまだまだ割高だが、Spot Instanceではそこまで大きく変わらない。特にSpark環境では、Instanceが中断されても、他のInstanceに引き継がれてタスク自体は続行されることが多く、運用上の問題になるリスクは少ない方である。また、設定もSpot Instanceのチェックボックスをオンにするだけなので簡単だ。一方、Spot Instanceのクオータは別に管理されているので、利用時は申請を忘れずに。
今後のT4インスタンスの正式サポートで大きく変わる可能性
Azure Databricksでは、T4インスタンスがまだBetaのため今回のテストからは除いたが、これが正式対応となれば、コストパフォーマンスは激変する可能性がある。NC16as T4 v3ではCPUコア数も16あり、かつ価格も格段に安い。また、DBUも2.5と低めにセットされているので、ETL処理では、GPU利用の方がCPUよりもコストパフォーマンスで上回るのではと予想される。
Azureにおける2021年7月時点でのEast Japanの料金表から抜粋。
VM | Core | Memory | Disk | GPU | 従量課金 | 1年予約 | 3年予約 | Spot |
---|---|---|---|---|---|---|---|---|
NC6s v3 | 6 | 112 GiB | 736 GiB | 1X V100 | $4.194/時間 | $2.6716/時間 (~ 36% の% 割引) | $1.5887/時間 (~ 62% の% 割引) | $0.6090/時間 (~ 85% の% 割引) |
D32s v3 | 32 | 128 GiB | 256 GiB | なし | $2.064/時間 | $1.3014/時間 (~ 37% の% 割引) | $0.8874/時間 (~ 57% の% 割引) | $0.6306/時間 (~ 69% の% 割引) |
NC16as T4 v3 (Beta) | 16 | 110 GiB | 360 GiB | 1X T4 | $1.625/時間 | $1.0403/時間 (~ 36% の% 割引) | $0.6177/時間 (~ 62% の% 割引) | $0.2808/時間 (~ 83% の% 割引) |
RAPIDSへの圧縮アルゴリズムの追加
snappyと比べて負荷は大きいが高い圧縮率となるzstdなどがサポートされれば、ストレージからの読み込み時のネットワーク帯域や並列数を減らすことができるので、このあたりについても、NVIDIAやレポジトリにリクエストをあげておこうと思う。