Python
numpy
Spark
pandas
Dask
BrainPadDay 15

データ分析のための並列処理ライブラリDask

More than 1 year has passed since last update.

この記事は、Brainpad Advent Calender 15日目の記事です。


本記事では、メモリに乗らないようなデータもPandasやNumPyライクに操作を行い、スケールアップ・スケールアウトにも対応できるライブラリ、Daskについて、簡単に紹介をします。


はじめに

Pythonでデータ分析や機械学習をする際、PandasやNumPyを用いる場面が非常に多くなってきました。

しかし、PandasやNumPyではメモリに乗らないデータの扱いが難しかったり、基本的にシングルコアでの処理を行うため、速度が遅い、といった問題があります。例えば、サーバー上で実行する際、CPUの論理コアが32個あっても、1個のCPUしか使用していない、といった感じです。

近年、データ分析関連のライブラリは非常に多様化しており、派閥(?)が沢山あるようです。

個人的には、Pandas作者であるWes McKinney氏が提唱しているPandas2.0に期待をしていますが、鋭意製作中との事で、まだ実際に使用する事が出来ないです。

本記事では、実際にすぐに使えて、学習コストが低い、という点で、Daskというライブラリで、並列処理を用いた分析を行ってみようと思います。

ちなみに、本記事では紹介をしませんでしたが、Daskは分散処理を行ったり、既存の機械学習ライブラリに、高速化を適用することが出来ます。

本記事で行う事


  • Daskの簡単な紹介

  • Dask.Arrayでの並列処理時のメモリ、実行時間比較

  • Dask.DataFrameでの並列処理時のメモリ、実行時間比較


Daskとは


概要

Daskは、柔軟に並列処理・分散処理を行う分析ライブラリです。

NumPyやPandasと競合するライブラリではなく、それらを高機能にしたラッパーライブラリのようになっています。

Daskの使用用途は大きく二つあります。


  1. NumPy/Pandas/Listのような操作感で、メモリに乗らないような大きいデータ(Out-of-Core)の解析(DB、Sparkなどに近い用途)

  2. カスタムタスクスケジューリング(Luigi, Airflow, Celery, or Makefilesなどに近い用途)

また、Daskのスケーリング方法も大きく二つに分かれます。


  1. スレッド、または、プロセスを使用する単一マシンでの並列処理(スケールアップ)

  2. 複数ノードでのクラスターによる並列処理(スケールアウト)

あまり知られていないようですが、分散処理も可能です。

本記事では、シングルマシンでの並列処理について、フォーカスしています。

Daskで主に使われる以下の3つのクラスは、別オブジェクトの要素から成る集合で構成されています。

集合
要素

Dask.Array
NumPy

Dask.DatFrame
Pandas.DataFrame

Dask.Bag
PythonのObject

例えば、Dask.DataFrameは、Pandas.DataFrameの要素で構成されています。







Dask Documentより



立ち位置

メリット
デメリット

NumPy/Pandas
- メモリに乗れば、手軽に扱える
- Pythonネイティブ
- メモリに乗らないデータサイズだと難しい
- 基本的に並列処理をするのは手間がかかる

Dask
- 複雑なタスクグラフで並列・分散処理が出来る
- Pythonネイティブ
- 手軽に扱える(NumPy/Pandasライクの操作感)
- あまりにも大規模なデータだと難しい

Spark
- APIが幅広く使える + SQL LIKE
- エンタープライズの保証がある

- 複雑な計算を扱えない
- JVMがメイン

Daskが便利な場面を簡単にまとめると、


  • Pandasでは速度・リソース的に厳しい場面の時

  • Sparkを使うほどではない、もしくは、Sparkで扱うには複雑な処理が入る時

  • 後からスケールアップ・スケールアウトしたい時

また、Pandas作者のWes McKinney氏曰く、Pandasを使用する際は、データセットのサイズの5倍から10倍のRAMを用意することが推奨とされています。


タスクグラフについて

Daskではプログラムを中規模のタスク(計算単位)に分割するような、タスクグラフを構築します。

これにより、タスクの依存性を考慮し、独立なタスクを同時に並列実行出来る事が可能です。

実際に簡単なコードを書いてみましょう。

import numpy as np

import dask.array as da
x = np.arange(100).reshape((10, 10))
dx = da.from_array(x, chunks=(5, 5))
res = (dx * dx).mean()
res.visualize()





上記の例では、(10,10)の行列を4分割して、要素毎の二乗の平均を取ったタスクグラフになっており、独立しているタスクを同時に並列実行しています。


Daskを試してみる


Dask.Array

それでは、いよいよDaskを試してみましょう。

(1500, 1500)の2つの行列の内積の平均を取ってみましょう。

まずは、(1500, 1500)の行列を作成します。



x = np.random.randint(100, size=(1500, 1500))

y = np.random.randint(100, size=(1500, 1500))


NumPyの場合

s = np.dot(x, y).mean()

Numpyで(1500, 1500)の2つの行列の内積の平均を計算してみます。

peak memory: 153.44 MiB

Wall time: 19.5 s

という結果になりました。


Daskの場合

dx = da.from_array(x, chunks=(750, 750))

dy = da.from_array(y, chunks=(750, 750))
res = da.dot(dx, dy).mean()
s = res.compute()

Daskでも(1500, 1500)の2つの行列の内積の平均を計算してみます。

daskの計算部分(compute関数を読んでいる箇所)では、

peak memory: 231.05 MiB

Wall time: 1.9 s

という結果になりました。

NumPyとくらべて、10倍ぐらいの速さで計算を行う事ができました。

ちなみに、ここでなぜかメモリが増えてしまっているのは、

Daskが計算以外の情報を保持する(グラフの情報など)ためだと思われます。そのため、実際にメモリを多く使用するような状況では、

Dask自体のメモリ量を無視する事ができ、問題がなさそうです。


Dask.DataFrame

次に、Dask.DataFrameを試してみます。

Dask.DataFrameの例で使用するデータセットは、NYCの駐車券のデータセットを用いました。

sed -i -e '1d' Parking_Violations_Issued_-_Fiscal_Year_2015.csv;sed -i -e '1d' Parking_Violations_Issued_-_Fiscal_Year_2016.csv;cat Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv Parking_Violations_Issued_-_Fiscal_Year_2015.csv Parking_Violations_Issued_-_Fiscal_Year_2016.csv > all.csv

結合すると、CSVのファイルサイズが6.5Gになります。

ちなみに、今回はメモリが3.5Gの環境で検証を行っています。

まずは、簡単に、"Vehicle Color"のユニーク要素の頻度を数えてみます。


Pandasの場合

まずは、標準の読み込みを試してみます。

%time %memit df = pd.read_csv('all.csv')

MemoryError

そもそも読み込む事が出来ず、メモリエラーになりました。

次に、chunksizeを指定した読み込みによる処理を行ってみます。

df_chunks = pd.read_csv(df_path, chunksize=1000000, dtype="object")

pieces = [df["Vehicle Color"].value_counts() for df in df_chunks]
s = pd.concat(pieces)

計算処理が

peak memory: 2088.89 MiB,

Wall time: 4min 12s

という結果になりました。


Daskの場合

Daskで同じ処理をした時の比較をしてみましょう。

ddf = dd.read_csv('all.csv', dtype="object")

ddf["Vehicle Color"].value_counts().compute(get=dask.multiprocessing.get)

peak memory: 781.77 MiB,

Wall time: 1min 45s

という結果になりました。

メモリを3倍近く抑えながら、2.5倍近く早い処理を行う事が出来ました。

ちなみに、Pandasと読み込み条件を揃えるため、dtype='object'を設定しています。Daskでは、通常、特定サンプル数から推定したdtypeを途中から読み込み時に使用して、高速化を行っています。


まとめ

今回は、単純な例での検証しか行う事ができませんでしたが、簡単に並列処理を行ったり、Out-of-Coreのデータを読み込む事が出来ました。

実際には、Cython化と組み合わせたり、分散処理を行う事で、更に早く分析処理を行う事が出来ます。また、今回は触れませんでしたが、

Daskは既存の機械学習ライブラリと組み合わせて使用する事で、高速化する事も可能です。

最後に余談ですが、今回はCSVを用いた分析を行いましたが、Arrowや、Parquetなどの列志向ファイルフォーマットを使用する事でより早く読み込む事が可能です。


参考文献