LoginSignup
1
0

More than 1 year has passed since last update.

Apache Spark 3.0でサポートされるRのI/Oのベクトル化

Last updated at Posted at 2021-05-24

How to Improve R Performance in SparkR at Apache Spark 3.0 - The Databricks Blogの翻訳です。

Rはデータサイエンスにおいて人気のあるコンピュータ言語の一つです。RはRStudioや他のRパッケージのような多くの拡張を含め統計分析、データ処理、機械学習に特化したものとなっています。さらに、データサイエンティストが容易にデータの可視化を行えるようになっています。

Apache SparkTMのSparkRを用いることで、Rのコードは容易にスケールするようになります。インタラクティブにジョブを実行できるように、Rシェルを実行することで容易に処理を分散することができます。

SparkRがRプロセスとやり取りする必要がない場合、性能はScala、Java、Pythonのような他の言語のAPIとほぼ同じになります。しかし、SparkRがネイティブのR関数やデータタイプとやりとりをする際には、大きく性能が劣化します。

SparkとR間のデータI/O性能を改善するために、DatabricksランタイムではSparkRにおけるベクトル化(vectorization)を導入しました。Apache Arrow 0.15.1以降のR APIを用いることで、劇的な性能改善をもたらすベクトル化がApace Spark 3.0で利用可能になることを嬉しく思います。

この記事では、SparkRにおけるSparkとRのやり取りの概要と、現在のネイティブの実装とSparkRのベクトル化実装のベンチマークを説明します。

SparkとRのやり取り

SparkRはMLやSQLのようなAPIをサポートするだけではなく、Rコードと直接やりとりを行う際に用いられる一般的なAPIもサポートしています。例えば、SparkデータフレームとRデータフレームをシームレスに変換できますし、Sparkデータフレームに対してRネイティブの関数を分散実行することも可能です。

多くの場合、性能はSparkにおける他の言語のAPIと同等です。例えば、ユーザーコードがSparkのUDFやSQL APIに依存している場合は、全ての処理はJVMの中で行われ、いかなるI/O性能のペナルティを受けることはありません。以下の例では、いずれも1秒以内に処理を完了します。

Scala

// Scala API
// ~1 second
sql("SELECT id FROM range(2000000000)").filter("id > 10").count()

R

# R API
# ~1 second
count(filter(sql("SELECT * FROM range(2000000000)"), "id > 10"))

しかし、Rのネイティブ関数を実行したり、Rのネイティブタイプとの変換が必要になると、劇的に性能が劣化してしまいます。

Scala

// Scala API
val ds = (1L to 100000L).toDS
// ~1 second
ds.mapPartitions(iter => iter.filter(_ < 50000)).count()

R

# R API
df <- createDataFrame(lapply(seq(100000), function (e) list(value=e)))
# ~15 seconds - 15 times slower
count(dapply(
df, function(x) as.data.frame(x[x$value < 50000,]), schema(df)))

上のシンプルなケースでは、それぞれのパーティションにおいて50,000未満の値をフィルタリングしただけですが、SparkRの場合は15倍も遅くなっています。

Scala

// Scala API
// ~0.2 seconds
val df = sql("SELECT * FROM range(1000000)").collect()

R

# R API
# ~8 seconds - 40 times slower
df <- collect(sql("SELECT * FROM range(1000000)"))

上の例ではさらに悪くなっています。同じデータをドライバー側に持ってくるのに、SparkRの場合は40倍遅くなっています。

これは、APIがRのネイティブ関数やデータタイプとのやり取りを行うためであり、これらの実装がさほど効率的ではないためです。特筆すべき性能のペナルティがあるAPIは6個あります。

  • createDataFrame()
  • collect()
  • dapply()
  • dapplyCollect()
  • gapply()
  • gapplyCollect()

createDataFrame()collect()において、JVMのデータをRドライバー側とやり取りを行う際、シリアライズ(デシリアライズ)が必要になります。例えば、JavaのStringは、Rではcharacterになります。dapply()gapply()においては、Rネイティブ関数とデータ両方でシリアライズ(デシリアライズ)が必要なため、JVMとRのエグゼキューターの間で変換が必要です。dapplyCollect()gapplyCollect()においては、ドライバー、エグゼキューター両方でJVM、R間のオーバーヘッドが発生します。

ネイティブの実装


SparkRのデータフレームにおける計算処理は、Sparkクラスターで利用可能なノード全てにまたがって分散処理されます。Rのdata.frameとしてデータを取得する必要や、Rネイティブ関数の実行の必要がない場合は、上記のドライバー、エグゼキューターにおけるRプロセス間のコミュニケーションは発生しません。Rのdata.frameやRネイティブ関数の実行が必要な際は、JVMとRのドライバー/エグゼキューター間はソケットを用いてコミュニケーションを行います。

JVMとRの間でコミュニケーションを行う際、CPUパイプラインのような最新のCPUデザインを使用しない非効率なエンコーディングフォーマットを用いて、行ごとにシリアライズ(デシリアライズ)、データ転送を行います。

ベクトル化実装

Apache Spark 3.0においては、最小のシリアライズ(デシリアライズ)コストで、JVM、Rのドライバー/エグゼキューター間でデータを直接やり取りするために、Apache Arrowを活用した新たなベクトル化実装がSparkRに導入されました。

JVMとRの間で非効率的なフォーマットを用いて行ごとにシリアライズ(デシリアライズ)するのではなく、新たな実装では、効率的な列志向フォーマットを用いたパイプライン、Single Instruction Multiple Data (SIMD)を可能にするApaceh Arrowを活用しています。

新たなベクトル化されたSparkRのAPIはデフォルトでは有効になっていませんが、Spark 3.0において、spark.sql.execution.arrow.sparkr.enabledtrueに設定することで有効化されます。ベクトル化されたdapplyCollect()gapplyCollect()はまだ実装されていません。代わりにdapply()gapply()を使うようにしてください。

ベンチマーク結果

以下のベンチマークは、500,000レコードのシンプルなデータセットに対して同じコードを実行し、ベクトル化を有効にした場合と無効にした場合の実行時間を計測したものです。コードとデータセット、ノートブックはGitHubにあります。

Rデータフレームにcollect()createDataFrame()を適用したケースでは、ベクトル化が有効化された際にはおおよそ17倍、42倍の性能改善が認められます。dapply()gapply()の場合はベクトル化を有効化した場合、43倍、33倍の性能改善が認められます。

spark.sql.execution.arrow.sparkr.enabledtrueに設定して最適化を有効化することで、17倍から43倍の性能改善が認められます。データが大きくなるほど、より大きな性能改善が期待されます。詳細に関しては、以前Databricksランタイムで実行されたベンチマークを参照ください。

まとめ

Apache Arrowを活用することで、Apache Spark 3.0ではRデータフレームに対するベクトル化されたAPI、dapply()gapply()collect()createDataFrame()をサポートします。SparkRのベクトル化を有効にするのことで性能が43倍まで改善され、データサイズが大きくなれば、さらなる性能改善が期待できます。

将来的な取り組みに関しては、Apache Arrowの既知の問題ARROW-4512があります。現時点では、JVMとR間のコミュニケーションは完全に整流化されていません。ArrowのR APIがすぐに利用可能なものになっていないため、バッチでシリアライズ、デシリアライズを行う必要があります。また、Apache Spark 3.xのリリースでdapplyCollect()gapplyCollect()がサポートされる予定です。現時点では、ワークアラウンドとしてdapply()collect()gapply()collect()を使用してください。

これらのDatabricksの新機能を是非ご活用ください。Spark 3.0の詳細に関しては、プレビューのウェビナーもご活用ください。

Databricks 無料トライアル

Databricks 無料トライアル

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0