1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

スケジューラーで並列実行時のログファイルについて

Last updated at Posted at 2023-08-28

【前置き】

・作業内で並列実行した処理ごとのログファイルの作成をしようとした際、前任の方が書いていたソースだと実現できていなかったため色々調べて解決することが出来たため簡単に備忘録として残します。

【やること】

・「複数のバッチで処理を並列実行しその処理ごとのログファイルを作成」

【必要なもの(環境)】

・Cloud Scheduler(2つ以上)
・Cloud Run(1つでOK)
・Cloud Storage
※今回はGCPの環境を使用していたが動きに関してはバッチでも再現可

【ソース&解説】

ソースを解説する前に結論を述べると....

loggerの定義で処理ごとに子loggerオブジェクトを生成してしまえば実現可能である!!

まず前のソースがこちら(例)

main.py(修正前)
import logging
import random
from flask import Flask, request
from google.cloud import storage

app = Flask(__name__)

# ログレベルを設定
app.logger.setLevel(logging.INFO)

# ログフォーマットを設定
log_formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")

# stdout へ出力するハンドラを設定
stream_handler = logging.StreamHandler(stream=sys.stdout)
stream_handler.setLevel(logging.INFO)
stream_handler.setFormatter(log_formatter)
app.logger.addHandler(stream_handler)

@app.route("/", methods=["POST"])
def index():
    # ファイルへログを出力するハンドラを設定する
    start_time = datetime.now().strftime("%Y%m%d-%H%M%S")
    log_file_path = Path(f"{start_time}.log")
    file_handler = logging.FileHandler(str(log_file_path))
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(log_formatter)
    app.logger.addHandler(file_handler)

    #ログに出力するメッセージ
    app.logger.info("処理開始")
    app.logger.info(random.randint(1,10))
    app.logger.info("処理終了")

    # ログファイルを Cloud Storage へ送る
    save_log_file(log_file_path)

    # ファイルハンドラを削除する
    app.logger.removeHandler(file_handler)

    return ""

def save_log_file(log_file_path: Path) -> None:
    #ログファイルを Cloud Storage へ送る
    source_file_name = str(log_file_path)
    bucket_name = "Cloud StorageのBacket名"
    destination_blob_name = f"ログファイル保存フォルダ名/{source_file_name}"
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)
    generation_match_precondition = 0
    blob.upload_from_filename(source_file_name, if_generation_match=generation_match_precondition)
    

この場合並列実行した際1つ目の処理のログに2つ目の処理のログが入り込んでしまう

【出力例】
#1つ目のスケジューラー実行のログ
2023-08-01 09:30:00,159 - main - INFO - 処理開始
2023-08-01 09:30:00,159 - main - INFO - 処理開始
2023-08-01 09:30:00,160 - main - INFO - 3
2023-08-01 09:30:00,160 - main - INFO - 7
2023-08-01 09:30:00,159 - main - INFO - 処理終了

#2つ目のスケジューラー実行のログ
2023-08-01 09:30:00,159 - main - INFO - 処理開始
2023-08-01 09:30:00,160 - main - INFO - 3
2023-08-01 09:30:00,160 - main - INFO - 7
2023-08-01 09:30:00,159 - main - INFO - 処理終了
2023-08-01 09:30:00,159 - main - INFO - 処理終了

そのため下記のように処理ごとにlogging.getLogger("任意の文字列").getChild("任意の文字列")で子loggerオブジェクトを設定する必要がある

main.py(修正後)
import logging
import random
from flask import Flask, request
from google.cloud import storage

app = Flask(__name__)

@app.route("/", methods=["POST"])
def index():
    start_time = datetime.now().strftime("%Y%m%d-%H%M%S")
    set_logger(request)

    #ログに出力するメッセージ
    app.logger.info("処理開始")
    app.logger.info(random.randint(1,10))
    app.logger.info("処理終了")

    # ログファイルを Cloud Storage へ送る
    save_log_file(log_file_path)

    # ファイルハンドラを削除する
    app.logger.removeHandler(file_handler)

    return ""

def set_logger(request: Request, start_time: str):
    #Cloud Scheduerで設定したリクエストの値をloggerの名前として設定する
    request_body: dict[str, Any] = request.get_json()
    logger_name = request_body.get("log",None)

    #loggerを定義←ここ!!
    app.logger = logging.getLogger(__name__).getChild(logger_name)

    # ログレベルを設定
    app.logger.setLevel(logging.INFO)
    
    # ログフォーマットを設定
    log_formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
    
    # stdout へ出力するハンドラを設定
    stream_handler = logging.StreamHandler(stream=sys.stdout)
    stream_handler.setLevel(logging.INFO)
    stream_handler.setFormatter(log_formatter)
    app.logger.addHandler(stream_handler)

    # ファイルへログを出力するハンドラを設定する
    log_file_path = Path(f"{logger_name}_{start_time}.log")
    file_handler = logging.FileHandler(str(log_file_path))
    file_handler.setLevel(logging.INFO)
    file_handler.setFormatter(log_formatter)
    app.logger.addHandler(file_handler)

    return log_file_path, file_handler
【出力例】
#1つ目のスケジューラー実行のログファイルの内容
2023-08-01 09:30:00,159 - main.out_first - INFO - 処理開始
2023-08-01 09:30:00,160 - main.out_first - INFO - 3
2023-08-01 09:30:00,159 - main.out_first - INFO - 処理終了

#2つ目のスケジューラー実行のログファイルの内容
2023-08-01 09:30:00,159 - main.out_second - INFO - 処理開始
2023-08-01 09:30:00,160 - main.out_second - INFO - 7
2023-08-01 09:30:00,159 - main.out_second - INFO - 処理終了

【おまけ(Cloud Schedulerのリクエスト本文を取得する方法)】

ソース解説で修正後のソースにあったこの部分↓

#Cloud Scheduerで設定したリクエストの値をloggerの名前として設定する
    request_body: dict[str, Any] = request.get_json()
    logger_name = request_body.get("log",None)

やっていることとしては単純にリクエストのJSONをPythonの辞書型のオブジェクトに変換し「log」のキーから値を取得している。

しかしそのリクエストのJSONはCloud Schedulerの編集画面にある「実行内容を構成する」の箇所で指定することができる。
Cloud Shedulerの編集画面

書き方例
{"log" : "out_first","value" : 1}
※値の部分を「""(ダブルクオーテーション)」で囲めば文字列型、数字のみだとint型として扱われる

【あとがき】

 今回やったことはあくまで並列実行で処理ごとにログファイルを作成することなので処理を1回動かしログファイルを1つ作成する場合は前者のソースの書き方で問題はありません。
 また並列実行ではログファイルを出力する前の処理で状況によっては定義→設定の間にloggerの名前が変わってしまう可能性があるため、その場合は各自time.sleep()並列実行しないように調整するなどをして上手く対処してください。

Qiita投稿時はまだまだ暑い日が続いているので体調管理に気を付けながら業務中に学んだことがあればまた投稿します!

【参考ドキュメント】

・Pythonのログ出力についての記事

・loggingの複数ファイル、モジュールでの使用方法についての記事

・Cloud ScheduerのHTTPリクエストについての記事

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?