1
2

More than 3 years have passed since last update.

シングルノードマシンにおけるApache Sparkのベンチマーク

Last updated at Posted at 2021-09-14

Benchmarking Apache Spark on a Single Node Machine - The Databricks Blogの翻訳です。

2018年5月時点の記事です。

Apache Sparkは、分散環境におけるビッグデータ処理に対する統合分析エンジンのデファクトとなりました。まだ、多くのユーザーが大規模Sparkクラスターを選択するのではなく、シングルマシン、時にはラップトップ上でSparkを実行することを選択しています。主に以下の理由からSparkが選択されています。

  1. ラップトップの「スモールデータ」からクラスターの「ビッグデータ」にスケールする単一かつ統合されたAPI
  2. Python、R、Scala、Java、ANSI SQLサポートを通じた多言語プログラミングモードのサポート
  3. Pandasユーザー定義関数(UDF)を通じたPandasのようなPyDataツールとの密連携

上のようなことは明白かもしれませんが、多くの場合、ユーザーは以下のことを知ると驚きます。

  • シングルノードにおけるSparkのインストールには設定は不要です(ダウンロードして実行するだけです)。
  • Sparkは多くの場合、並列実行性によって、シングルノードのPyDataツールよりも高速です。
  • Sparkでは処理の前に全てのデータをロードする必要がないため、より少ないメモリー消費で済み、ラップトップのメモリーサイズよりも大きいデータを処理できる場合があります。

PyDataツールは、Apache Sparkの利用しやすさとパフォーマンスに貢献しています。例えば、SparkのデータフレームAPIはPandasのデータフレームAPIにインスパイアされました。別の例としては、Spark 2.3におけるPandas UDFはSparkとPandasを組み合わせることでPySparkのパフォーマンスを劇的に改善しました。

本記事では、PySparkを用いたシングルノードの計算のメリットをデモンストレートし、我々の知見を共有します。実験を通じて、シングルノードマシンのメモリーよりも大きいデータセットに対して、なぜPandasではなくPySparkを用いたいと考えるのかを説明します。

ラップトップ上にApache Sparkをセットアップする

Sparkはもともと分散データ処理のために設計されましたが、初めてSparkに触る開発者が容易に実験できるように、ローカル環境へのインストールを容易にするために多大な労力が費やされました。tarballをダウンロード、解凍し、セットアップなしにすぐに使い始めることができます。例えば、以下のコマンドを実行することでSparkのtarballをダウンロードし、Sparkを起動します。

Bash
wget http://www-us.apache.org/dist/spark/spark-2.3.0/spark-2.3.0-bin-hadoop2.7.tgz
tar -xvf spark-2.3.0-bin-hadoop2.7.tgz
cd spark-2.3.0-bin-hadoop2.7
bin/pyspark

さらに、SparkはPyPI、Homebrew、Condaからも利用でき、以下のようなコマンドでのインストールが可能です。

Bash
pip install pyspark

homebrew install apache-spark

シングルノードのパフォーマンス

IntelがCPUクロックを高速化できるようになって数年が経ちました。周波数を上げることでシングルコアをより強力なものにするよりも、最新のチップはコア数をスケールさせるようになっています。このため、ラップトップやワークステーションが16コアを持つことは珍しくなく、サーバーに至っては64コア、128コアを持つようにもなっています。このように、これらのマルチコア、シングルノードマシンは、従来のシングルコアマシンよりも分散システムに似たものとなっています。

データがシングルマシンのメモリに収まる際には、分散システムはシングルノードシステムよりも遅くなるということをよく耳にします。一般的なSQLクエリーを用いてSparkとPandasのメモリー使用量とパフォーマンスを比較することで、必ずしもそうではないことを確認しました。シングルノードにおいてSparkとPandasを比較数rために3つの一般的なSQLクエリーを使用しました。

Query 1. SELECT max(ss_list_price) FROM store_sales

Query 2. SELECT count(distinct ss_customer_sk) FROM store_sales

Query 3. SELECT sum(ss_net_profit) FROM store_sales GROUP BY ss_store_sk

これらのデモンストレーションを行うために、(ParquetとCSV両方の)最大データサイズを計測しました。Pandasは244GBメモリーのシングルノードでデータをロードすることができ、これら3つのクエリーの性能を比較しました。

セットアップ及び設定

ハードウェア

以下の設定の仮想マシンを使用しました。

  • CPUコア数: 32仮想コア (16物理コア), Intel Xeon CPU E5-2686 v4 @ 2.30GHz
  • システムメモリー: 244 GB
  • シャッフルに利用できる総ディスク容量: 4 x 1900 GB NVMe SSD

ソフトウェア

  • OS: Ubuntu 16.04
  • Spark: ローカルクラスターモードのApache Spark 2.3.0
  • Pandasバージョン: 0.20.3
  • Pythonバージョン: 2.7.12

PySparkおよびPandas

このベンチマークにおける入力データはTPC-DSの「store_sales」テーブルであり、Long/Doubleデータ型の23のカラムから構成されます。

スケーラビリティ

Pandasにおいては、データファイルをロードするためのメモリーリソースが必要となります。以下のテストにおいては、PandasとPyarrowを用いて10から270倍の「store_sales」テーブルをロードし、Pythonが処理できる最大データサイズを記録しました。以下のグラフでは、シングルノードマシンで処理できる常駐セットサイズ(RSS)が線形に増加している様子を示しています。

また、i3.8xlargeインスタンスでPandasがエラーが発生する直前のファイルサイズも計測しました。

Sparkにおいては、同じAPIを使用してラップトップの小規模データセットからクラスター上の「ビッグデータ」にスケールすることは簡単です。シングルノードであっても、メモリにデータが収まらない場合にはSparkのオペレーターはディスクにスピルさせることで、あらゆるサイズのデータに対して処理を実行することができます。

パフォーマンス

このベンチマークでは、Parquetファイルフォーマットの「store_sales」テーブル(10倍から260倍)に対するSQLを実行しました。

PySparkは10GBのメモリー、16スレッドのローカルクラスターモードで実行します。

入力データサイズが増加すると、Pandasでは39GB以上のParquetファイルの取り扱いに失敗しクラッシュしていますが、PySparkは限られたリソースにおいても優れたパフォーマンスを示していることを観測しました。


全てのコアにおける並列実行によって、テストにおいてはPySparkでメモリーにデータをキャッシュしなくても、PySparkはPandasよりも高速であることを観測できました。これをデモンストレーションするために、250倍規模の入力データ(約35GBのディスク上のサイズ)を用い、異なるスレッドの数でPySparkのベンチマークを計測しました。

PySparkとPandas UDF

一方で、Apache Arrow上に構築されているPandas UDFは、シングルノードマシンであっても、分散クラスターであっても、Python開発者に対して高いパフォーマンスを提供します。これはApache Spark 2.3で導入され、Two SigmaのLi JinがPySparkと密連携されたPandas UDFをデモンストレーションし、行ごとに処理を行う標準Python UDFとPandas UDFにおけるCumulative Probability(累積確率)の計算や他の計算関数のベンチマーク結果を比較しました。

Pandas UDFはスカラーオペレーションをベクトル化するために使用されます。scipyパッケージを用いて標準分布 N(0,1)における累積確率の値を計算する際のPySparkのスカラーPandas UDFの利用を考えてみます。

Python
import pandas as pd
from scipy import stats

@pandas_udf('double')
def cdf(v):
return pd.Series(stats.norm.cdf(v))
#
# use Pandas UDF now in the Spark DataFrame
#
df.withColumn('cumulative_probability', cdf(df.v))

Pandas UDFとしては、このコードは通常のPython UDFよりもはるかに高速です。以下のチャートでは、適用された全ての計算関数において、Pandas UDFが行ごとに処理を行うPython UDFよりもはるかに高い性能を示していることがわかります。

Pandas UDFに関するブログ記事から抜粋した部分的なチャート

結論

ここでは、なぜシングルマシンにおいてもSparkを使用するのかを説明しました。いくつかのSparkリリースにおいて、PandasはSparkに多大なる貢献をし、Sparkと密接に統合されました。大きな成果の一つがPandas UDFです。実際、Pandas APIとSparkデータフレームの類似性によって、これらの相互オペレーションが容易となり、多くの開発者は両方を組み合わせて活用しています。

シングルノードのメモリーに収まらない大規模データセットに対するシングルノードでの分析においては、Sparkは高速なランタイムと、マルチコアの並列性による優れたスケーラビリティ、適切にパイプライン化された実行エンジンを提供します。

このベンチマークで使用されたソースコードには、以下からアクセスすることができます。

Databricks 無料トライアル

Databricks 無料トライアル

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2