この記事は全部俺 Advent Calendar 2018の17日目の記事です。
タイトルはごめんなさい
煽ってみました。
まとめ
Pythonにおける集計処理について、Pandas, Dask, Pysparkを使用した計測を行いました。
Coloboratory(GPU, TPUなし)で計測しました。
計測結果は、すべての要素に+1
をする処理を行ったもので%timeit
での結果になります。
ライブラリ | 実行速度 |
---|---|
Pandas | 22.6 ms |
Dask | 47 ms |
Pyspark UDF | 2.71 ms |
Pyspark Pandas_UDF | 2.98 ms |
要素ごとの処理
計測処理:すべての要素に+1
をする処理
pandas
import numpy as np
import pandas as pd
pdf = pd.DataFrame(index=range(0, 10 * 1000 * 1000), data=np.random.random(10 * 1000 * 1000), columns=["value"])
%time pdf["value"] + 1
10 loops, best of 3: 22.6 ms per loop
Dask
import dask.dataframe as dd
ddf = dd.from_pandas(pdf, 2)
%timeit (ddf + 1).compute()
10 loops, best of 3: 47 ms per loop
Pyspark
まずはPysparkをColaboratoryで使用する準備から始めます。
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"
# findsparkで環境設定
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
!pip install pyspark
Apache Arrow(メモリアロケートに最適化された列指向フォーマット)を遊行にするため、以下のコマンドも実施します。
!pip install -q pyarrow
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
Python UDF
from pyspark.sql.types import *
from pyspark.sql.functions import col, count, rand, collect_list, explode, struct, count, lit
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType
sdf = spark.createDataFrame(pdf)
@udf('double')
def plus_one(v):
return v + 1
%timeit sdf.withColumn('value', plus_one(sdf.value))
The slowest run took 4.94 times longer than the fastest. This could mean that an intermediate result is being cached.
100 loops, best of 3: 2.71 ms per loop
キャッシュされてるかもしれないと出ていますが、100ループして最遅が2.71 * 4.94 = 13.39 ms
なので、Pysparkが最速になっています。
Pandas UDF
@pandas_udf("double", PandasUDFType.SCALAR)
def pandas_plus_one(v):
return v + 1
%timeit sdf.withColumn('value', pandas_plus_one(sdf.value))
The slowest run took 4.41 times longer than the fastest. This could mean that an intermediate result is being cached.
100 loops, best of 3: 2.98 ms per loop
こちらも同様に、キャッシュされてるかもしれないと出ていますが、100ループして最遅が2.98 * 4.41 = 13.14 ms
なので、速いですね。
簡単な考察/解説
一番遅いのは素直にPandasを使用したものになると思いきや、Daskが一番遅くなってしまいました。
これは、この程度の計算処理だとパーティションを作成してマージするコストの方が大きくなってしまうからかもしれません。
Pysparkは、環境作成に手間がかかるものの、やっぱり爆速です。
余談ですが、PysparkのUDF(User Define Function)の2種類には、以下の違いがあります。
- Python UDF
- データのシリアライズ/デシリアライズに
pickle
を使用 - データフェッチはブロックごとだが、UDFの処理は行ごと
- データのシリアライズ/デシリアライズに
- Pandas UDF
- データのシリアライズ/デシリアライズにカラムナデータフォーマットの
Apache Arrow
を使用 - UDF処理もブロックごと(なので、集計処理が早い)
- データ集計方法が3種類ある
-
PandasUDFType.SCALAR
- 1つ以上のベクトルから1つのベクトルへの演算処理
- すべての要素に1を足したり、2列のベクトル同士の和をとって新しいベクトルを作成したりする処理
-
PandasUDFType.GROUPED_MAP
- 1つのテーブル(
pd.DataFrame
)から1つのテーブルへの演算処理 - すべての要素から、平均値を引いて新しいテーブルを作成したりする処理
- 1つのテーブル(
-
PandasUDFType.GROUPED_AGG
- 1つのテーブル(
pd.DataFrame
)から1つのスカラー値を得る演算処理 - すべての要素の平均値など
- 1つのテーブル(
-
- データのシリアライズ/デシリアライズにカラムナデータフォーマットの