pythonで並列処理中のログをどうやって管理しようか迷っていたところ、公式がめちゃくちゃ良い例を書いてくれていたためせっかくなのでimportして簡単に使用できるライブラリにしました。
またこの記事を書くにあたりこのブログを参考にしました。
Github: https://github.com/Geson-anko/logging_server
インストール
リポジトリをクローンして、このフォルダ内で
pip install -e ./
モジュールインポート
from logging_server import LoggingServer, SocketLogger
使い方
基本的にはGithubのREADME.mdを読んでいただけるとありがたいですが、ここでは最も基本的な使い方を記します。
- Logging Serverの起動
※multiprocessing
は並列処理実行時に__main__
モジュールをインポートするためif __name__ == "__main__"
で保護しないコードは2重に実行されます!
Logging Serverは同じホスト・ポートで2重に立ち上げることが出来ないため、
必ずif __name__ == "__main__"
で保護してください。
if __name__ == "__main__":
ls = LoggingServer(host="localhost", port=9999)
ls.start() # Server runs in daemon thread.
- Loggerを使用する
ホストとポートはサーバーと同一です。
logger = SocketLogger("SocketLogger",host="localhost",port=9999)
logger.debug("debug")
logger.info("info")
logger.warning("warning")
logger.error("error")
logger.critical("critical")
Examples
とりあえず効果を実感できる例を用意しました。実行してみてください。
Good
from logging_server import LoggingServer, SocketLogger
import logging, sys
def process(process_name:str) -> None:
logger = SocketLogger(process_name) # SocketLoggerを使用
logger.info(f"Start {process_name}")
# ... some process ...
if __name__ == "__main__":
import multiprocessing as mp
logger = logging.getLogger() # root logger
logger.addHandler(logging.StreamHandler(sys.stdout)) # output to console.
logger.setLevel(logging.NOTSET) # lowest logger level
ls = LoggingServer()
ls.start()
print("start processes.")
for i in range(10):
p = mp.Process(target=process, args=(f"process {i}",))
p.start()
About starting Logging Server...
start processes.
Start process 1
Start process 3
Start process 0
Start process 5
Start process 9
Start process 6
Start process 2
Start process 8
Start process 7
Start process 4
Bad
from logging_server import LoggingServer, SocketLogger
import logging, sys
def process(process_name:str) -> None:
logger = logging.getLogger(process_name) # 標準のgetLoggerを使用
logger.info(f"Start {process_name}")
# ... some process ...
if __name__ == "__main__":
import multiprocessing as mp
logger = logging.getLogger() # root logger
logger.addHandler(logging.StreamHandler(sys.stdout)) # output to console.
logger.setLevel(logging.NOTSET) # lowest logger level
ls = LoggingServer()
ls.start()
print("start processes.")
for i in range(10):
p = mp.Process(target=process, args=(f"process {i}",))
p.start()
About starting Logging Server...
start processes.
Badがうまくいかない理由
Badの方だとちゃんとRootLoggerを編集しているのにログが表示されません。
pythonのmultiprocessing
モジュールはインタプリタごと切り離して並列処理をするため、子ロガーからメインプロセスにログを伝搬することが出来ないことが原因です。
そこで、TCPServerをたててそこにログを送信することによりいつも通りloggingモジュールを使用できるようにしました。
実装
基本的には公式の例に従いThreadingTCPServer
をたてて、SocketHandler
をつけたロガーから送信します。しかしロガーに関してはlogging.getLogger
を使用するとループロギングが発生してしまったり、並列処理後にロガーを取得し直さなければいけなかったりなどと結構大変だったので新しくロガークラスを作成しました。
Server と Handler
公式の実装を少し改変し、取得した個々のロガーに対し改変できるようにしています。
Handler
サーバに送られてきたログを処理するHandlerです。
個々のロガーに対してサーバクラスに設定したlogger_modifier
メソッドを使用し、変更を加えることができます。
import socketserver
import struct
import logging
import pickle
from typing import *
class LogRecordStreamHandler(socketserver.StreamRequestHandler):
"""Read the LogRecord binary and process it."""
# logger saver
loggers = dict()
def handle(self):
"""make the LogRecord object from binary and process it."""
while True:
chunk = self.connection.recv(4)
if len(chunk) < 4:
break
slen = struct.unpack(">L",chunk)[0]
chunk = self.connection.recv(slen)
while len(chunk) < slen:
chunk = chunk + self.connection.recv(slen - len(chunk))
obj = self.unPickle(chunk)
record = logging.makeLogRecord(obj)
self.handleLogRecord(record)
def unPickle(self, data:bytes) -> Any:
return pickle.loads(data)
def handleLogRecord(self, record:logging.LogRecord) -> None:
""" process the LogRecord object."""
if self.server.logname is not None:
name = self.server.logname
else:
name = record.name
# if already exists the logger, use it.
if name in self.loggers:
logger = self.loggers[name]
else:
logger = logging.getLogger(name)
logger.propagate = True
logger = self.modify_logger(logger)
self.loggers[name] = logger
logger.handle(record)
def modify_logger(self,logger:logging.Logger) -> logging.Logger:
"""modify logger if server has logger modifier."""
if hasattr(self.server,"logger_modifier"):
_logger = self.server.logger_modifier(logger)
if _logger: # if logger modifier does not return logger.
logger = _logger
return logger
Server
公式の実装にサーバを別スレッドで起動するstart
メソッドを追加しました。(サーバーはデーモンスレッドで起動するのでshutdown
を呼ばなくても終了します。)
from .handlers import LogRecordStreamHandler
import socketserver
import logging
import logging.handlers
import threading
from typing import *
class LoggingServer(socketserver.ThreadingTCPServer):
"""The SocketServer which receive Logs."""
allow_reuse_address = True
logger_modifier:Callable = lambda self,x:x
def __init__(self,host='localhost',port=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
handler=LogRecordStreamHandler, logger_name:str=__name__):
super().__init__((host, port), handler)
self.timeout = 1
self.logname = None
self.logger = logging.getLogger(logger_name)
self.__shutdown = False
self.server_thread:threading.Thread = None
def serve_until_stopped(self):
import select
while not self.__shutdown:
rd, wr, ex = select.select([self.socket.fileno()], [], [], self.timeout)
if rd:
self.handle_request()
def start(self):
self.__shutdown= False
self.server_thread = threading.Thread(target=self.serve_until_stopped,daemon=True)
self.server_thread.start()
self.logger.info("About starting Logging Server...")
def shutdown(self,timeout:float=0.0):
self.__shutdown = True
self.server_thread.join(timeout)
self.logger.info("Shutdown Logging Server.")
def __del__(self):
self.shutdown()
def set_logger_modifier(self, func:Callable) -> None:
"""set func to add handlers or filters to specified logger.
The func must have a argument for logger, and returns logger class.
"""
if callable(func):
self.logger_modifier = func
else:
raise ValueError("logger modifier must be callable! input: {}".format(func))
SocketLogger
logging.getLogger
はすでにインスタンスされたロガーを再利用しようとするため、単にSocketHandlerをつけたロガーをメインスレッドで使用するとサーバにログを送信し続けてしまう問題が発生してしまいます。
その問題は logging.Logger
を直接インスタンスすることで解決しました。また、並列処理後にロガーを自動で再取得するデコレータを実装し、通常の末端のロガーと同様に使用できるようにしました。
import os
import logging
import logging.handlers
def _check_pid(func):
def check(self,*args, **kwds):
pid = os.getpid()
if self._pid != pid:
self._pid = pid
self.reset_logger()
func(self,*args, **kwds)
return check
class SocketLogger:
_pid:int = None
__logger:logging.Logger = None
def __init__(
self, name:str, level:int=logging.NOTSET, host="localhost",
port:int=logging.handlers.DEFAULT_TCP_LOGGING_PORT,
) -> None:
self._pid = os.getpid()
self.name = name
self.level = level
self.host = host
self.port = port
self.set_logger()
@property
def logger(self):
return self.__logger
def setLevel(self, level:int) -> None:
self.logger.setLevel(level)
def set_logger(self):
"""set logger class, name, level and socket handler."""
self.__logger = logging.Logger(self.name)
self.__logger.setLevel(self.level)
socket_handler = logging.handlers.SocketHandler(self.host, self.port)
socket_handler.setLevel(logging.NOTSET)
self.__logger.addHandler(socket_handler)
self.__logger.propagate=False # Because another logger is propagating in server process.
def remove_handlers(self):
"""remove handlers of logger."""
for hdlr in self.__logger.handlers:
self.__logger.removeHandler(hdlr)
def reset_logger(self):
"""reset logger class"""
self.remove_handlers()
self.set_logger()
@_check_pid
def debug(self,*args, **kwds) -> None: self.logger.debug(*args, **kwds)
@_check_pid
def info(self,*args, **kwds) -> None: self.logger.info(*args, **kwds)
@_check_pid
def warn(self,*args, **kwds) -> None: self.logger.warn(*args, **kwds)
@_check_pid
def warning(self,*args, **kwds) -> None: self.logger.warning(*args, **kwds)
@_check_pid
def error(self,*args, **kwds) -> None: self.logger.error(*args, **kwds)
@_check_pid
def critical(self,*args, **kwds) -> None: self.logger.critical(*args, **kwds)
@_check_pid
def exception(self,*args, **kwds) -> None: self.logger.exception(*args, **kwds)
@_check_pid
def log(self, *args,**kwds) -> None: self.logger.log(*args,**kwds)
まとめ
Pythonで並列処理中のロギングをサーバースレッドたててまとめて処理することでいつも通りロギングできるようにしましたとさ。
ロガーの実装が雑なのは許してください。