4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

LoggingServer というmultiprocessing 中のログを集めてまとめて処理するモジュールを作った話。

Last updated at Posted at 2022-03-31

pythonで並列処理中のログをどうやって管理しようか迷っていたところ、公式がめちゃくちゃ良い例を書いてくれていたためせっかくなのでimportして簡単に使用できるライブラリにしました。
またこの記事を書くにあたりこのブログを参考にしました。

Github: https://github.com/Geson-anko/logging_server

インストール

リポジトリをクローンして、このフォルダ内で

pip install -e ./

モジュールインポート

from logging_server import LoggingServer, SocketLogger

使い方

基本的にはGithubのREADME.mdを読んでいただけるとありがたいですが、ここでは最も基本的な使い方を記します。

  • Logging Serverの起動
    multiprocessingは並列処理実行時に __main__モジュールをインポートするためif __name__ == "__main__" で保護しないコードは2重に実行されます!
    Logging Serverは同じホスト・ポートで2重に立ち上げることが出来ないため、
    必ずif __name__ == "__main__"で保護してください。
if __name__ == "__main__":
    ls = LoggingServer(host="localhost", port=9999)
    ls.start() # Server runs in daemon thread.
  • Loggerを使用する
    ホストとポートはサーバーと同一です。
logger = SocketLogger("SocketLogger",host="localhost",port=9999)
logger.debug("debug")
logger.info("info")
logger.warning("warning")
logger.error("error")
logger.critical("critical")

Examples

とりあえず効果を実感できる例を用意しました。実行してみてください。

Good

main.py
from logging_server import LoggingServer, SocketLogger
import logging, sys

def process(process_name:str) -> None:
    logger = SocketLogger(process_name) # SocketLoggerを使用 
    logger.info(f"Start {process_name}")
    # ... some process ...

if __name__ == "__main__":
    import multiprocessing as mp
    
    logger = logging.getLogger() # root logger
    logger.addHandler(logging.StreamHandler(sys.stdout)) # output to console.
    logger.setLevel(logging.NOTSET) # lowest logger level

    ls = LoggingServer()
    ls.start()
    print("start processes.")
    for i in range(10):
        p = mp.Process(target=process, args=(f"process {i}",))
        p.start()
output
About starting Logging Server...
start processes.
Start process 1
Start process 3
Start process 0
Start process 5
Start process 9
Start process 6
Start process 2
Start process 8
Start process 7
Start process 4

Bad

from logging_server import LoggingServer, SocketLogger
import logging, sys

def process(process_name:str) -> None:
    logger = logging.getLogger(process_name) # 標準のgetLoggerを使用
    logger.info(f"Start {process_name}")
    # ... some process ...

if __name__ == "__main__":
    import multiprocessing as mp
    
    logger = logging.getLogger() # root logger
    logger.addHandler(logging.StreamHandler(sys.stdout)) # output to console.
    logger.setLevel(logging.NOTSET) # lowest logger level

    ls = LoggingServer()
    ls.start()
    print("start processes.")
    for i in range(10):
        p = mp.Process(target=process, args=(f"process {i}",))
        p.start()
output
About starting Logging Server...
start processes.

Badがうまくいかない理由

Badの方だとちゃんとRootLoggerを編集しているのにログが表示されません。
pythonのmultiprocessingモジュールはインタプリタごと切り離して並列処理をするため、子ロガーからメインプロセスにログを伝搬することが出来ないことが原因です。

そこで、TCPServerをたててそこにログを送信することによりいつも通りloggingモジュールを使用できるようにしました。

実装

基本的には公式の例に従いThreadingTCPServerをたてて、SocketHandlerをつけたロガーから送信します。しかしロガーに関してはlogging.getLoggerを使用するとループロギングが発生してしまったり、並列処理後にロガーを取得し直さなければいけなかったりなどと結構大変だったので新しくロガークラスを作成しました。

Server と Handler

公式の実装を少し改変し、取得した個々のロガーに対し改変できるようにしています。

Handler

サーバに送られてきたログを処理するHandlerです。
個々のロガーに対してサーバクラスに設定したlogger_modifierメソッドを使用し、変更を加えることができます。

logging_server/handlers.py
import socketserver
import struct
import logging
import pickle
from typing import *

class LogRecordStreamHandler(socketserver.StreamRequestHandler):
    """Read the LogRecord binary and process it."""

    # logger saver
    loggers = dict()

    def handle(self):
        """make the LogRecord object from binary and process it."""
        while True:
            chunk = self.connection.recv(4)
            if len(chunk) < 4:
                break
            slen = struct.unpack(">L",chunk)[0]
            chunk = self.connection.recv(slen)
            while len(chunk) < slen:
                chunk = chunk + self.connection.recv(slen - len(chunk))
            obj = self.unPickle(chunk)
            record = logging.makeLogRecord(obj)
            self.handleLogRecord(record)

    def unPickle(self, data:bytes) -> Any:
        return pickle.loads(data)
    
    def handleLogRecord(self, record:logging.LogRecord) -> None:
        """ process the LogRecord object."""
        if self.server.logname is not None:
            name = self.server.logname
        else:
            name = record.name

        # if already exists the logger, use it.
        if name in self.loggers:
            logger = self.loggers[name]
        else:
            logger = logging.getLogger(name)
            logger.propagate = True
            
            logger = self.modify_logger(logger)

            self.loggers[name] = logger
        logger.handle(record)

    def modify_logger(self,logger:logging.Logger) -> logging.Logger:
        """modify logger if server has logger modifier."""
        if hasattr(self.server,"logger_modifier"):
            _logger = self.server.logger_modifier(logger)
            if _logger: # if logger modifier does not return logger.
                logger = _logger
        return logger

Server

公式の実装にサーバを別スレッドで起動するstartメソッドを追加しました。(サーバーはデーモンスレッドで起動するのでshutdownを呼ばなくても終了します。)

logging_server/server.py
from .handlers import LogRecordStreamHandler
import socketserver
import logging
import logging.handlers
import threading
from typing import *
class LoggingServer(socketserver.ThreadingTCPServer):
    """The SocketServer which receive Logs."""

    allow_reuse_address = True
    logger_modifier:Callable = lambda self,x:x

    def __init__(self,host='localhost',port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, 
                handler=LogRecordStreamHandler, logger_name:str=__name__):
        super().__init__((host, port), handler)
        self.timeout = 1
        self.logname = None
        self.logger = logging.getLogger(logger_name)
        self.__shutdown = False
        self.server_thread:threading.Thread = None

    def serve_until_stopped(self):
        import select
        while not self.__shutdown:
            rd, wr, ex = select.select([self.socket.fileno()], [], [], self.timeout)
            if rd:
                self.handle_request()

    def start(self):
        self.__shutdown= False
        self.server_thread = threading.Thread(target=self.serve_until_stopped,daemon=True)
        self.server_thread.start()
        self.logger.info("About starting Logging Server...")

    def shutdown(self,timeout:float=0.0):
        self.__shutdown = True
        self.server_thread.join(timeout)
        self.logger.info("Shutdown Logging Server.")

    def __del__(self):
        self.shutdown()
        
    def set_logger_modifier(self, func:Callable) -> None:
        """set func to add handlers or filters to specified logger.
        The func must have a argument for logger, and returns logger class.
        """
        if callable(func):
            self.logger_modifier = func
        else:
            raise ValueError("logger modifier must be callable! input: {}".format(func))

SocketLogger

logging.getLoggerはすでにインスタンスされたロガーを再利用しようとするため、単にSocketHandlerをつけたロガーをメインスレッドで使用するとサーバにログを送信し続けてしまう問題が発生してしまいます。
その問題は logging.Loggerを直接インスタンスすることで解決しました。また、並列処理後にロガーを自動で再取得するデコレータを実装し、通常の末端のロガーと同様に使用できるようにしました。

logging_server/logger.py
import os
import logging
import logging.handlers

def _check_pid(func):
    def check(self,*args, **kwds):
        pid = os.getpid()
        if self._pid != pid:
            self._pid = pid
            self.reset_logger()
        func(self,*args, **kwds)
    return check

class SocketLogger:
    _pid:int = None
    __logger:logging.Logger = None
    def __init__(
        self, name:str, level:int=logging.NOTSET, host="localhost",
        port:int=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
    ) -> None:
        self._pid = os.getpid()
        self.name = name
        self.level = level
        self.host = host
        self.port = port
        self.set_logger()

    @property
    def logger(self):
        return self.__logger

    def setLevel(self, level:int) -> None:
        self.logger.setLevel(level)

    def set_logger(self):
        """set logger class, name, level and socket handler."""
        self.__logger = logging.Logger(self.name)
        self.__logger.setLevel(self.level)
        socket_handler = logging.handlers.SocketHandler(self.host, self.port)
        socket_handler.setLevel(logging.NOTSET)
        self.__logger.addHandler(socket_handler)
        self.__logger.propagate=False # Because another logger is propagating in server process.

    def remove_handlers(self):
        """remove handlers of logger."""
        for hdlr in self.__logger.handlers:
            self.__logger.removeHandler(hdlr)

    def reset_logger(self):
        """reset logger class"""
        self.remove_handlers()
        self.set_logger()

    @_check_pid
    def debug(self,*args, **kwds) -> None: self.logger.debug(*args, **kwds)
    @_check_pid
    def info(self,*args, **kwds) -> None: self.logger.info(*args, **kwds)
    @_check_pid
    def warn(self,*args, **kwds) -> None: self.logger.warn(*args, **kwds)
    @_check_pid
    def warning(self,*args, **kwds) -> None: self.logger.warning(*args, **kwds)
    @_check_pid
    def error(self,*args, **kwds) -> None: self.logger.error(*args, **kwds)
    @_check_pid
    def critical(self,*args, **kwds) -> None: self.logger.critical(*args, **kwds)
    @_check_pid
    def exception(self,*args, **kwds) -> None: self.logger.exception(*args, **kwds)
    @_check_pid
    def log(self, *args,**kwds) -> None: self.logger.log(*args,**kwds)

まとめ

Pythonで並列処理中のロギングをサーバースレッドたててまとめて処理することでいつも通りロギングできるようにしましたとさ。
ロガーの実装が雑なのは許してください。

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?