15
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

お前らの遅いPython集計処理を爆速にするための各種ライブラリベンチマーク結果

Last updated at Posted at 2018-12-17

この記事は全部俺 Advent Calendar 2018の17日目の記事です。
タイトルはごめんなさい:bow:
煽ってみました。

まとめ

Pythonにおける集計処理について、Pandas, Dask, Pysparkを使用した計測を行いました。
Coloboratory(GPU, TPUなし)で計測しました。
計測結果は、すべての要素に+1をする処理を行ったもので%timeitでの結果になります。

ライブラリ 実行速度
Pandas 22.6 ms
Dask 47 ms
Pyspark UDF 2.71 ms
Pyspark Pandas_UDF 2.98 ms

要素ごとの処理

計測処理:すべての要素に+1をする処理

pandas

import numpy as np
import pandas as pd

pdf = pd.DataFrame(index=range(0, 10 * 1000 * 1000), data=np.random.random(10 * 1000 * 1000), columns=["value"])

%time pdf["value"] + 1
10 loops, best of 3: 22.6 ms per loop

Dask

import dask.dataframe as dd
ddf = dd.from_pandas(pdf, 2)

%timeit (ddf + 1).compute()
10 loops, best of 3: 47 ms per loop

Pyspark

まずはPysparkをColaboratoryで使用する準備から始めます。

!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://apache.osuosl.org/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz
!tar xf spark-2.4.0-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.0-bin-hadoop2.7"

# findsparkで環境設定
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

!pip install pyspark

Apache Arrow(メモリアロケートに最適化された列指向フォーマット)を遊行にするため、以下のコマンドも実施します。

!pip install -q pyarrow
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

Python UDF

from pyspark.sql.types import *
from pyspark.sql.functions import col, count, rand, collect_list, explode, struct, count, lit
from pyspark.sql.functions import udf, pandas_udf, PandasUDFType

sdf = spark.createDataFrame(pdf)

@udf('double')
def plus_one(v):
    return v + 1

%timeit sdf.withColumn('value', plus_one(sdf.value))
The slowest run took 4.94 times longer than the fastest. This could mean that an intermediate result is being cached.
100 loops, best of 3: 2.71 ms per loop

キャッシュされてるかもしれないと出ていますが、100ループして最遅が2.71 * 4.94 = 13.39 msなので、Pysparkが最速になっています。

Pandas UDF

@pandas_udf("double", PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

%timeit sdf.withColumn('value', pandas_plus_one(sdf.value))
The slowest run took 4.41 times longer than the fastest. This could mean that an intermediate result is being cached.
100 loops, best of 3: 2.98 ms per loop

こちらも同様に、キャッシュされてるかもしれないと出ていますが、100ループして最遅が2.98 * 4.41 = 13.14 msなので、速いですね。

簡単な考察/解説

一番遅いのは素直にPandasを使用したものになると思いきや、Daskが一番遅くなってしまいました。
これは、この程度の計算処理だとパーティションを作成してマージするコストの方が大きくなってしまうからかもしれません。

Pysparkは、環境作成に手間がかかるものの、やっぱり爆速です。
余談ですが、PysparkのUDF(User Define Function)の2種類には、以下の違いがあります。

  • Python UDF
    • データのシリアライズ/デシリアライズにpickleを使用
    • データフェッチはブロックごとだが、UDFの処理は行ごと
  • Pandas UDF
    • データのシリアライズ/デシリアライズにカラムナデータフォーマットのApache Arrowを使用
    • UDF処理もブロックごと(なので、集計処理が早い)
    • データ集計方法が3種類ある
      • PandasUDFType.SCALAR
        • 1つ以上のベクトルから1つのベクトルへの演算処理
        • すべての要素に1を足したり、2列のベクトル同士の和をとって新しいベクトルを作成したりする処理
      • PandasUDFType.GROUPED_MAP
        • 1つのテーブル(pd.DataFrame)から1つのテーブルへの演算処理
        • すべての要素から、平均値を引いて新しいテーブルを作成したりする処理
      • PandasUDFType.GROUPED_AGG
        • 1つのテーブル(pd.DataFrame)から1つのスカラー値を得る演算処理
        • すべての要素の平均値など

参考

databricks
PySparkでネイティブPythonコードを実行する方法

15
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?