はじめに
Pythonのmultiprocessing
モジュールを用いて複数のプロセスでタスクを実行する際、各プロセスから出力されるログを適切に管理することは、デバッグやモニタリングにおいて非常に重要です。しかし、複数のプロセスから同じロガーにログを出力する場合、ログが重複して出力されるという問題が発生することがあります。この記事では、この問題の原因と、initializer
関数を利用した解決策について詳しく解説します。
問題の発生状況
Pythonのmultiprocessing
モジュールを用いて複数のプロセスでタスクを実行し、ログを取得する際に、特定のプロセスのログが重複して出力される問題が発生しました。
具体的な例:
次のコードを実行した結果、SpawnPoolWorker-3
のプロセスからのログが重複して出力されていることが確認されました。
def worker_process(t: float, queue: multiprocessing.Queue) -> float:
logger = logging.getLogger()
handler = logging.handlers.QueueHandler(queue)
logger.addHandler(handler)
logger.setLevel(logging.DEBUG)
logger.info(f'Waiting for {t} seconds')
time.sleep(t)
logger.info(f'Processed after {t} seconds')
return t
queue = multiprocessing.Manager().Queue(-1)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(processName)s %(message)s")
handler.setFormatter(formatter)
listener = logging.handlers.QueueListener(queue, handler)
listener.start()
args = [(t, queue) for t in [0.5, 0.1, 0.1, 0.1]]
with multiprocessing.Pool(processes=2) as pool:
ret = pool.starmap(worker_process, args)
#...
出力
SpawnPoolWorker-2 Waiting for 0.5 seconds
SpawnPoolWorker-3 Waiting for 0.1 seconds
SpawnPoolWorker-3 Processed after 0.1 seconds
SpawnPoolWorker-3 Waiting for 0.1 seconds # <-
SpawnPoolWorker-3 Waiting for 0.1 seconds # <-
SpawnPoolWorker-3 Processed after 0.1 seconds # <-
SpawnPoolWorker-3 Processed after 0.1 seconds # <-
SpawnPoolWorker-3 Waiting for 0.1 seconds # <-
SpawnPoolWorker-3 Waiting for 0.1 seconds # <-
SpawnPoolWorker-3 Waiting for 0.1 seconds # <-
SpawnPoolWorker-3 Processed after 0.1 seconds # <-
SpawnPoolWorker-3 Processed after 0.1 seconds # <-
SpawnPoolWorker-3 Processed after 0.1 seconds # <-
SpawnPoolWorker-2 Processed after 0.5 seconds
問題の原因
この問題は、以下の2つの要因が複合的に作用することで発生しています。
- 各ワーカープロセスが独立したロガーインスタンスを持つ: 各ワーカープロセスは、それぞれ独立したPythonプロセスとして実行されるため、独自のロガーインスタンスを持ちます。
-
ロガーの初期化が適切に行われていない:
worker_process
関数内で毎回新しいロガーを作成しているため、各ワーカープロセスが複数のロガーインスタンスを持つ可能性があります。
解決策:initializer
関数による初期化
multiprocessing.Pool
のinitializer
引数に渡す関数worker_init
を利用することで、この問題を解決できます。worker_init
関数は、各ワーカープロセスが開始される前に呼び出される関数です。この関数内で、各ワーカープロセスに独自のロガーを設定することで、ログの重複を回避することができます。
コードの解説
1. worker_init
関数の定義
def worker_init(queue: multiprocessing.Queue):
handler = logging.handlers.QueueHandler(queue)
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.DEBUG)
root.info('Worker initialized')
-
queue
: ログをメインプロセスに転送するためのキュー - 各ワーカープロセスで、
logging.getLogger()
でルートロガーを取得し、キューハンドラーを追加します。 - ルートロガーのレベルを
DEBUG
に設定します。
2. worker_process
関数
def worker_process(t: float) -> float:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
# ...
worker_process
関数内では、worker_init
で設定されたロガーを使用するため、新たにキューハンドラーを追加する必要はありません。
3. multiprocessing.Pool
の初期化
with multiprocessing.Pool(
processes=2,
initializer=worker_init,
initargs=(queue,)
) as pool:
# ...
multiprocessing.Pool
を初期化する際に、initializer
引数にworker_init
関数を、initargs
引数にキューを渡します。これにより、各ワーカープロセスが開始される前にworker_init
関数が呼び出され、ロガーが初期化されます。
改善点
- ログの重複防止: 各ワーカープロセスが独立したロガーを持つため、ログが重複して出力される問題が解消されます。
-
コードの簡潔化:
worker_process
関数内でロガーを毎回作成する必要がなくなります。 - 可読性の向上: コードの構造がより明確になり、保守性が向上します。
まとめ
Pythonのmultiprocessing
モジュールを用いた並列処理において、ログの重複はよくある問題です。initializer
関数を利用することで、各ワーカープロセスに独自のロガーを設定し、この問題を効果的に解決することができます。本記事で紹介した手法は、複数のプロセスからログを出力する際のベストプラクティスと言えるでしょう。
注意点
-
worker_init
関数で設定したロガーは、そのプロセス内の全てのモジュールで共有されます。 -
multiprocessing.Manager
を利用してキューを作成することで、プロセス間でデータを共有することができます。
最終的なコードのそ出力結果
コード
# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
# Next import lines for this demo only
import time
def worker_init(queue: multiprocessing.Queue):
handler = logging.handlers.QueueHandler(queue)
root = logging.getLogger()
root.addHandler(handler)
root.setLevel(logging.DEBUG)
root.info('Worker initialized')
def worker_process(t: float) -> float:
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
logger.info(f'Waiting for {t} seconds')
time.sleep(t)
logger.info(f'Processed after {t} seconds')
return t
def main():
queue = multiprocessing.Manager().Queue(-1)
handler = logging.StreamHandler()
formatter = logging.Formatter("%(processName)s %(message)s")
handler.setFormatter(formatter)
listener = logging.handlers.QueueListener(queue, handler)
listener.start()
args = [0.5, 0.1, 0.1, 0.1]
with multiprocessing.Pool(
processes=2,
initializer=worker_init,
initargs=(queue,)
) as pool:
ret = pool.map(worker_process, args)
queue.put_nowait(None)
print(ret)
if __name__ == '__main__':
main()
出力
SpawnPoolWorker-2 Worker initialized
SpawnPoolWorker-2 Waiting for 0.5 seconds
SpawnPoolWorker-3 Worker initialized
SpawnPoolWorker-3 Waiting for 0.1 seconds
SpawnPoolWorker-3 Processed after 0.1 seconds
SpawnPoolWorker-3 Waiting for 0.1 seconds
SpawnPoolWorker-3 Processed after 0.1 seconds
SpawnPoolWorker-3 Waiting for 0.1 seconds
SpawnPoolWorker-3 Processed after 0.1 seconds
SpawnPoolWorker-2 Processed after 0.5 seconds
[0.5, 0.1, 0.1, 0.1]