初めに
業務でloggingモジュールを利用し全ロギングレベルを出力するファイル、ERRORだけ出力するファイルに分けてロギングしていました。
当初はシングルプロセスで稼働させていたのでロジックもあまり気にしていませんでしたが、
マルチプロセス環境下にて動かす必要がでてきました。
調査した結果、QueueHandlerを使う方法が一般的なようです。
マルチプロセス環境下にて共通ファイルへのロギングを検証するためにPoCとして以下を作成しました。
環境
Python3.8~(試したのは3.8.10)
フォルダ構成
以下の構成で検証しました。この構成でソースコードを配置(コピペ)すれば動くと思います。
-top dir
└ main.py
└ Worker
└ worker_task.py
└ Logger
└ logger.py
実行
top dirから以下で実行します。
python main.py
実行時、コマンドプロンプトにログを出力しつつ、top dir直下にerr_file.txt, all_fileを生成します。
- err_file.txt
- ERRレベルのみ出力する
- all_file.txt
- 全レベルを出力する
ソースコード
ポイントはloggerで設定したqueueを取得し、そのqueueを子プロセス側で利用する点です。
エントリーポイントであるmain.pyです。
import sys
from concurrent import futures
import os
import multiprocessing
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from Logger.logger import Logger
from Worker.worker_task import WorkerTask
def wrapper_main(obj1, obj2, obj3):
return obj1.wrapper_main(obj2, obj3)
def main():
# main logging
logger = Logger.get_instance(output_path=".\\")
name = multiprocessing.current_process().name
# process pool executor
with futures.ProcessPoolExecutor(3) as executor:
results = []
multi_pipelines = [WorkerTask("a"), WorkerTask("b"), WorkerTask("c"), WorkerTask("d"), WorkerTask("e")]
# child_process
for i, light_pr in enumerate(multi_pipelines, 1):
results.append(executor.submit(wrapper_main, light_pr, str(i), Logger.get_q()))
for pipeline in futures.as_completed(results):
try:
res = pipeline.result()
logger.info(f"return... : proc={name}, res={res}")
except Exception as e:
logger.exception(f"unexpected error")
Logger.end()
if __name__ == '__main__':
main()
続いては子プロセスの実処理です。
単純にキックされたら0~9までの値をロギングする程度です。
#import sys
import time
import logging
from random import random
import logging.handlers
import multiprocessing
import logging
import logging.handlers
import multiprocessing
class WorkerTask():
def __init__(self, unique_val:str):
self.unique_value = unique_val
def wrapper_main(self, value:str, log_queue: multiprocessing.Queue) -> str:
try:
self.logger = logging.getLogger()
handler = logging.handlers.QueueHandler(log_queue)
self.logger.addHandler(handler)
self.logger.setLevel(logging.DEBUG)
name = multiprocessing.current_process().name
self.logger.info(f"started.. : proc_name={name}, value={value}, unique_value={self.unique_value}")
for j in range(10):
time.sleep(random())
self.logger.info(f"running.. : proc_name={name}, value={value}, number={j}")
self.logger.info(f"end...... : proc_name={name}, unique_value={self.unique_value}")
return_val = "end_" + value
finally:
# これがないとプロセスとしてhandlerを追加したままで2回キックされる
self.logger.error(f"end wrapper main.")
self.logger.handlers.clear()
return return_val
最後にlogger.pyです。
respect_handler_levelをTrueにすることでloggerで設定したhandlerのレベルを子プロセス側でも認知できます。
なお、昔の名残でシングルトン的に作成していますが、サンプルでは特に関係ありません。
PoCでの作成のため余分な処理は入っています。
import logging
import logging.handlers
import multiprocessing
import os
class Logger:
"""
ロギング用のクラスです。シングルトンで定義しているため、
利用者はget_instance()メソッドを呼んでください。
出力方式: コンソール出力とファイル出力です。
ファイル出力: log_file.txt/err_file.txtに分かれており、前者は全レベルを対象とします。
err_file.txtはエラーレベル以上のみです。
出力先: トップフォルダとなります。
"""
__instance = None
@classmethod
def get_instance(
cls,
output_path:str=None,
do_multiproc:bool=False
):
"""シングルトンでloggerを取得します。
Returns:
Logger
"""
if not cls.__instance:
Logger(output_path, do_multiproc)
return cls.__instance
def __init__(
self,
output_path:str,
do_multiproc:bool
):
"""初期化処理をおこないます。
出力先の確認が完了した後、
formatter, handlerを読み込みます。
"""
log_dirpath:str = output_path
if not os.path.exists(log_dirpath):
os.makedirs(log_dirpath, exist_ok=True)
Logger.__instance = logging.getLogger()
Logger.__instance.setLevel(logging.DEBUG)
Logger.__instance.handlers.clear()
# que
Logger.log_queue = multiprocessing.Manager().Queue(-1)
# stdout
formatter_console = logging.Formatter(
'%(asctime)s,%(processName)s,%(levelname)s,%(message)s'
)
formatter_console.default_msec_format = '%s.%03d'
std_handler = logging.StreamHandler()
std_handler.setLevel(logging.DEBUG)
std_handler.setFormatter(formatter_console)
Logger.__instance.addHandler(std_handler)
formatter_file = logging.Formatter(
'%(asctime)s,%(processName)s,%(thread)d,%(levelname)s,%(module)s,%(funcName)s,L%(lineno)d,%(message)s'
)
formatter_file.default_msec_format = '%s.%03d'
# file - all
file_handler_all = self._create_file_handler(
"log_file.txt",
logging.DEBUG,
formatter_file
)
Logger.__instance.addHandler(file_handler_all)
file_handler_err = self._create_file_handler(
"err_file.txt",
logging.ERROR,
formatter_file
)
Logger.__instance.addHandler(file_handler_err)
# multi processing listener
if not do_multiproc:
Logger.listener = logging.handlers.QueueListener(
Logger.log_queue,
std_handler,
file_handler_all,
file_handler_err,
respect_handler_level=True
)
Logger.listener.start()
@classmethod
def end(cls):
try:
Logger.listener.stop()
except Exception:
pass
@classmethod
def get_q(cls):
return Logger.log_queue
def _create_file_handler(self, filename:str, level:str, formatter:logging.Formatter)-> logging.FileHandler:
"""logging用のFileHandlerを作成します
Args:
filepath (str): 出力するログファイルパス
level (str): ログレベル
formatter (logging.Formatter): 出力フォーマット
Returns:
logging.FileHandler: 作成したFileHandler
"""
handler = logging.FileHandler(
filename=filename, encoding='utf-8'
)
handler.setLevel(level)
handler.setFormatter(formatter)
return handler