LoginSignup
18
16

More than 5 years have passed since last update.

【初めての大規模データ②】Daskでの並列分散処理

Last updated at Posted at 2019-01-14

はじめに

このページではPythonの並列分散処理ライブラリDaskの使い方をまとめます。
このDaskでは、pandas・Numpyライクな操作で、目盛りなデータ処理が実現できます。
詳細は以下ページをご参照ください。

公式ページ

Daskのインストール方法

pythonをanacondaでインストールしている方は、デフォルトでインストールされていると思います。
以下のコマンドで、Daskのバージョンが表示されればOKです。

conda list dask

もし、表示されない場合はpip install daskでインストールしてください。
もしくは(condaコマンドが使える場合は)conda install dask

基本的な使い方

テキストファイルの読み込み(read_csv)

Daskによるテキストファイルの読み込みには、dask.dataframeのread_csvを使用します。

 import dask.dataframe as dd

 ddf = dd.read_csv('test.csv')

基本的な使い方はpandas.DataFrameのread_csvと同様です。
ただし、この時点で例えば20GBぐらいのデータでも読み込むことができます!Dask凄いぞ!

詳しくは、下記をご参照ください。
公式ページ

テキストファイルへの書き出し(to_csv)

Daskデータフレームをテキストファイルに書き出すには、dask.DataFrameのto_csvを使用します。

例1

ddf.to_csv('test.csv')

結果を複数ファイルに分割して出力したい場合、以下の様に書きます。

例2
ddf.to_csv('test_*.csv')

その他、基本的な使い方はpandas.DataFrameのto_csvと同様です。

詳しくは、下記をご参照ください。

 https://docs.dask.org/en/latest/dataframe-api.html#dask.dataframe.Series.to_csv

データ処理

データの変換(where,mask,apply)

データの変換方法については、where、mask、applyを使った3種類を記載します。

where

例:ddf['x']の0以下の値をnanにする。


ddf['x'] = ddf['x'].where(ddf['x'] > 0,np.nan)

(※whereは、そのままにしたい値の条件を指定する。)

mask

例:ddf['x']の0以下の値をnanにする。


ddf['x'] = ddf['x'].mask(ddf['x'] <= 0,np.nan)

(maskは変更したい値の条件を指定する)

### apply
:ddf['x']の値をキーにしてdict_changesの値に置き換える

```python

dict_changes = dict({'0':'A','1':'B','2':'C'})

ddf['x'] = ddf['x'].apply(lambda x:dict_cahnges[x],meta=dict(('x','object')))

(※metaオプションで各カラムのデータ型を指定する)

区切り文字による分割(str.split)

データフレームの区切り文字による分割には、str.splitが使えます。
以下の様に書くと、ddf['y']列を「:」で分割した配列を値にもつ、シリーズが生成される。


ddf['y'] = ddf['y'].str.split(':')

  

ダミー変数化(get_dummies)

データのダミー変数化には、dask.dataframeのget_dummiesが使用できます。
ただし、dask.dataframeは行方向にしかデータを分割できないため、ダミー変数を作成する際には1列分のデータを取得するために、すべてのデータを読み込まなければならず、メモリエラーを起こす危険性があります。
そこで、大規模データの処理を行う際には、dask.dataframeを一度dask.arrayに1列ずつに分割した形で変換し、
それから1列分のデータのみを再度dask.dataframeに変換し、get_dummiesしてやるのが良いと思います。
※私はこの縛りに気づき、daskを使うのを諦めました...

また、get_dummiesする前には、対象となるdataframeを.categorize()を使ってカテゴリ型に変換してやる必要もあります。


import dask.array as da #dask.arrayのインポート

dda = ddf.to_dask_array(lengths=True).rechunk((-1,1)) #dask.arrayへの変換、rechunkで分割するサイズを指定



import pandas as pd

ser_header = pd.Series(ddf.columns)



ddf_getDummieCols = dda[:,ser_header[ser_header=='y'].index[0]].to_dask_dataframe(columns=['y'])

dd.get_dummies(ddf_getDummieCols.categorize())

データフレーム同士の列方向の結合(merge)

dask.dataframe同士の結合には、dask.dataframeのmergeが使用できます。
(concatでも横方向の結合はできますが、mergeのほうが早く推奨されてないようです。)

mergeは、インデックスによる結合を行うとより、高速になるようです。
また、謎のエラーが発生することがあるため(原因調査中)、単純なdataframeの横方向の結合時でも、各dataframeのインデックスは一度リセットするようにしてください。
(結合後インデックス系のエラーが出たら、一度reset_indexしてみてください)


ddf1 = ddf1.reset_index().set_index('index')

ddf2 = ddf2.reset_index().set_index('index')

dd.merge(ddf1,ddf2,left_index=True,right_index=True,how='outer')

グループ化(groupby)

dask.dataframeでも、pandas.DataFrameと同じようにgroupbyでデータのグループ化ができます。


ddf.groupby('x')

これの後ろに集約関数を付けることで、さまざま集約値が取得できます。
例:平均値


ddf.groupby('x').mean()

集約関数については、以下を参照ください。

 http://pandas.pydata.org/pandas-docs/stable/basics.html#descriptive-statistics

dask.dataframeの中身の確認

dask.dataframeのデータの中身を確認する際には、ddf.compute()(dataframe全体を読み込む。データ量が大きいときにはおすすめしない。)、ddf.head(n)(最初のn行(引数なしならば5行)などで確認できる。

最後に

途中にも書きましたが、daskでだいたいのことは解決するのですが、ダミー変数化をしようと思った時に結局列データがすべて必要になるためdaskが使いづらい場面が出てきます。
※調べるのに時間がかかりそうだったのでやめただけですが...

そこで、列データが全て必要なダミー変数化や、変数の変更監視については素のPythonで実装することにしましたので、次の記事でそちらについて触れたいと思います。
【初めての大規模データ③】大規模データのダミー変数化

18
16
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
16