この記事は、Brainpad Advent Calender 15日目の記事です。
本記事では、メモリに乗らないようなデータもPandasやNumPyライクに操作を行い、スケールアップ・スケールアウトにも対応できるライブラリ、Daskについて、簡単に紹介をします。
はじめに
Pythonでデータ分析や機械学習をする際、PandasやNumPyを用いる場面が非常に多くなってきました。
しかし、PandasやNumPyではメモリに乗らないデータの扱いが難しかったり、基本的にシングルコアでの処理を行うため、速度が遅い、といった問題があります。例えば、サーバー上で実行する際、CPUの論理コアが32個あっても、1個のCPUしか使用していない、といった感じです。
近年、データ分析関連のライブラリは非常に多様化しており、派閥(?)が沢山あるようです。
個人的には、Pandas作者であるWes McKinney氏が提唱しているPandas2.0に期待をしていますが、鋭意製作中との事で、まだ実際に使用する事が出来ないです。
本記事では、実際にすぐに使えて、学習コストが低い、という点で、Daskというライブラリで、並列処理を用いた分析を行ってみようと思います。
ちなみに、本記事では紹介をしませんでしたが、Daskは分散処理を行ったり、既存の機械学習ライブラリに、高速化を適用することが出来ます。
本記事で行う事
- Daskの簡単な紹介
- Dask.Arrayでの並列処理時のメモリ、実行時間比較
- Dask.DataFrameでの並列処理時のメモリ、実行時間比較
Daskとは
概要
Daskは、柔軟に並列処理・分散処理を行う分析ライブラリです。
NumPyやPandasと競合するライブラリではなく、それらを高機能にしたラッパーライブラリのようになっています。
Daskの使用用途は大きく二つあります。
- NumPy/Pandas/Listのような操作感で、メモリに乗らないような大きいデータ(Out-of-Core)の解析(DB、Sparkなどに近い用途)
- カスタムタスクスケジューリング(Luigi, Airflow, Celery, or Makefilesなどに近い用途)
また、Daskのスケーリング方法も大きく二つに分かれます。
- スレッド、または、プロセスを使用する単一マシンでの並列処理(スケールアップ)
- 複数ノードでのクラスターによる並列処理(スケールアウト)
あまり知られていないようですが、分散処理も可能です。
本記事では、シングルマシンでの並列処理について、フォーカスしています。
Daskで主に使われる以下の3つのクラスは、別オブジェクトの要素から成る集合で構成されています。
集合 | 要素 |
---|---|
Dask.Array | NumPy |
Dask.DatFrame | Pandas.DataFrame |
Dask.Bag | PythonのObject |
例えば、Dask.DataFrameは、Pandas.DataFrameの要素で構成されています。
立ち位置
メリット | デメリット | |
---|---|---|
NumPy/Pandas | - メモリに乗れば、手軽に扱える - Pythonネイティブ |
- メモリに乗らないデータサイズだと難しい - 基本的に並列処理をするのは手間がかかる |
Dask | - 複雑なタスクグラフで並列・分散処理が出来る - Pythonネイティブ - 手軽に扱える(NumPy/Pandasライクの操作感) |
- あまりにも大規模なデータだと難しい |
Spark | - APIが幅広く使える + SQL LIKE - エンタープライズの保証がある |
- 複雑な計算を扱えない - JVMがメイン |
Daskが便利な場面を簡単にまとめると、
- Pandasでは速度・リソース的に厳しい場面の時
- Sparkを使うほどではない、もしくは、Sparkで扱うには複雑な処理が入る時
- 後からスケールアップ・スケールアウトしたい時
また、Pandas作者のWes McKinney氏曰く、Pandasを使用する際は、データセットのサイズの5倍から10倍のRAMを用意することが推奨とされています。
タスクグラフについて
Daskではプログラムを中規模のタスク(計算単位)に分割するような、タスクグラフを構築します。
これにより、タスクの依存性を考慮し、独立なタスクを同時に並列実行出来る事が可能です。
実際に簡単なコードを書いてみましょう。
↓↓↓↓↓↓↓ あなたの記事の内容
───────
```py
↑↑↑↑↑↑↑ 編集リクエストの内容
import numpy as np
import dask.array as da
x = np.arange(100).reshape((10, 10))
dx = da.from_array(x, chunks=(5, 5))
res = (dx * dx).mean()
res.visualize()
上記の例では、(10,10)の行列を4分割して、要素毎の二乗の平均を取ったタスクグラフになっており、独立しているタスクを同時に並列実行しています。
Daskを試してみる
Dask.Array
それでは、いよいよDaskを試してみましょう。
(1500, 1500)の2つの行列の内積の平均を取ってみましょう。
まずは、(1500, 1500)の行列を作成します。
x = np.random.randint(100, size=(1500, 1500))
y = np.random.randint(100, size=(1500, 1500))
NumPyの場合
s = np.dot(x, y).mean()
Numpyで(1500, 1500)の2つの行列の内積の平均を計算してみます。
peak memory: 153.44 MiB
Wall time: 19.5 s
という結果になりました。
Daskの場合
dx = da.from_array(x, chunks=(750, 750))
dy = da.from_array(y, chunks=(750, 750))
res = da.dot(dx, dy).mean()
s = res.compute()
Daskでも(1500, 1500)の2つの行列の内積の平均を計算してみます。
daskの計算部分(compute関数を読んでいる箇所)では、
peak memory: 231.05 MiB
Wall time: 1.9 s
という結果になりました。
NumPyとくらべて、10倍ぐらいの速さで計算を行う事ができました。
ちなみに、ここでなぜかメモリが増えてしまっているのは、
Daskが計算以外の情報を保持する(グラフの情報など)ためだと思われます。そのため、実際にメモリを多く使用するような状況では、
Dask自体のメモリ量を無視する事ができ、問題がなさそうです。
Dask.DataFrame
次に、Dask.DataFrameを試してみます。
Dask.DataFrameの例で使用するデータセットは、NYCの駐車違反のデータセットを用いました。
sed -i -e '1d' Parking_Violations_Issued_-_Fiscal_Year_2015.csv;sed -i -e '1d' Parking_Violations_Issued_-_Fiscal_Year_2016.csv;cat Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv Parking_Violations_Issued_-_Fiscal_Year_2015.csv Parking_Violations_Issued_-_Fiscal_Year_2016.csv > all.csv
結合すると、CSVのファイルサイズが6.5Gになります。
ちなみに、今回はメモリが3.5Gの環境で検証を行っています。
まずは、簡単に、"Vehicle Color"のユニーク要素の頻度を数えてみます。
Pandasの場合
まずは、標準の読み込みを試してみます。
%time %memit df = pd.read_csv('all.csv')
MemoryError
そもそも読み込む事が出来ず、メモリエラーになりました。
次に、chunksizeを指定した読み込みによる処理を行ってみます。
df_chunks = pd.read_csv(df_path, chunksize=1000000, dtype="object")
pieces = [df["Vehicle Color"].value_counts() for df in df_chunks]
s = pd.concat(pieces)
計算処理が
peak memory: 2088.89 MiB,
Wall time: 4min 12s
という結果になりました。
Daskの場合
Daskで同じ処理をした時の比較をしてみましょう。
ddf = dd.read_csv('all.csv', dtype="object")
ddf["Vehicle Color"].value_counts().compute(get=dask.multiprocessing.get)
peak memory: 781.77 MiB,
Wall time: 1min 45s
という結果になりました。
メモリを3倍近く抑えながら、2.5倍近く早い処理を行う事が出来ました。
ちなみに、Pandasと読み込み条件を揃えるため、dtype='object'
を設定しています。Daskでは、通常、特定サンプル数から推定したdtypeを途中から読み込み時に使用して、高速化を行っています。
まとめ
今回は、単純な例での検証しか行う事ができませんでしたが、簡単に並列処理を行ったり、Out-of-Coreのデータを読み込む事が出来ました。
実際には、Cython化と組み合わせたり、分散処理を行う事で、更に早く分析処理を行う事が出来ます。また、今回は触れませんでしたが、
Daskは既存の機械学習ライブラリと組み合わせて使用する事で、高速化する事も可能です。
最後に余談ですが、今回はCSVを用いた分析を行いましたが、Arrowや、Parquetなどの列志向ファイルフォーマットを使用する事でより早く読み込む事が可能です。