Help us understand the problem. What is going on with this article?

データ分析のための並列処理ライブラリDask

この記事は、Brainpad Advent Calender 15日目の記事です。

本記事では、メモリに乗らないようなデータもPandasやNumPyライクに操作を行い、スケールアップ・スケールアウトにも対応できるライブラリ、Daskについて、簡単に紹介をします。

はじめに

Pythonでデータ分析や機械学習をする際、PandasやNumPyを用いる場面が非常に多くなってきました。
しかし、PandasやNumPyではメモリに乗らないデータの扱いが難しかったり、基本的にシングルコアでの処理を行うため、速度が遅い、といった問題があります。例えば、サーバー上で実行する際、CPUの論理コアが32個あっても、1個のCPUしか使用していない、といった感じです。

近年、データ分析関連のライブラリは非常に多様化しており、派閥(?)が沢山あるようです。
個人的には、Pandas作者であるWes McKinney氏が提唱しているPandas2.0に期待をしていますが、鋭意製作中との事で、まだ実際に使用する事が出来ないです。

本記事では、実際にすぐに使えて、学習コストが低い、という点で、Daskというライブラリで、並列処理を用いた分析を行ってみようと思います。
ちなみに、本記事では紹介をしませんでしたが、Daskは分散処理を行ったり、既存の機械学習ライブラリに、高速化を適用することが出来ます。

本記事で行う事

  • Daskの簡単な紹介
  • Dask.Arrayでの並列処理時のメモリ、実行時間比較
  • Dask.DataFrameでの並列処理時のメモリ、実行時間比較

Daskとは

概要

Daskは、柔軟に並列処理・分散処理を行う分析ライブラリです。
NumPyやPandasと競合するライブラリではなく、それらを高機能にしたラッパーライブラリのようになっています。
Daskの使用用途は大きく二つあります。

  1. NumPy/Pandas/Listのような操作感で、メモリに乗らないような大きいデータ(Out-of-Core)の解析(DB、Sparkなどに近い用途)
  2. カスタムタスクスケジューリング(Luigi, Airflow, Celery, or Makefilesなどに近い用途)

また、Daskのスケーリング方法も大きく二つに分かれます。

  1. スレッド、または、プロセスを使用する単一マシンでの並列処理(スケールアップ)
  2. 複数ノードでのクラスターによる並列処理(スケールアウト)

あまり知られていないようですが、分散処理も可能です。
本記事では、シングルマシンでの並列処理について、フォーカスしています。

Daskで主に使われる以下の3つのクラスは、別オブジェクトの要素から成る集合で構成されています。

集合 要素
Dask.Array NumPy
Dask.DatFrame Pandas.DataFrame
Dask.Bag PythonのObject

例えば、Dask.DataFrameは、Pandas.DataFrameの要素で構成されています。



Dask Documentより

立ち位置

メリット デメリット
NumPy/Pandas - メモリに乗れば、手軽に扱える
- Pythonネイティブ
- メモリに乗らないデータサイズだと難しい
- 基本的に並列処理をするのは手間がかかる
Dask - 複雑なタスクグラフで並列・分散処理が出来る
- Pythonネイティブ
- 手軽に扱える(NumPy/Pandasライクの操作感)
- あまりにも大規模なデータだと難しい
Spark - APIが幅広く使える + SQL LIKE
- エンタープライズの保証がある
- 複雑な計算を扱えない
- JVMがメイン

Daskが便利な場面を簡単にまとめると、

  • Pandasでは速度・リソース的に厳しい場面の時
  • Sparkを使うほどではない、もしくは、Sparkで扱うには複雑な処理が入る時
  • 後からスケールアップ・スケールアウトしたい時

また、Pandas作者のWes McKinney氏曰く、Pandasを使用する際は、データセットのサイズの5倍から10倍のRAMを用意することが推奨とされています。

タスクグラフについて

Daskではプログラムを中規模のタスク(計算単位)に分割するような、タスクグラフを構築します。
これにより、タスクの依存性を考慮し、独立なタスクを同時に並列実行出来る事が可能です。

実際に簡単なコードを書いてみましょう。

↓↓↓↓↓↓↓ あなたの記事の内容
```python

───────
py
↑↑↑↑↑↑↑ 編集リクエストの内容
import numpy as np
import dask.array as da
x = np.arange(100).reshape((10, 10))
dx = da.from_array(x, chunks=(5, 5))
res = (dx * dx).mean()
res.visualize()



上記の例では、(10,10)の行列を4分割して、要素毎の二乗の平均を取ったタスクグラフになっており、独立しているタスクを同時に並列実行しています。

Daskを試してみる

Dask.Array

それでは、いよいよDaskを試してみましょう。
(1500, 1500)の2つの行列の内積の平均を取ってみましょう。

まずは、(1500, 1500)の行列を作成します。

x = np.random.randint(100, size=(1500, 1500))
y = np.random.randint(100, size=(1500, 1500))

NumPyの場合

s = np.dot(x, y).mean()

Numpyで(1500, 1500)の2つの行列の内積の平均を計算してみます。

peak memory: 153.44 MiB
Wall time: 19.5 s
という結果になりました。

Daskの場合

dx = da.from_array(x, chunks=(750, 750))
dy = da.from_array(y, chunks=(750, 750))
res = da.dot(dx, dy).mean()
s = res.compute()

Daskでも(1500, 1500)の2つの行列の内積の平均を計算してみます。

daskの計算部分(compute関数を読んでいる箇所)では、
peak memory: 231.05 MiB
Wall time: 1.9 s
という結果になりました。
NumPyとくらべて、10倍ぐらいの速さで計算を行う事ができました。

ちなみに、ここでなぜかメモリが増えてしまっているのは、
Daskが計算以外の情報を保持する(グラフの情報など)ためだと思われます。そのため、実際にメモリを多く使用するような状況では、
Dask自体のメモリ量を無視する事ができ、問題がなさそうです。

Dask.DataFrame

次に、Dask.DataFrameを試してみます。
Dask.DataFrameの例で使用するデータセットは、NYCの駐車違反のデータセットを用いました。

sed -i -e '1d' Parking_Violations_Issued_-_Fiscal_Year_2015.csv;sed -i -e '1d' Parking_Violations_Issued_-_Fiscal_Year_2016.csv;cat Parking_Violations_Issued_-_Fiscal_Year_2014__August_2013___June_2014_.csv Parking_Violations_Issued_-_Fiscal_Year_2015.csv Parking_Violations_Issued_-_Fiscal_Year_2016.csv > all.csv

結合すると、CSVのファイルサイズが6.5Gになります。
ちなみに、今回はメモリが3.5Gの環境で検証を行っています。

まずは、簡単に、"Vehicle Color"のユニーク要素の頻度を数えてみます。

Pandasの場合

まずは、標準の読み込みを試してみます。

%time %memit df = pd.read_csv('all.csv')
  MemoryError

そもそも読み込む事が出来ず、メモリエラーになりました。
次に、chunksizeを指定した読み込みによる処理を行ってみます。

df_chunks = pd.read_csv(df_path, chunksize=1000000, dtype="object")
pieces = [df["Vehicle Color"].value_counts()  for df in df_chunks]
s =  pd.concat(pieces)

計算処理が
peak memory: 2088.89 MiB,
Wall time: 4min 12s
という結果になりました。

Daskの場合

Daskで同じ処理をした時の比較をしてみましょう。

ddf = dd.read_csv('all.csv', dtype="object")
ddf["Vehicle Color"].value_counts().compute(get=dask.multiprocessing.get)

peak memory: 781.77 MiB,
Wall time: 1min 45s
という結果になりました。
メモリを3倍近く抑えながら、2.5倍近く早い処理を行う事が出来ました。

ちなみに、Pandasと読み込み条件を揃えるため、dtype='object'を設定しています。Daskでは、通常、特定サンプル数から推定したdtypeを途中から読み込み時に使用して、高速化を行っています。

まとめ

今回は、単純な例での検証しか行う事ができませんでしたが、簡単に並列処理を行ったり、Out-of-Coreのデータを読み込む事が出来ました。
実際には、Cython化と組み合わせたり、分散処理を行う事で、更に早く分析処理を行う事が出来ます。また、今回は触れませんでしたが、
Daskは既存の機械学習ライブラリと組み合わせて使用する事で、高速化する事も可能です。

最後に余談ですが、今回はCSVを用いた分析を行いましたが、Arrowや、Parquetなどの列志向ファイルフォーマットを使用する事でより早く読み込む事が可能です。

参考文献

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした