1
3

More than 1 year has passed since last update.

multiprocessingで一つのtqdmを複数プロセスから更新する

Last updated at Posted at 2021-12-30

複数プロセス(worker)を並列実行して,一つのプログレスバーを更新したい.

overview

  • Pool + imap_unordered()という手もあるが,
    • 引数にリストなどのiterableを渡さなければならない
    • 一つのiterableしかimap_unordered()の引数に渡せない.
    • workerの引数も一つしか渡せない.(単純にやるからだけど)
    • pickle化するのでリストの要素が
  • そこでProcess + queueを使ってみる.
    • リストの代わりにqueueに詰める.
    • tqdmManagerで管理する.
    • workerにqueuetqdmも渡す.
      • workerの中でtqdmをupdate

motivation

pool.imap_unordered()を使っていたけれども,各workerに渡すのが別々のものと共通のものとが混在していて,iterableを作る効率が悪い.

そこで別々のものはqueueに詰めてしまって,各workerはそこから取り出すことにする.共通のものは単にworkerの引数にわたすだけ.

code

ではまず必要なモジュールのインポート.

モジュールのインポート
from multiprocessing import Process, Manager
from multiprocessing.managers import BaseManager
from tqdm import tqdm
import queue

次にworkerの定義.

  • 引数はキューq,ロックオブジェクト,tqdmのプログレスバー,worker番号
    • worker番号iは単なる例.なんでもよい.
  • キューから取り出す.キューが空なら即終了.キューが空になるまでループ.
  • 取り出したデータを処理
  • lockしてtqdmのプログレスバーを更新
    • lockしないと更新表示がずれる
def worker(q, lock, pbar, i):

    while True:
        try:
            item = q.get(timeout=0)
        except queue.Empty:
            return

        # ここでitemを処理

        with lock:
            pbar.update(1)
            pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))

tqdmをmanagerで管理するための定義.

class TqdmManager(BaseManager):
    # https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
    pass


TqdmManager.register('Tqdm', tqdm)

ではメイン.

  • キューq,ロック,tqdmプログレスバーpbarのオブジェクトをmanagerで作成.
  • qにデータを詰める
  • Processで個数num_workerのプロセスを生成.
    • 引数はargsで指定.ここでiterable以外にいろいろ渡せる.
  • workerをスタート.
    • joinで終了まで待つ.
    • closeでクリーンアップ.
  • 最後にpbarclose
メイン
if __name__ == '__main__':

    with TqdmManager() as tqdm_manager, Manager() as manager:

        num_workers = 13
        len_data = 1182

        q = manager.Queue()
        lock = manager.Lock()
        pbar = tqdm_manager.Tqdm(total=len_data)

        for item in range(len_data):
            q.put(item)

        p_all = [Process(target=worker,
                         args=(q, lock, pbar, i))
                 for i in range(num_workers)]
        [p.start() for p in p_all]
        [p.join() for p in p_all]
        [p.close() for p in p_all]

        pbar.close()

execute

$ python3 ./queue_test.py
100%|█████████████████████████████████| 1182/1182 [00:01<00:00, 601.64it/s, 1181   1]
$ 

limitation

managerを使うと,そのための別プロセスが起動するのでやや遅くなる.

misc

  • tqdmBaseManagerで派生して管理するという例は見かけない.なんでもmanagerにできるので便利.
  • [p.start() for p in p_all]というインライン表記もあまり見かけない.なぜ?

Yet another version

queueにデータを入れるのが時間かかる場合,先にworkerをスタートしてからqueueにデータを投げ入れる.

  • queueに入れればすぐにworkerに受け渡される
  • workerではq.gettimeoutを1程度にしておく(データ待ちの時間が発生するため)
  • queueに入れるデータが尽きたら,queueに終了目印のNoneを入れておく.
    • workerの方はNoneが来たら即終了
queue_test.py
from multiprocessing import Process, Manager
from tqdm import tqdm
import queue
from multiprocessing.managers import BaseManager


def worker(q, lock, pbar, i):

    while True:
        try:
            item = q.get(timeout=1)
        except queue.Empty:
            return
        if item is None:
            return

        # ここでitemを処理

        with lock:
            pbar.update(1)
            pbar.set_postfix_str('{0:3d} {1:3d}'.format(item, i))


class TqdmManager(BaseManager):
    # https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
    pass


TqdmManager.register('Tqdm', tqdm)


if __name__ == '__main__':

    with TqdmManager() as tqdm_manager, Manager() as manager:

        num_workers = 13
        len_data = 1182

        q = manager.Queue()
        lock = manager.Lock()
        pbar = tqdm_manager.Tqdm(total=len_data)

        p_all = [Process(target=worker,
                         args=(q, lock, pbar, i))
                 for i in range(num_workers)]
        [p.start() for p in p_all]

        for item in range(len_data):
            q.put(item)
        for _ in range(num_workers):
            q.put(None)

        [p.join() for p in p_all]
        [p.close() for p in p_all]

        pbar.close()

追記:SyncManagerから派生

BaseManagerから派生させるのではなく,SyncManagerから派生させればqueueも使えた.managerが一つになるのでこのほうがよい.

SyncManagerから派生するバージョン
from multiprocessing import Process
from tqdm import tqdm
import queue
from multiprocessing.managers import SyncManager


def worker(q, lock, pbar, worker_id):

    while True:
        try:
            item = q.get(timeout=1)
        except queue.Empty:
            return
        if item is None:
            return

        # ここでitemを処理

        with lock:
            pbar.update(1)
            pbar.set_postfix_str('item {0:3d} id {1:3d}'.format(item, worker_id))


class TqdmManager(SyncManager):
    # https://docs.python.org/ja/3/library/multiprocessing.html#customized-managers
    pass


if __name__ == '__main__':

    TqdmManager.register('Tqdm', tqdm)

    with TqdmManager() as manager:

        num_workers = 13
        len_data = 1182

        q = manager.Queue()
        lock = manager.Lock()
        pbar = manager.Tqdm(total=len_data)

        p_all = [Process(target=worker,
                         args=(q, lock, pbar, worker_id))
                 for worker_id in range(num_workers)]
        [p.start() for p in p_all]

        for item in range(len_data):
            q.put(item)
        for _ in range(num_workers):
            q.put(None)

        [p.join() for p in p_all]
        [p.close() for p in p_all]

        pbar.close()

なおSyncManagerから派生せずにSyncManagerに直接registerもできるが,汚染してしまうのでやめたほうがよい.

1
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
3