9
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

PandasでDataFrameにgroupbyを適用した後の処理を並列化する(on Linux)

Last updated at Posted at 2019-02-25

やりたいこと

  • 以下のようなプログラムを考える。
  1. PandasのDataFrameに対しgroupby()でグルーピングする。
  2. グルーピング後の各DataFrameに対して何かの処理(解析等)を行いグループ毎の結果を得る。
  3. グループ毎の処理結果をまとめて1つのデータフレームにする。
  • 2の処理を並列化したい。

環境

Ubuntu 18.04.2 LTS
Python 3.6.7
Pandas 0.23.4

やり方

ソースコード

example.py
import pandas as pd
from multiprocessing import Pool, cpu_count
import random

# 各グループのDataFrameに対する処理
def func(group):
    # DataFrameの取り出し
    df_group = group['group']

    # DataFrameに対する処理
    val = df_group['value'].mean()

    # 処理結果をDataFrame化し返却
    return pd.DataFrame([[group['name'], val]], columns=['group_id', 'value'])

if __name__=='__main__':
    # 処理対象のDataFrame
    df = pd.DataFrame([[i, random.random()] for i in range(100) for j in range(1000)], columns=['group_id', 'value'])

    # group_idでグルーピング 
    df_grouped = df.groupby('group_id')

    # 各グループに対する処理を複数プロセスで実行
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [{'name': name, 'group': group} for name, group in df_grouped])

    # 結果を1つのDataFrameに結合
    df_result = pd.concat(ret_list).reset_index(drop=True)

    # 結果の確認
    print(df_result)

実行結果

    group_id     value
0          0  0.504357
1          1  0.490388
2          2  0.495620
3          3  0.501150
4          4  0.499959
5          5  0.500072
..       ...       ...
95        95  0.494388
96        96  0.495345
97        97  0.504455
98        98  0.491322
99        99  0.496092

解説

  • DataFrame中には100,000個の乱数($0 \leqq x < 1$)が存在し、1,000個ごとに別々のgroup_id($0 \leqq x \leqq 99$)が割り振られている。このプログラムではグループ毎の乱数の平均値を算出している。
    • 正直わざわざ処理を並列化するほどの重い処理でもない。しかしDataFrameの規模が大きくなったり、グループ毎の処理が複雑になったりすると並列化の恩恵が大きくなる。
  • multiprocessingパッケージのPoolを使ってグルーピング後のDataFrameを1つずつデータ処理関数func()に渡し、CPUコア数と同じ数のfunc()を別々のプロセスで実行することで並列化を実現している。
    • 参考リンクでは関数にgroup(グループ毎のDataFrame)しか渡していないが、func()内でname(DataFrameに対応するgroup_id)の方も使用したいため、引数を辞書形式にして両方渡している。
    • multiprocessingパッケージの処理並列化は便利だが、気をつけないとメモリの使用量が激増する(後述)。
  • 各グループの結果DataFrameをconcat()で結合した後に、reset_index()でindexに連番を振り直している。
    • 無くても動くが、func()内で明示的にindexを振らないと全行のindexが0になる。

メモリ使用量増大の対策

  • 処理対象のDataFrame(最初のコードだとdf)のサイズが大きかったり他の大きなデータを扱っていると、Poolで並列処理を始めた瞬間にメモリ使用量が激増する(数倍になる)場合がある。
    • サブプロセス起動時にメインプロセスの全データがサブプロセスにコピーされることが原因(参考リンク2)。
    • 本記事の程度のデータサイズでは問題ないが、数GBのデータを扱っていたりすると無視できない問題になる。
  • この問題を回避するためには、子プロセスの起動方法をspawnに設定すればよい。
    • 具体的にはPoolを使用する前に以下のコードを追加する。
example.py
import multiprocessing
multiprocessing.set_start_method('spawn')
  • ちなみに、起動方法をspawnにすると子プロセスの起動のたびにpythonインタープリターの起動が発生するため、追加のオーバーヘッド時間が発生する(参考リンク3)。

処理関数に毎回同じ引数を渡したい

  • データ処理関数func()に毎回同じ引数を渡したい場合、functoolsを使って以下のように実現できる。
    • この例だとconst_argが渡している引数である。
example.py
# (略)
import functools

# 各グループのDataFrameに対する処理
def func(group, const_arg):
    # (略)
    # 引数const_argを使った処理
    val = df_group['value'].mean() * const_arg
    # (略)

if __name__=='__main__':
    # (略)
    # 関数に渡したい引数
    CONST_ARG = 2.0
    # 各グループに対する処理を複数プロセスで実行
    with Pool(cpu_count()) as p:
        ret_list = p.map(functools.partial(func, const_arg = CONST_ARG), \
                         [{'name': name, 'group': group} for name, group in df_grouped])
    # (略)

参考リンク

  1. stackoverflow - Parallelize apply after pandas groupby
  2. [multiprocessing.Poolがやたらメモリを消費するときの対策 - 静かなる名辞]
    (https://www.haya-programming.com/entry/2018/12/28/203555)
  3. [17.2. multiprocessing — プロセスベースの並列処理 Python 3.6.5 ドキュメント]
    (https://docs.python.jp/3/library/multiprocessing.html)
9
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?