はじめに
このページではPythonの並列分散処理ライブラリDaskの使い方をまとめます。
このDaskでは、pandas・Numpyライクな操作で、目盛りなデータ処理が実現できます。
詳細は以下ページをご参照ください。
Daskのインストール方法
pythonをanacondaでインストールしている方は、デフォルトでインストールされていると思います。
以下のコマンドで、Daskのバージョンが表示されればOKです。
conda list dask
もし、表示されない場合はpip install dask
でインストールしてください。
もしくは(condaコマンドが使える場合は)conda install dask
基本的な使い方
###テキストファイルの読み込み(read_csv)
Daskによるテキストファイルの読み込みには、dask.dataframeのread_csvを使用します。
import dask.dataframe as dd
ddf = dd.read_csv('test.csv')
基本的な使い方はpandas.DataFrameのread_csvと同様です。
ただし、この時点で例えば20GBぐらいのデータでも読み込むことができます!Dask凄いぞ!
詳しくは、下記をご参照ください。
公式ページ
###テキストファイルへの書き出し(to_csv)
Daskデータフレームをテキストファイルに書き出すには、dask.DataFrameのto_csvを使用します。
ddf.to_csv('test.csv')
結果を複数ファイルに分割して出力したい場合、以下の様に書きます。
ddf.to_csv('test_*.csv')
その他、基本的な使い方はpandas.DataFrameのto_csvと同様です。
詳しくは、下記をご参照ください。
https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.Series.to_csv
##データ処理
###データの変換(where,mask,apply)
データの変換方法については、where、mask、applyを使った3種類を記載します。
###where
例:ddf['x']の0以下の値をnanにする。
ddf['x'] = ddf['x'].where(ddf['x'] > 0,np.nan)
(※whereは、そのままにしたい値の条件を指定する。)
mask
例:ddf['x']の0以下の値をnanにする。
ddf['x'] = ddf['x'].mask(ddf['x'] <= 0,np.nan)
(※maskは、変更したい値の条件を指定する。)
### apply
例:ddf['x']の値をキーにして、dict_changesの値に置き換える。
```python
dict_changes = dict({'0':'A','1':'B','2':'C'})
ddf['x'] = ddf['x'].apply(lambda x:dict_cahnges[x],meta=dict(('x','object')))
(※metaオプションで各カラムのデータ型を指定する)
区切り文字による分割(str.split)
データフレームの区切り文字による分割には、str.splitが使えます。
以下の様に書くと、ddf['y']列を「:」で分割した配列を値にもつ、シリーズが生成される。
ddf['y'] = ddf['y'].str.split(':')
ダミー変数化(get_dummies)
データのダミー変数化には、dask.dataframeのget_dummies
が使用できます。
ただし、dask.dataframeは行方向にしかデータを分割できないため、ダミー変数を作成する際には1列分のデータを取得するために、すべてのデータを読み込まなければならず、メモリエラーを起こす危険性があります。
そこで、大規模データの処理を行う際には、dask.dataframeを一度dask.arrayに1列ずつに分割した形で変換し、
それから1列分のデータのみを再度dask.dataframeに変換し、get_dummiesしてやるのが良いと思います。
※私はこの縛りに気づき、daskを使うのを諦めました...
また、get_dummies
する前には、対象となるdataframeを.categorize()
を使ってカテゴリ型に変換してやる必要もあります。
import dask.array as da #dask.arrayのインポート
dda = ddf.to_dask_array(lengths=True).rechunk((-1,1)) #dask.arrayへの変換、rechunkで分割するサイズを指定
import pandas as pd
ser_header = pd.Series(ddf.columns)
ddf_getDummieCols = dda[:,ser_header[ser_header=='y'].index[0]].to_dask_dataframe(columns=['y'])
dd.get_dummies(ddf_getDummieCols.categorize())
データフレーム同士の列方向の結合(merge)
dask.dataframe同士の結合には、dask.dataframeのmergeが使用できます。
(concatでも横方向の結合はできますが、mergeのほうが早く推奨されてないようです。)
mergeは、インデックスによる結合を行うとより、高速になるようです。
また、謎のエラーが発生することがあるため(原因調査中)、単純なdataframeの横方向の結合時でも、各dataframeのインデックスは一度リセットするようにしてください。
(結合後インデックス系のエラーが出たら、一度reset_indexしてみてください)
ddf1 = ddf1.reset_index().set_index('index')
ddf2 = ddf2.reset_index().set_index('index')
dd.merge(ddf1,ddf2,left_index=True,right_index=True,how='outer')
グループ化(groupby)
dask.dataframeでも、pandas.DataFrameと同じようにgroupbyでデータのグループ化ができます。
ddf.groupby('x')
これの後ろに集約関数を付けることで、さまざま集約値が取得できます。
例:平均値
ddf.groupby('x').mean()
集約関数については、以下を参照ください。
http://pandas.pydata.org/pandas-docs/stable/basics.html#descriptive-statistics
dask.dataframeの中身の確認
dask.dataframeのデータの中身を確認する際には、ddf.compute()
(dataframe全体を読み込む。データ量が大きいときにはおすすめしない。)、ddf.head(n)
(最初のn行(引数なしならば5行)などで確認できる。
##最後に
途中にも書きましたが、daskでだいたいのことは解決するのですが、ダミー変数化をしようと思った時に結局列データがすべて必要になるためdaskが使いづらい場面が出てきます。
※調べるのに時間がかかりそうだったのでやめただけですが...
そこで、列データが全て必要なダミー変数化や、変数の変更監視については素のPythonで実装することにしましたので、次の記事でそちらについて触れたいと思います。
[【初めての大規模データ③】大規模データのダミー変数化]
(https://qiita.com/katsuki104/items/def2962af8af896f4007)