複数プロセス(worker)を並列実行して,一つのプログレスバーを更新したい.
overview
-
Pool
+imap_unordered()
という手もあるが,- 引数にリストなどのiterableを渡さなければならない
- 一つのiterableしか
imap_unordered()
の引数に渡せない. - workerの引数も一つしか渡せない.(単純にやるからだけど)
- pickle化するのでリストの要素が
- そこで
Process
+queue
を使ってみる.- リストの代わりに
queue
に詰める. -
tqdm
をManager
で管理する. - workerに
queue
とtqdm
も渡す.- workerの中で
tqdm
をupdate
- workerの中で
- リストの代わりに
motivation
pool.imap_unordered()
を使っていたけれども,各workerに渡すのが別々のものと共通のものとが混在していて,iterableを作る効率が悪い.
そこで別々のものはqueueに詰めてしまって,各workerはそこから取り出すことにする.共通のものは単にworkerの引数にわたすだけ.
code
ではまず必要なモジュールのインポート.
モジュールのインポート
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
from tqdm import tqdm
import queue
次にworkerの定義.
- 引数はキューq,ロックオブジェクト,tqdmのプログレスバー,worker番号
- worker番号
i
は単なる例.なんでもよい.
- worker番号
- キューから取り出す.キューが空なら即終了.キューが空になるまでループ.
- 取り出したデータを処理
- lockしてtqdmのプログレスバーを更新
- lockしないと更新表示がずれる
def worker(q, lock, pbar, i):
while True:
try:
item = q.get(timeout=0)
except queue.Empty:
return
# ここでitemを処理
with lock:
pbar.update(1)
pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))
tqdmをmanagerで管理するための定義.
class TqdmManager(BaseManager):
# https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
pass
TqdmManager.register('Tqdm', tqdm)
ではメイン.
- キュー
q
,ロック,tqdmプログレスバーpbar
のオブジェクトをmanagerで作成. -
q
にデータを詰める -
Process
で個数num_worker
のプロセスを生成.- 引数は
args
で指定.ここでiterable以外にいろいろ渡せる.
- 引数は
- workerをスタート.
-
join
で終了まで待つ. -
close
でクリーンアップ.
-
- 最後に
pbar
をclose
メイン
if __name__ == '__main__':
with TqdmManager() as tqdm_manager, Manager() as manager:
num_workers = 13
len_data = 1182
q = manager.Queue()
lock = manager.Lock()
pbar = tqdm_manager.Tqdm(total=len_data)
for item in range(len_data):
q.put(item)
p_all = [Process(target=worker,
args=(q, lock, pbar, i))
for i in range(num_workers)]
[p.start() for p in p_all]
[p.join() for p in p_all]
[p.close() for p in p_all]
pbar.close()
execute
$ python3 ./queue_test.py
100%|█████████████████████████████████| 1182/1182 [00:01<00:00, 601.64it/s, 1181 1]
$
limitation
managerを使うと,そのための別プロセスが起動するのでやや遅くなる.
misc
-
tqdm
をBaseManager
で派生して管理するという例は見かけない.なんでもmanagerにできるので便利. -
[p.start() for p in p_all]
というインライン表記もあまり見かけない.なぜ?
Yet another version
queueにデータを入れるのが時間かかる場合,先にworkerをスタートしてからqueueにデータを投げ入れる.
- queueに入れればすぐにworkerに受け渡される
- workerでは
q.get
のtimeout
を1程度にしておく(データ待ちの時間が発生するため) - queueに入れるデータが尽きたら,queueに終了目印の
None
を入れておく.- workerの方は
None
が来たら即終了
- workerの方は
queue_test.py
from multiprocessing import Process, Manager
from tqdm import tqdm
import queue
from multiprocessing.managers import BaseManager
def worker(q, lock, pbar, i):
while True:
try:
item = q.get(timeout=1)
except queue.Empty:
return
if item is None:
return
# ここでitemを処理
with lock:
pbar.update(1)
pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))
class TqdmManager(BaseManager):
# https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
pass
TqdmManager.register('Tqdm', tqdm)
if __name__ == '__main__':
with TqdmManager() as tqdm_manager, Manager() as manager:
num_workers = 13
len_data = 1182
q = manager.Queue()
lock = manager.Lock()
pbar = tqdm_manager.Tqdm(total=len_data)
p_all = [Process(target=worker,
args=(q, lock, pbar, i))
for i in range(num_workers)]
[p.start() for p in p_all]
for item in range(len_data):
q.put(item)
for _ in range(num_workers):
q.put(None)
[p.join() for p in p_all]
[p.close() for p in p_all]
pbar.close()
追記:SyncManagerから派生
BaseManager
から派生させるのではなく,SyncManager
から派生させればqueueも使えた.managerが一つになるのでこのほうがよい.
SyncManagerから派生するバージョン
from multiprocessing import Process
from tqdm import tqdm
import queue
from multiprocessing.managers import SyncManager
def worker(q, lock, pbar, worker_id):
while True:
try:
item = q.get(timeout=1)
except queue.Empty:
return
if item is None:
return
# ここでitemを処理
with lock:
pbar.update(1)
pbar.set_postfix_str('item {0:3d} id {1:3d}'.format(item, worker_id))
class TqdmManager(SyncManager):
# https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
pass
if __name__ == '__main__':
TqdmManager.register('Tqdm', tqdm)
with TqdmManager() as manager:
num_workers = 13
len_data = 1182
q = manager.Queue()
lock = manager.Lock()
pbar = manager.Tqdm(total=len_data)
p_all = [Process(target=worker,
args=(q, lock, pbar, worker_id))
for worker_id in range(num_workers)]
[p.start() for p in p_all]
for item in range(len_data):
q.put(item)
for _ in range(num_workers):
q.put(None)
[p.join() for p in p_all]
[p.close() for p in p_all]
pbar.close()
なおSyncManager
から派生せずにSyncManager
に直接registerもできるが,汚染してしまうのでやめたほうがよい.