やりたいこと
Python標準のloggingを使用してログファイルの先頭にヘッダ行を出力します。
この記事を参考に、よりシンプルな実装を目指します。
また、空のログファイルがすでに存在する場合もヘッダ行を出力するようにします。
実装
FileHandlerから継承する場合
logging.FileHandlerを使用する場合の実装です。
サンプルコード(FileHandler)
import os
import time
from logging import FileHandler, getLogger
class FileHandlerWithHeader(FileHandler):
def __init__(
self,
filename,
fileheader, # 引数にfileheaderを追加
mode="a",
encoding=None,
delay=False,
errors=None,
):
self.fileheader = fileheader
# ファイルが空かどうか確認するために読み書きできるようにする
# (mode="a"or"w"で開くとファイルの読み込みができない)
if 0 > mode.find("+"):
mode = mode + "+"
super().__init__(filename, mode, encoding, delay, errors)
def _open(self):
stream = super()._open()
# ファイルの先頭がEOFだった場合にヘッダ行を書き込む
stream.seek(0)
if "" == stream.read():
stream.write(self.fileheader + self.terminator)
return stream
# ログを出力してみる
logger = getLogger("logger")
logger.addHandler(FileHandlerWithHeader("LOG_FILE_NAME", "LOG_FILE_HEADER"))
for i in range(20):
logger.error(f"ERROR MESSAGE{i}")
time.sleep(0.2)
TimedRotatingFileHandlerから継承する場合
logging.handlers.TimedRotatingFileHandlerを使用する場合の実装です。
__init()__の引数でmodeを指定できない(mode="a"で固定となる)ので、modeを変更する処理を_open()の中へ移動しています。
サンプルコード(TimedRotatingFileHandler)
import os
import time
from logging import getLogger
from logging.handlers import TimedRotatingFileHandler
class TimedRoatingFileHandlerWithHeader(TimedRotatingFileHandler):
def __init__(
self,
filename,
fileheader, # 引数にfileheaderを追加
when="h",
interval=1,
backupCount=0,
encoding=None,
delay=False,
utc=False,
atTime=None,
errors=None,
):
self.fileheader = fileheader
super().__init__(
filename,
when,
interval,
backupCount,
encoding,
delay,
utc,
atTime,
errors,
)
def _open(self):
# ファイルが空かどうか確認するために読み書きできるようにする
# (mode="a"or"w"で開くとファイルの読み込みができない)
if 0 > self.mode.find("+"):
self.mode = self.mode + "+"
stream = super()._open()
# ファイルの先頭がEOFだった場合にヘッダ行を書き込む
stream.seek(0)
if "" == stream.read():
stream.write(self.fileheader + self.terminator)
return stream
# ログを出力してみる
logger = getLogger("logger")
logger.addHandler(
TimedRoatingFileHandlerWithHeader(
"LOG_FILE_NAME",
"LOG_FILE_HEADER",
"S", # 動作確認のためにwhen="S"とする
)
)
for i in range(20):
logger.error(f"ERROR MESSAGE{i}")
time.sleep(0.2)