やりたいこと
- 以下のようなプログラムを考える。
- PandasのDataFrameに対しgroupby()でグルーピングする。
- グルーピング後の各DataFrameに対して何かの処理(解析等)を行いグループ毎の結果を得る。
- グループ毎の処理結果をまとめて1つのデータフレームにする。
- 2の処理を並列化したい。
環境
Ubuntu 18.04.2 LTS
Python 3.6.7
Pandas 0.23.4
やり方
- 以下のようなコードで実現できる(参考リンク1)。
ソースコード
example.py
import pandas as pd
from multiprocessing import Pool, cpu_count
import random
# 各グループのDataFrameに対する処理
def func(group):
# DataFrameの取り出し
df_group = group['group']
# DataFrameに対する処理
val = df_group['value'].mean()
# 処理結果をDataFrame化し返却
return pd.DataFrame([[group['name'], val]], columns=['group_id', 'value'])
if __name__=='__main__':
# 処理対象のDataFrame
df = pd.DataFrame([[i, random.random()] for i in range(100) for j in range(1000)], columns=['group_id', 'value'])
# group_idでグルーピング
df_grouped = df.groupby('group_id')
# 各グループに対する処理を複数プロセスで実行
with Pool(cpu_count()) as p:
ret_list = p.map(func, [{'name': name, 'group': group} for name, group in df_grouped])
# 結果を1つのDataFrameに結合
df_result = pd.concat(ret_list).reset_index(drop=True)
# 結果の確認
print(df_result)
実行結果
group_id value
0 0 0.504357
1 1 0.490388
2 2 0.495620
3 3 0.501150
4 4 0.499959
5 5 0.500072
.. ... ...
95 95 0.494388
96 96 0.495345
97 97 0.504455
98 98 0.491322
99 99 0.496092
解説
- DataFrame中には100,000個の乱数($0 \leqq x < 1$)が存在し、1,000個ごとに別々の
group_id
($0 \leqq x \leqq 99$)が割り振られている。このプログラムではグループ毎の乱数の平均値を算出している。- 正直わざわざ処理を並列化するほどの重い処理でもない。しかしDataFrameの規模が大きくなったり、グループ毎の処理が複雑になったりすると並列化の恩恵が大きくなる。
- multiprocessingパッケージのPoolを使ってグルーピング後のDataFrameを1つずつデータ処理関数
func()
に渡し、CPUコア数と同じ数のfunc()
を別々のプロセスで実行することで並列化を実現している。- 参考リンクでは関数に
group
(グループ毎のDataFrame)しか渡していないが、func()
内でname
(DataFrameに対応するgroup_id
)の方も使用したいため、引数を辞書形式にして両方渡している。 - multiprocessingパッケージの処理並列化は便利だが、気をつけないとメモリの使用量が激増する(後述)。
- 参考リンクでは関数に
- 各グループの結果DataFrameを
concat()
で結合した後に、reset_index()
でindexに連番を振り直している。- 無くても動くが、func()内で明示的にindexを振らないと全行のindexが0になる。
メモリ使用量増大の対策
- 処理対象のDataFrame(最初のコードだと
df
)のサイズが大きかったり他の大きなデータを扱っていると、Poolで並列処理を始めた瞬間にメモリ使用量が激増する(数倍になる)場合がある。- サブプロセス起動時にメインプロセスの全データがサブプロセスにコピーされることが原因(参考リンク2)。
- 本記事の程度のデータサイズでは問題ないが、数GBのデータを扱っていたりすると無視できない問題になる。
- この問題を回避するためには、子プロセスの起動方法を
spawn
に設定すればよい。- 具体的にはPoolを使用する前に以下のコードを追加する。
example.py
import multiprocessing
multiprocessing.set_start_method('spawn')
- ちなみに、起動方法を
spawn
にすると子プロセスの起動のたびにpythonインタープリターの起動が発生するため、追加のオーバーヘッド時間が発生する(参考リンク3)。
処理関数に毎回同じ引数を渡したい
- データ処理関数
func()
に毎回同じ引数を渡したい場合、functoolsを使って以下のように実現できる。- この例だと
const_arg
が渡している引数である。
- この例だと
example.py
# (略)
import functools
# 各グループのDataFrameに対する処理
def func(group, const_arg):
# (略)
# 引数const_argを使った処理
val = df_group['value'].mean() * const_arg
# (略)
if __name__=='__main__':
# (略)
# 関数に渡したい引数
CONST_ARG = 2.0
# 各グループに対する処理を複数プロセスで実行
with Pool(cpu_count()) as p:
ret_list = p.map(functools.partial(func, const_arg = CONST_ARG), \
[{'name': name, 'group': group} for name, group in df_grouped])
# (略)
参考リンク
- stackoverflow - Parallelize apply after pandas groupby
- [multiprocessing.Poolがやたらメモリを消費するときの対策 - 静かなる名辞]
(https://www.haya-programming.com/entry/2018/12/28/203555) - [17.2. multiprocessing — プロセスベースの並列処理 Python 3.6.5 ドキュメント]
(https://docs.python.jp/3/library/multiprocessing.html)