2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonのマルチプロセスでログが重複する問題:発生状況と解決策

Posted at

はじめに

Pythonのmultiprocessingモジュールを用いて複数のプロセスでタスクを実行する際、各プロセスから出力されるログを適切に管理することは、デバッグやモニタリングにおいて非常に重要です。しかし、複数のプロセスから同じロガーにログを出力する場合、ログが重複して出力されるという問題が発生することがあります。この記事では、この問題の原因と、initializer関数を利用した解決策について詳しく解説します。

問題の発生状況

Pythonのmultiprocessingモジュールを用いて複数のプロセスでタスクを実行し、ログを取得する際に、特定のプロセスのログが重複して出力される問題が発生しました。

具体的な例:

次のコードを実行した結果、SpawnPoolWorker-3のプロセスからのログが重複して出力されていることが確認されました。

def worker_process(t: float, queue: multiprocessing.Queue) -> float:
    logger = logging.getLogger()
    handler = logging.handlers.QueueHandler(queue)
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
 
    logger.info(f'Waiting for {t} seconds')
    time.sleep(t)
    logger.info(f'Processed after {t} seconds')
    return t
 
 
queue = multiprocessing.Manager().Queue(-1)

handler = logging.StreamHandler()
formatter = logging.Formatter("%(processName)s %(message)s")
handler.setFormatter(formatter)
listener = logging.handlers.QueueListener(queue, handler)
listener.start()
 
args = [(t, queue) for t in [0.5, 0.1, 0.1, 0.1]]
with multiprocessing.Pool(processes=2) as pool:
    ret = pool.starmap(worker_process, args)
#...

出力

SpawnPoolWorker-2 Waiting for 0.5 seconds
SpawnPoolWorker-3 Waiting for 0.1 seconds
SpawnPoolWorker-3 Processed after 0.1 seconds
SpawnPoolWorker-3 Waiting for 0.1 seconds  # <-
SpawnPoolWorker-3 Waiting for 0.1 seconds  # <-
SpawnPoolWorker-3 Processed after 0.1 seconds  # <-
SpawnPoolWorker-3 Processed after 0.1 seconds  # <-
SpawnPoolWorker-3 Waiting for 0.1 seconds  # <-
SpawnPoolWorker-3 Waiting for 0.1 seconds  # <-
SpawnPoolWorker-3 Waiting for 0.1 seconds  # <-
SpawnPoolWorker-3 Processed after 0.1 seconds  # <-
SpawnPoolWorker-3 Processed after 0.1 seconds  # <-
SpawnPoolWorker-3 Processed after 0.1 seconds  # <-
SpawnPoolWorker-2 Processed after 0.5 seconds

問題の原因

この問題は、以下の2つの要因が複合的に作用することで発生しています。

  1. 各ワーカープロセスが独立したロガーインスタンスを持つ: 各ワーカープロセスは、それぞれ独立したPythonプロセスとして実行されるため、独自のロガーインスタンスを持ちます。
  2. ロガーの初期化が適切に行われていない: worker_process関数内で毎回新しいロガーを作成しているため、各ワーカープロセスが複数のロガーインスタンスを持つ可能性があります。

解決策:initializer関数による初期化

multiprocessing.Poolinitializer引数に渡す関数worker_initを利用することで、この問題を解決できます。worker_init関数は、各ワーカープロセスが開始される前に呼び出される関数です。この関数内で、各ワーカープロセスに独自のロガーを設定することで、ログの重複を回避することができます。

コードの解説

1. worker_init関数の定義

def worker_init(queue: multiprocessing.Queue):
    handler = logging.handlers.QueueHandler(queue)
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)
    root.info('Worker initialized')
  • queue: ログをメインプロセスに転送するためのキュー
  • 各ワーカープロセスで、logging.getLogger()でルートロガーを取得し、キューハンドラーを追加します。
  • ルートロガーのレベルをDEBUGに設定します。

2. worker_process関数

def worker_process(t: float) -> float:
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
    # ...

worker_process関数内では、worker_initで設定されたロガーを使用するため、新たにキューハンドラーを追加する必要はありません。

3. multiprocessing.Poolの初期化

with multiprocessing.Pool(
    processes=2,
    initializer=worker_init,
    initargs=(queue,)
) as pool:
    # ...

multiprocessing.Poolを初期化する際に、initializer引数にworker_init関数を、initargs引数にキューを渡します。これにより、各ワーカープロセスが開始される前にworker_init関数が呼び出され、ロガーが初期化されます。

改善点

  • ログの重複防止: 各ワーカープロセスが独立したロガーを持つため、ログが重複して出力される問題が解消されます。
  • コードの簡潔化: worker_process関数内でロガーを毎回作成する必要がなくなります。
  • 可読性の向上: コードの構造がより明確になり、保守性が向上します。

まとめ

Pythonのmultiprocessingモジュールを用いた並列処理において、ログの重複はよくある問題です。initializer関数を利用することで、各ワーカープロセスに独自のロガーを設定し、この問題を効果的に解決することができます。本記事で紹介した手法は、複数のプロセスからログを出力する際のベストプラクティスと言えるでしょう。

注意点

  • worker_init関数で設定したロガーは、そのプロセス内の全てのモジュールで共有されます。
  • multiprocessing.Managerを利用してキューを作成することで、プロセス間でデータを共有することができます。

最終的なコードのそ出力結果

コード

# You'll need these imports in your own code
import logging
import logging.handlers
import multiprocessing
 
# Next import lines for this demo only
import time
 
 
def worker_init(queue: multiprocessing.Queue):
    handler = logging.handlers.QueueHandler(queue)
    root = logging.getLogger()
    root.addHandler(handler)
    root.setLevel(logging.DEBUG)
    root.info('Worker initialized')


def worker_process(t: float) -> float:
    logger = logging.getLogger()
    logger.setLevel(logging.DEBUG)
 
    logger.info(f'Waiting for {t} seconds')
    time.sleep(t)
    logger.info(f'Processed after {t} seconds')
    return t
 
 
def main():
    queue = multiprocessing.Manager().Queue(-1)
 
    handler = logging.StreamHandler()
    formatter = logging.Formatter("%(processName)s %(message)s")
    handler.setFormatter(formatter)
    listener = logging.handlers.QueueListener(queue, handler)
    listener.start()
 
    args = [0.5, 0.1, 0.1, 0.1]
    with multiprocessing.Pool(
        processes=2,
        initializer=worker_init,
        initargs=(queue,)
    ) as pool:
        ret = pool.map(worker_process, args)

    queue.put_nowait(None)
    print(ret)
 
 
if __name__ == '__main__':
    main()

出力

SpawnPoolWorker-2 Worker initialized
SpawnPoolWorker-2 Waiting for 0.5 seconds
SpawnPoolWorker-3 Worker initialized
SpawnPoolWorker-3 Waiting for 0.1 seconds
SpawnPoolWorker-3 Processed after 0.1 seconds
SpawnPoolWorker-3 Waiting for 0.1 seconds
SpawnPoolWorker-3 Processed after 0.1 seconds
SpawnPoolWorker-3 Waiting for 0.1 seconds
SpawnPoolWorker-3 Processed after 0.1 seconds
SpawnPoolWorker-2 Processed after 0.5 seconds
[0.5, 0.1, 0.1, 0.1]
2
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?