0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

[Python] マルチプロセスでのロギングサンプル

Last updated at Posted at 2022-12-04

初めに

業務でloggingモジュールを利用し全ロギングレベルを出力するファイル、ERRORだけ出力するファイルに分けてロギングしていました。
当初はシングルプロセスで稼働させていたのでロジックもあまり気にしていませんでしたが、
マルチプロセス環境下にて動かす必要がでてきました。
調査した結果、QueueHandlerを使う方法が一般的なようです。
マルチプロセス環境下にて共通ファイルへのロギングを検証するためにPoCとして以下を作成しました。

環境

 Python3.8~(試したのは3.8.10)

フォルダ構成

以下の構成で検証しました。この構成でソースコードを配置(コピペ)すれば動くと思います。
-top dir
└ main.py
└ Worker
  └ worker_task.py
└ Logger
  └ logger.py

実行

top dirから以下で実行します。

python main.py

実行時、コマンドプロンプトにログを出力しつつ、top dir直下にerr_file.txt, all_fileを生成します。

  • err_file.txt
    • ERRレベルのみ出力する
  • all_file.txt
    • 全レベルを出力する

ソースコード

ポイントはloggerで設定したqueueを取得し、そのqueueを子プロセス側で利用する点です。

エントリーポイントであるmain.pyです。

main.py
import sys
from concurrent import futures
import os
import multiprocessing
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from Logger.logger import Logger
from Worker.worker_task import WorkerTask


def wrapper_main(obj1, obj2, obj3):
    return obj1.wrapper_main(obj2, obj3)

def main():

    # main logging
    logger = Logger.get_instance(output_path=".\\")
    name = multiprocessing.current_process().name
    
    # process pool executor
    with futures.ProcessPoolExecutor(3) as executor:
        results = []
        multi_pipelines = [WorkerTask("a"), WorkerTask("b"), WorkerTask("c"), WorkerTask("d"), WorkerTask("e")]

        # child_process
        for i, light_pr in enumerate(multi_pipelines, 1):
            results.append(executor.submit(wrapper_main, light_pr, str(i), Logger.get_q()))

        for pipeline in futures.as_completed(results):
            try:
                res = pipeline.result()
                logger.info(f"return... : proc={name}, res={res}")
            except Exception as e:
                logger.exception(f"unexpected error")
        Logger.end()
    

if __name__ == '__main__':
    main()

続いては子プロセスの実処理です。
単純にキックされたら0~9までの値をロギングする程度です。

worker_task.py
#import sys
import time
import logging
from random import random
import logging.handlers
import multiprocessing
import logging
import logging.handlers
import multiprocessing


class WorkerTask():
    
    def __init__(self, unique_val:str):
        self.unique_value = unique_val
        
    def wrapper_main(self, value:str, log_queue: multiprocessing.Queue) -> str:
        try:
            self.logger = logging.getLogger()
            handler = logging.handlers.QueueHandler(log_queue)
            self.logger.addHandler(handler)
            self.logger.setLevel(logging.DEBUG)
            name = multiprocessing.current_process().name
            self.logger.info(f"started.. : proc_name={name}, value={value}, unique_value={self.unique_value}")
            for j in range(10):
                time.sleep(random())
                self.logger.info(f"running.. : proc_name={name}, value={value}, number={j}")
            self.logger.info(f"end...... : proc_name={name}, unique_value={self.unique_value}")
            return_val = "end_" + value
        finally:
            # これがないとプロセスとしてhandlerを追加したままで2回キックされる
            self.logger.error(f"end wrapper main.")
            self.logger.handlers.clear()
        
        return return_val

最後にlogger.pyです。
respect_handler_levelをTrueにすることでloggerで設定したhandlerのレベルを子プロセス側でも認知できます。
なお、昔の名残でシングルトン的に作成していますが、サンプルでは特に関係ありません。
PoCでの作成のため余分な処理は入っています。

logger.py
import logging
import logging.handlers
import multiprocessing
import os

class Logger:
    """
    ロギング用のクラスです。シングルトンで定義しているため、
    利用者はget_instance()メソッドを呼んでください。

    出力方式: コンソール出力とファイル出力です。
    ファイル出力: log_file.txt/err_file.txtに分かれており、前者は全レベルを対象とします。
     err_file.txtはエラーレベル以上のみです。
    出力先: トップフォルダとなります。
    """

    __instance = None

    @classmethod
    def get_instance(
        cls,
        output_path:str=None,
        do_multiproc:bool=False
    ):
        """シングルトンでloggerを取得します。
        Returns:
            Logger
        """
        if not cls.__instance:
            Logger(output_path, do_multiproc)
        return cls.__instance
    
    def __init__(
        self,
        output_path:str,
        do_multiproc:bool
    ):
        """初期化処理をおこないます。
           出力先の確認が完了した後、
           formatter, handlerを読み込みます。
        """

        log_dirpath:str = output_path
        if not os.path.exists(log_dirpath):
            os.makedirs(log_dirpath, exist_ok=True)

        Logger.__instance = logging.getLogger()
        Logger.__instance.setLevel(logging.DEBUG)
        Logger.__instance.handlers.clear()

        # que
        Logger.log_queue = multiprocessing.Manager().Queue(-1)

        # stdout
        formatter_console = logging.Formatter(
            '%(asctime)s,%(processName)s,%(levelname)s,%(message)s'
        )
        formatter_console.default_msec_format = '%s.%03d'
        std_handler = logging.StreamHandler()
        std_handler.setLevel(logging.DEBUG)
        std_handler.setFormatter(formatter_console)
        Logger.__instance.addHandler(std_handler)

        formatter_file = logging.Formatter(
            '%(asctime)s,%(processName)s,%(thread)d,%(levelname)s,%(module)s,%(funcName)s,L%(lineno)d,%(message)s'
        )
        formatter_file.default_msec_format = '%s.%03d'
        # file - all

        file_handler_all = self._create_file_handler(
            "log_file.txt",
            logging.DEBUG,
            formatter_file
        )
        Logger.__instance.addHandler(file_handler_all)
        
        file_handler_err = self._create_file_handler(
            "err_file.txt",
            logging.ERROR,
            formatter_file
        )
        Logger.__instance.addHandler(file_handler_err)

        # multi processing listener
        if not do_multiproc:
            Logger.listener = logging.handlers.QueueListener(
                Logger.log_queue,
                std_handler,
                file_handler_all,
                file_handler_err,
                respect_handler_level=True
            )
            Logger.listener.start()
        
    @classmethod
    def end(cls):
        try:
            Logger.listener.stop()
        except Exception:
            pass
        
    @classmethod
    def get_q(cls):
        return Logger.log_queue


    def _create_file_handler(self, filename:str, level:str, formatter:logging.Formatter)-> logging.FileHandler:
        """logging用のFileHandlerを作成します

        Args:
            filepath (str): 出力するログファイルパス
            level (str): ログレベル
            formatter (logging.Formatter): 出力フォーマット

        Returns:
            logging.FileHandler: 作成したFileHandler
        """
        handler = logging.FileHandler(
            filename=filename, encoding='utf-8'
        )
        handler.setLevel(level)
        handler.setFormatter(formatter)
        return handler

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?