複数プロセス(worker)を並列実行して,一つのプログレスバーを更新したい.
overview
-
Pool+imap_unordered()という手もあるが,- 引数にリストなどのiterableを渡さなければならない
- 一つのiterableしか
imap_unordered()の引数に渡せない. - workerの引数も一つしか渡せない.(単純にやるからだけど)
- pickle化するのでリストの要素が
- そこで
Process+queueを使ってみる.- リストの代わりに
queueに詰める. -
tqdmをManagerで管理する. - workerに
queueとtqdmも渡す.- workerの中で
tqdmをupdate
- workerの中で
- リストの代わりに
motivation
pool.imap_unordered()を使っていたけれども,各workerに渡すのが別々のものと共通のものとが混在していて,iterableを作る効率が悪い.
そこで別々のものはqueueに詰めてしまって,各workerはそこから取り出すことにする.共通のものは単にworkerの引数にわたすだけ.
code
ではまず必要なモジュールのインポート.
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
from tqdm import tqdm
import queue
次にworkerの定義.
- 引数はキューq,ロックオブジェクト,tqdmのプログレスバー,worker番号
- worker番号
iは単なる例.なんでもよい.
- worker番号
- キューから取り出す.キューが空なら即終了.キューが空になるまでループ.
- 取り出したデータを処理
- lockしてtqdmのプログレスバーを更新
- lockしないと更新表示がずれる
def worker(q, lock, pbar, i):
while True:
try:
item = q.get(timeout=0)
except queue.Empty:
return
# ここでitemを処理
with lock:
pbar.update(1)
pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))
tqdmをmanagerで管理するための定義.
class TqdmManager(BaseManager):
# https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
pass
TqdmManager.register('Tqdm', tqdm)
ではメイン.
- キュー
q,ロック,tqdmプログレスバーpbarのオブジェクトをmanagerで作成. -
qにデータを詰める -
Processで個数num_workerのプロセスを生成.- 引数は
argsで指定.ここでiterable以外にいろいろ渡せる.
- 引数は
- workerをスタート.
-
joinで終了まで待つ. -
closeでクリーンアップ.
-
- 最後に
pbarをclose
if __name__ == '__main__':
with TqdmManager() as tqdm_manager, Manager() as manager:
num_workers = 13
len_data = 1182
q = manager.Queue()
lock = manager.Lock()
pbar = tqdm_manager.Tqdm(total=len_data)
for item in range(len_data):
q.put(item)
p_all = [Process(target=worker,
args=(q, lock, pbar, i))
for i in range(num_workers)]
[p.start() for p in p_all]
[p.join() for p in p_all]
[p.close() for p in p_all]
pbar.close()
execute
$ python3 ./queue_test.py
100%|█████████████████████████████████| 1182/1182 [00:01<00:00, 601.64it/s, 1181 1]
$
limitation
managerを使うと,そのための別プロセスが起動するのでやや遅くなる.
misc
-
tqdmをBaseManagerで派生して管理するという例は見かけない.なんでもmanagerにできるので便利. -
[p.start() for p in p_all]というインライン表記もあまり見かけない.なぜ?
Yet another version
queueにデータを入れるのが時間かかる場合,先にworkerをスタートしてからqueueにデータを投げ入れる.
- queueに入れればすぐにworkerに受け渡される
- workerでは
q.getのtimeoutを1程度にしておく(データ待ちの時間が発生するため) - queueに入れるデータが尽きたら,queueに終了目印の
Noneを入れておく.- workerの方は
Noneが来たら即終了
- workerの方は
from multiprocessing import Process, Manager
from tqdm import tqdm
import queue
from multiprocessing.managers import BaseManager
def worker(q, lock, pbar, i):
while True:
try:
item = q.get(timeout=1)
except queue.Empty:
return
if item is None:
return
# ここでitemを処理
with lock:
pbar.update(1)
pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))
class TqdmManager(BaseManager):
# https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
pass
TqdmManager.register('Tqdm', tqdm)
if __name__ == '__main__':
with TqdmManager() as tqdm_manager, Manager() as manager:
num_workers = 13
len_data = 1182
q = manager.Queue()
lock = manager.Lock()
pbar = tqdm_manager.Tqdm(total=len_data)
p_all = [Process(target=worker,
args=(q, lock, pbar, i))
for i in range(num_workers)]
[p.start() for p in p_all]
for item in range(len_data):
q.put(item)
for _ in range(num_workers):
q.put(None)
[p.join() for p in p_all]
[p.close() for p in p_all]
pbar.close()
追記:SyncManagerから派生
BaseManagerから派生させるのではなく,SyncManagerから派生させればqueueも使えた.managerが一つになるのでこのほうがよい.
from multiprocessing import Process
from tqdm import tqdm
import queue
from multiprocessing.managers import SyncManager
def worker(q, lock, pbar, worker_id):
while True:
try:
item = q.get(timeout=1)
except queue.Empty:
return
if item is None:
return
# ここでitemを処理
with lock:
pbar.update(1)
pbar.set_postfix_str('item {0:3d} id {1:3d}'.format(item, worker_id))
class TqdmManager(SyncManager):
# https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
pass
if __name__ == '__main__':
TqdmManager.register('Tqdm', tqdm)
with TqdmManager() as manager:
num_workers = 13
len_data = 1182
q = manager.Queue()
lock = manager.Lock()
pbar = manager.Tqdm(total=len_data)
p_all = [Process(target=worker,
args=(q, lock, pbar, worker_id))
for worker_id in range(num_workers)]
[p.start() for p in p_all]
for item in range(len_data):
q.put(item)
for _ in range(num_workers):
q.put(None)
[p.join() for p in p_all]
[p.close() for p in p_all]
pbar.close()
なおSyncManagerから派生せずにSyncManagerに直接registerもできるが,汚染してしまうのでやめたほうがよい.