multipocessingの進捗を表示する
初めに、プログラム全体を以下に示す。
import os
import time
import random
from tqdm import tqdm
from multiprocessing import Process,cpu_count,Pool
def hoge(a,b,c):
"""
並列化したい処理が書かれている関数
"""
time.sleep(random.random()*5)
with open("tmp.txt","a") as f:
f.write("{}を{}個{}で購入した\n".format(a,b,c))
def wrap_hoge(params):
"""
imap_unorderedの引数で指定する関数オブジェクトは1引数しか取れないため、hogeをラップする
"""
hoge(*params)
params = []
param1s = ["リンゴ","ミカン","メロン"]
param2s = [1,2,3]
param3s = ["A店","B店","C店"]
for param1 in param1s:
for param2 in param2s:
for param3 in param3s:
params.append((param1,param2,param3))
t = tqdm(total=len(params))
with Pool(int(os.cpu_count()*0.75)) as pool:
process_iterable = pool.imap_unordered(wrap_hoge,params)
for i in process_iterable:
t.update()
tqdmは以下のように書くことで、最大数を指定することができる。そして、1つの処理が終了するごとにt.update()を呼び出すことでプログレスバーが進んでいき、totalで指定した回数だけt.update()を呼び出した時にプログラスバーが100パーセントになる。
t = tqdm(total=len(params))
pool.imap_unorderedに実行したい関数と引数のリストを与えると、イテレータが返ってくる。
このイテレータはどれか1つのプロセスが終わると, for文が1回実行される。そのため、プロセスが終了した個数だけ、for文を進めることができる。今回は、それぞれのプロセスの終了順は気にしないのでimap_unorderedを使用した。終了順を気にする場合は、imapを使用すればよい。
process_iterable = pool.imap_unordered(wrap_hoge,params)
for i in process_iterable:
pool.imap_unorderedに与える関数は引数が1つという制限がある。そのため、複数の引数を指定可能にするためにtupleを受け取って、その中身をunpackして関数に与える関数(wrap_hoge)でラップすることで、複数の引数の関数の呼び出しを可能にする。
def wrap_hoge(params):
"""
imap_unorderedの引数で指定する関数オブジェクトは1引数しか取れないため、hogeをラップする
"""
hoge(*params)
実行結果は以下のようになる。
tmp.txt
リンゴを2個C店で購入した
リンゴを2個B店で購入した
リンゴを1個B店で購入した
リンゴを1個C店で購入した
ミカンを1個C店で購入した
リンゴを2個A店で購入した
ミカンを1個B店で購入した
ミカンを2個A店で購入した
ミカンを2個C店で購入した
リンゴを3個C店で購入した
リンゴを1個A店で購入した
ミカンを3個B店で購入した
ミカンを1個A店で購入した
ミカンを3個A店で購入した
リンゴを3個A店で購入した
リンゴを3個B店で購入した
メロンを2個B店で購入した
ミカンを2個B店で購入した
ミカンを3個C店で購入した
メロンを2個A店で購入した
メロンを1個C店で購入した
メロンを1個A店で購入した
メロンを2個C店で購入した
メロンを1個B店で購入した
メロンを3個B店で購入した
メロンを3個A店で購入した
メロンを3個C店で購入した