0
0

Postfixのキュー監視

Posted at 2024-04-01

プログラムとコンフィグ

monitor_postfix_queue.py
import json
import yaml
import csv
from pathlib import Path
from datetime import datetime
import re
import subprocess
import sys
import os
import socket
import logging
import argparse
from collections import Counter
from shutil import copy2

# Directory layout; assumes the script lives one level below the base
# directory (e.g. <base>/bin/) -- TODO confirm against deployment layout.
BASE_DIR = Path(__file__).resolve().parent.parent
LOG_DIR = BASE_DIR / 'log'  # pythonlog.log is written here
DEF_DIR = BASE_DIR / 'def'  # YAML rule configuration lives here
TMP_DIR = BASE_DIR / 'tmp'  # per-run working directories are created here

# File names used inside the per-run output directory.
POSTQUEUE_J_01_FILENAME = "postqueue-j_01.json"  # raw `postqueue -j` output
POSTQUEUE_J_02_FILENAME = "postqueue-j_02.json"  # output expanded per recipient
QUEUE_SUMMARY_FILENAME = "queue_summary_result.prf"  # summary counters for the caller

#===============================================================================
# ロギング設定
#===============================================================================
def setup_logging():
    """Configure the root logger to write both to LOG_DIR/pythonlog.log and stdout."""
    root = logging.getLogger()
    root.setLevel(logging.INFO)
    fmt = logging.Formatter(
        f'%(asctime)s {socket.gethostname()} [{Path(__file__).name}:%(levelname)s] [%(process)d] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    # File handler first, then stdout -- both share the same format.
    for handler in (
        logging.FileHandler(str(LOG_DIR / 'pythonlog.log'), encoding='utf-8'),
        logging.StreamHandler(sys.stdout),
    ):
        handler.setFormatter(fmt)
        root.addHandler(handler)

def parse_arguments():
    """Parse the command line; exactly one data source option is required."""
    parser = argparse.ArgumentParser(description='Postfixキュー監視ツール')
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument(
        '--use-log-file',
        nargs='?',
        default=None,
        help='`postqueue -j` コマンドの出力の代わりに指定したログファイルを使用します。パスを指定してください。',
    )
    source.add_argument(
        '--config-path',
        help='Postfixキュー設定ディレクトリへのパス。このオプションが指定された場合、`postqueue -j -c {config_path}` コマンドが実行されます。',
    )
    parser.add_argument(
        '--output-dir',
        default=None,
        help='結果を保存するディレクトリのパス。指定されない場合、デフォルトの出力ディレクトリが使用されます。',
    )
    return parser.parse_args()

#===============================================================================
# ユーティリティ関数
#===============================================================================
def load_json_objects_from_file(file_path):
    """Read a file containing one or more concatenated JSON objects.

    `postqueue -j` emits one JSON object per queue entry (JSON Lines), so a
    single json.load() would fail; raw_decode() is applied repeatedly and
    whitespace between objects is skipped.

    :param file_path: path of the file to read
    :return: list of decoded objects (possibly empty), or None when the file
             could not be read/parsed.  Returning None (previously the partial
             list, usually []) lets callers distinguish a read failure from an
             empty queue -- they already check ``is None``.
    """
    logging.debug(f"{file_path} からJSONオブジェクトを読み込み開始します。")
    data = []
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()
        decoder = json.JSONDecoder()
        pos = 0
        while pos < len(text):
            obj, pos = decoder.raw_decode(text, pos)
            data.append(obj)
            # Skip inter-object whitespace before the next raw_decode().
            while pos < len(text) and text[pos] in (' ', '\n', '\t', '\r'):
                pos += 1
        logging.info(f"{file_path} から{len(data)}件のJSONオブジェクトを読み込みました。")
    except Exception:
        logging.error(f"{file_path} の読み込み中にエラーが発生しました。", exc_info=True)
        return None  # signal failure instead of silently looking like an empty queue
    return data

def split_recipients(json_data, output_dir):
    """Expand each queue item into one item per recipient.

    The expanded list is also written to POSTQUEUE_J_02_FILENAME under
    *output_dir* for later inspection.

    :param json_data: queue items as parsed from `postqueue -j`
    :param output_dir: directory the expanded JSON is written to
    :return: (expanded items, total number of expanded items)
    """
    # One new item per (item, recipient) pair, keeping all other keys.
    split_data = [
        {**item, "recipients": [recipient]}
        for item in json_data
        for recipient in item["recipients"]
    ]

    output_file_path = Path(output_dir) / POSTQUEUE_J_02_FILENAME
    with open(output_file_path, 'w', encoding='utf-8') as f:
        json.dump(split_data, f, ensure_ascii=False, indent=4)

    total_queue_count = len(split_data)
    logging.info(f"ファイルが保存されました: {output_file_path}")
    logging.info(f"分割結果のJSONオブジェクト数: {len(split_data)}")

    return split_data, total_queue_count

def fetch_postqueue_data(use_log_file=None, output_dir=None, config_path=None):
    """Obtain queue data from a saved log file or by running `postqueue -j`.

    Exactly one of *use_log_file* / *config_path* is expected; when neither
    is provided an error is logged and None is returned.
    """
    if use_log_file:
        return process_log_file(use_log_file, output_dir)
    if config_path:
        return execute_postqueue_command(config_path, output_dir)
    logging.error("必要な引数が提供されていません。")
    return None

def process_log_file(log_file, output_dir):
    """Copy *log_file* into the work directory and parse it as queue JSON.

    :return: parsed objects, or None when the copy fails.
    """
    destination = output_dir / POSTQUEUE_J_01_FILENAME
    try:
        copy2(log_file, destination)
    except Exception as e:
        logging.error(f"ログファイルのコピー中にエラーが発生しました: {e}", exc_info=True)
        return None
    logging.info(f"ログファイルをコピーしました: {log_file} -> {destination}")
    return load_json_objects_from_file(destination)

def execute_postqueue_command(config_path, output_dir):
    """Run `postqueue -j -c {config_path}` and parse its JSON output.

    Raw stdout is saved to POSTQUEUE_J_01_FILENAME under *output_dir* before
    parsing, so the original command output is kept for troubleshooting.

    :return: list of queue entries ([] when the queue is empty),
             or None when the command failed.
    """
    output_file_path = output_dir / POSTQUEUE_J_01_FILENAME
    try:
        result = subprocess.run(
            ['postqueue', '-j', '-c', config_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if result.returncode != 0:
            # An empty queue still exits 0 with empty output, so a non-zero
            # exit is a real failure -- report it (with stderr) instead of
            # misreporting it as "no queued mail".
            logging.error(f"`postqueue -j -c {config_path}` コマンドが失敗しました。returncode={result.returncode}, stderr={result.stderr.strip()}")
            return None
        if result.stdout:
            logging.info(f"`postqueue -j -c {config_path}` コマンドの実行に成功しました。")
            with open(output_file_path, 'w', encoding='utf-8') as f:
                f.write(result.stdout)
            return load_json_objects_from_file(output_file_path)
        logging.info("滞留メールはありません。")
        return []
    except Exception:
        logging.error("`postqueue -j -c` コマンドの実行中にエラーが発生しました。", exc_info=True)
        return None

def match_entries(data, rule, output_dir, rule_type):
    """Apply a single filter rule to queue entries and return the matches.

    Entries are expected to carry exactly one recipient each (the output of
    split_recipients()).  An entry passes the per-entry checks when:
      * it has been queued longer than rule["arrival_time_thresholdMinutes"], and
      * queue_name / sender / recipient_domain / delay_reason all match the
        rule's regexes (via re.match()).

    The passing entries are then thresholded by rule["delay_reason_count"]:
      * match_filters:     kept when the count is >= delay_reason_count
      * exception_filters: kept when the count is <= delay_reason_count
    With rule["recipient_domain_specific_filter"] set, the count is taken
    per recipient domain instead of over all passing entries.

    Matches are additionally dumped to {rule_type}_{Num}.json under
    *output_dir*.

    :param data: list of queue entry dicts
    :param rule: one rule dict from the YAML configuration
    :param output_dir: Path of the per-run output directory
    :param rule_type: 'match_filters' or 'exception_filters'
    :return: list of matched entries
    """
    current_timestamp = datetime.now().timestamp()
    domain_counts = Counter()
    domain_matched_entries = {}
    matched_entries = []
    total_matched_entries = []

    for index, entry in enumerate(data, start=1):
        # Default to "" (not None) so the re.match() calls below never receive
        # None and raise TypeError on addresses without "@" or empty recipients.
        recipient_domain = next((recipient["address"].split('@')[-1] for recipient in entry["recipients"] if "@" in recipient["address"]), "")
        delay_reason = next((recipient.get("delay_reason", "") for recipient in entry["recipients"]), "")

        # Skip mail that has not been queued long enough yet.
        if (current_timestamp - rule["arrival_time_thresholdMinutes"] * 60) <= datetime.fromtimestamp(int(entry["arrival_time"])).timestamp():
            logging.debug(f"{rule_type}, Num: {rule['Num']}, index: {index}, not append, 理由: arrival_time")
            logging.debug(f"{(current_timestamp - rule['arrival_time_thresholdMinutes'] * 60)} <= {datetime.fromtimestamp(int(entry['arrival_time'])).timestamp()}")
            continue
        # queue_name check
        if not re.match(rule["queue_name"], entry["queue_name"]):
            logging.debug(f"{rule_type}, Num: {rule['Num']}, index: {index}, not append, 理由: queue_name")
            continue
        # sender check; "$" anchors the pattern to the full address
        if not re.match(rule["sender"] + "$", entry["sender"]):
            logging.debug(f"{rule_type}, Num: {rule['Num']}, index: {index}, not append, 理由: sender")
            continue
        # recipient_domain check (a leading "@" in the rule is stripped)
        if not re.match(rule["recipient_domain"].replace("@", ""), recipient_domain):
            logging.debug(f"{rule_type}, Num: {rule['Num']}, index: {index}, not append, 理由: recipient_domain")
            continue
        # delay_reason check
        if not re.match(rule["delay_reason"], delay_reason):
            logging.debug(f"{rule_type}, Num: {rule['Num']}, index: {index}, not append, 理由: delay_reason")
            continue

        if rule.get("recipient_domain_specific_filter", False):
            # Count matches per recipient domain.
            domain_counts[recipient_domain] += 1
            if recipient_domain not in domain_matched_entries:
                domain_matched_entries[recipient_domain] = []
                logging.debug(f"{rule_type}, Num: {rule['Num']}, index: {index}, domain: {recipient_domain}, action: initialized, count: 1")
            else:
                logging.debug(f"{rule_type}, Num: {rule['Num']}, index: {index}, domain: {recipient_domain}, action: incremented, count: {domain_counts[recipient_domain]}")
            domain_matched_entries[recipient_domain].append(entry)
        else:
            # Count matches over all domains.
            logging.debug(f"{rule_type}, Num: {rule['Num']}, index: {index}, append")
            total_matched_entries.append(entry)

    if rule.get("recipient_domain_specific_filter", False):
        # Per-domain thresholding; the meaning of delay_reason_count is
        # inverted between match filters and exception filters.
        for domain, count in domain_counts.items():
            if rule_type == 'match_filters':
                if count >= rule["delay_reason_count"]:
                    logging.debug(f"{rule_type}, Num: {rule['Num']}, domain: {domain}, count: {count} >= delay_reason_count: {rule['delay_reason_count']}, extend")
                    matched_entries.extend(domain_matched_entries[domain])
                else:
                    logging.debug(f"{rule_type}, Num: {rule['Num']}, domain: {domain}, count: {count} < delay_reason_count: {rule['delay_reason_count']}, not added")
            elif rule_type == 'exception_filters':
                if count <= rule["delay_reason_count"]:
                    # Log text fixed: this branch compares <=, not >=.
                    logging.debug(f"{rule_type}, Num: {rule['Num']}, domain: {domain}, count: {count} <= delay_reason_count: {rule['delay_reason_count']}, extend")
                    matched_entries.extend(domain_matched_entries[domain])
                else:
                    logging.debug(f"{rule_type}, Num: {rule['Num']}, domain: {domain}, count: {count} > delay_reason_count: {rule['delay_reason_count']}, not added")
    else:
        # All-domain thresholding; delay_reason_count meaning is inverted
        # between match filters and exception filters.
        if rule_type == 'match_filters':
            if len(total_matched_entries) >= rule["delay_reason_count"]:
                logging.debug(f"{rule_type}, Num: {rule['Num']}, count: {len(total_matched_entries)} >= delay_reason_count: {rule['delay_reason_count']}, extend")
                matched_entries.extend(total_matched_entries)
        elif rule_type == 'exception_filters':
            if len(total_matched_entries) <= rule["delay_reason_count"]:
                # Log text fixed: this branch compares <=, not >=.
                logging.debug(f"{rule_type}, Num: {rule['Num']}, count: {len(total_matched_entries)} <= delay_reason_count: {rule['delay_reason_count']}, extend")
                matched_entries.extend(total_matched_entries)

    # Persist the matches for this rule.
    logging.info(f"{rule_type}, Num: {rule['Num']}, 一致件数: {len(matched_entries)}")
    if matched_entries:
        matched_entries_file_path = output_dir / f"{rule_type}_{rule['Num']}.json"
        with open(matched_entries_file_path, 'w') as f:
            json.dump(matched_entries, f, indent=2)
            logging.info(f"ファイルに書き込みました: {matched_entries_file_path}, {rule_type}, Num: {rule['Num']}")

    return matched_entries

def setup_directories(args):
    """Prepare the output directory and LOG_DIR; return the output directory path.

    Fix: a user-supplied --output-dir is now also created when missing.
    Previously only the default tmp directory branch called mkdir(), so a
    non-existent --output-dir made every later file write fail.
    """
    if args.output_dir:
        output_dir = Path(args.output_dir).resolve()
    else:
        # Default: a fresh per-run directory under TMP_DIR, keyed by
        # timestamp + PID so concurrent runs cannot collide.
        current_time_formatted = datetime.now().strftime("%Y%m%d%H%M%S")
        pid = os.getpid()
        output_dir = TMP_DIR / f'{Path(__file__).stem}' / f'{current_time_formatted}_{pid}'
    output_dir.mkdir(parents=True, exist_ok=True)

    # Make sure the logging directory exists as well.
    LOG_DIR.mkdir(parents=True, exist_ok=True)

    return output_dir

def filter_data(json_data, config, output_dir):
    """Filter queue entries using the configured match and exception rules.

    :return: (remaining entries, count after match filters,
              count after exception filters).  When no match filter hits
              anything, ([], 0, 0) is returned and the exception filters
              are skipped entirely.
    """
    matched_data = []
    filtered_data = json_data[:]

    # Collect entries hit by any match filter, avoiding duplicates.
    if 'match_filters' in config:
        for rule in config['match_filters']:
            for entry in match_entries(filtered_data, rule, output_dir, "match_filters"):
                if entry not in matched_data:
                    matched_data.append(entry)

    matched_after_filter_count = len(matched_data)

    if not matched_data:
        # Nothing matched; exception filters are irrelevant.
        return [], 0, 0

    # Drop entries hit by any exception filter.
    for rule in config.get('exception_filters', []):
        excluded = match_entries(filtered_data, rule, output_dir, "exception_filters")
        matched_data = [entry for entry in matched_data if entry not in excluded]

    return matched_data, matched_after_filter_count, len(matched_data)

def write_csv(data, csv_file_path):
    """Write the filtered queue entries to *csv_file_path* as CSV.

    One row is emitted per recipient.  queue_id is wrapped as ="..." so
    spreadsheet software keeps it as a literal string.

    Fix: the file is now opened with an explicit utf-8 encoding, matching
    every other file write in this module; the platform default encoding
    could corrupt non-ASCII delay reasons on some systems.
    """
    with open(csv_file_path, 'w', newline='', encoding='utf-8') as file:
        writer = csv.writer(file, lineterminator='\n')
        # Header row.
        writer.writerow(['date', 'queue_name', 'queue_id', 'arrival_time', 'message_size', 'forced_expire', 'sender', 'recipient_domain', 'delay_reason'])
        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        for item in data:
            for recipient in item['recipients']:
                recipient_domain = "@" + recipient['address'].split('@')[-1] if '@' in recipient['address'] else ''
                human_readable_arrival_time = datetime.fromtimestamp(item['arrival_time']).strftime('%Y-%m-%d %H:%M:%S')

                # Assemble one CSV row per recipient.
                row = [
                    current_time,
                    item["queue_name"],
                    f'="{item["queue_id"]}"',  # keep queue_id textual in Excel
                    human_readable_arrival_time,
                    item["message_size"],
                    item["forced_expire"],
                    item["sender"],
                    recipient_domain,
                    recipient.get("delay_reason", "")
                ]
                writer.writerow(row)

def load_yaml_config(config_path):
    """Load the YAML rule configuration; exit the program when it is missing."""
    if not config_path.exists():
        logging.error(f"設定ファイルが見つかりません: {config_path}")
        sys.exit(1)
    with config_path.open('r', encoding='utf-8') as file:
        return yaml.safe_load(file)

def get_and_process_data(args, output_dir):
    """Fetch queue data and expand it per recipient.

    Exits the program when fetching fails; returns ([], 0) when the
    queue is empty.
    """
    json_data = fetch_postqueue_data(args.use_log_file, output_dir, args.config_path)
    if json_data:
        return split_recipients(json_data, output_dir)
    if json_data is None:
        logging.error("データの取得に失敗しました。プログラムを終了します。")
        sys.exit(1)
    logging.info("滞留するデータはありません。")
    return [], 0

def filter_and_output_csv(data, config, output_dir, csv_file_path):
    """Filter the queue data and, when anything remains, write it as CSV.

    :return: (count after match filters, count after exception filters)
    """
    filtered_data, matched_count, excluded_count = filter_data(data, config, output_dir)
    if filtered_data:
        write_csv(filtered_data, csv_file_path)
        logging.info(f"CSVファイルが出力されました: {csv_file_path}")
    else:
        logging.info("条件に一致するデータはありませんでした。")
    return matched_count, excluded_count

def initialize_program(args):
    """Resolve the output directory and derive config/CSV paths from the script name.

    :return: (output directory, CSV output path, YAML config path)
    """
    output_dir = setup_directories(args)
    program_name = Path(__file__).stem
    return (
        output_dir,
        output_dir / f'{program_name}.csv',
        DEF_DIR / f'{program_name}.yaml',
    )

def main():
    """Entry point: monitor the Postfix queue and emit CSV + summary files."""
    setup_logging()

    cli_args = parse_arguments()
    logging.info(f"プログラムを開始します。オプション: {cli_args}")

    # Resolve working paths and load the filter rules.
    out_dir, csv_path, cfg_path = initialize_program(cli_args)
    config = load_yaml_config(cfg_path)

    # Fetch the queue and expand it per recipient.
    processed_data, total_queue_count = get_and_process_data(cli_args, out_dir)

    # Filter and write CSV only when there is data to work on.
    if processed_data:
        matched_count, excluded_count = filter_and_output_csv(processed_data, config, out_dir, csv_path)
    else:
        logging.info("処理するデータがないため、フィルタリングとCSV出力をスキップします。")
        matched_count, excluded_count = 0, 0

    logging.info(f"キュー滞留件数: {total_queue_count}, 一致filter後件数: {matched_count}, 除外filter後件数: {excluded_count}")

    # Write the machine-readable summary for downstream monitoring.
    summary_path = Path(out_dir) / QUEUE_SUMMARY_FILENAME
    logging.info(f"サマリファイルが出力されました: {summary_path}")
    with open(summary_path, 'w', encoding='utf-8') as f:
        f.write(f"TOTAL_QUEUE_COUNT={total_queue_count}\n")
        f.write(f"MACHED_AFTER_FILTER_COUNT={matched_count}\n")
        f.write(f"EXCLUDE_AFTER_FILTER_COUNT={excluded_count}\n")

    logging.info("プログラムを終了します。")
# Run only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
monitor_postfix_queue.yaml
---
# Rule configuration for monitor_postfix_queue.py.
# match_filters select queue entries; exception_filters then remove entries
# from that selection.  queue_name / sender / recipient_domain / delay_reason
# are regexes applied with re.match().  delay_reason_count is a lower bound
# for match_filters (count >= N keeps the entries) and an upper bound for
# exception_filters (count <= N removes them).  With
# recipient_domain_specific_filter: true the count is taken per recipient
# domain instead of over all matches.
match_filters:
    # Num 0: match everything queued for more than 60 minutes.
    - Num: '0'
      queue_name: '.*'
      arrival_time_thresholdMinutes: 60
      sender: '.*'
      recipient_domain: '.*'
      delay_reason: '.*'
      delay_reason_count: 1
exception_filters:
    # Num 0: DNS "Host not found" deferrals -- the very large count (10000)
    # effectively always excludes these.
    - Num: '0'
      queue_name: '.*'
      arrival_time_thresholdMinutes: 0
      sender: '.*'
      recipient_domain: '.*'
      delay_reason: 'Host or domain name not found. Name service error for name=.* type=MX: Host not found, try again'
      delay_reason_count: 10000
    # Num 1: lost connection during greeting; excluded per domain when the
    # domain has at most 2 such entries.
    - Num: '1'
      queue_name: '.*'
      arrival_time_thresholdMinutes: 60
      sender: '.*'
      recipient_domain: '.*'
      'recipient_domain_specific_filter': true
      delay_reason: 'lost connection with .*\[.*\] while receiving the initial server greeting'
      delay_reason_count: 2
    # Num 2: SMTP connection timeouts; excluded per domain up to 2 entries.
    - Num: '2'
      queue_name: '.*'
      arrival_time_thresholdMinutes: 60
      sender: '.*'
      recipient_domain: '.*'
      'recipient_domain_specific_filter': true
      delay_reason: 'connect to .*\[.*\]:25: Connection timed out'
      delay_reason_count: 2
    # Num 3: remote 451 tenant-configuration deferrals; excluded per domain
    # up to 2 entries.
    - Num: '3'
      queue_name: '.*'
      arrival_time_thresholdMinutes: 60
      sender: '.*'
      recipient_domain: '.*'
      'recipient_domain_specific_filter': true
      delay_reason: 'host .*\[.*\] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 \[.*\] \(in reply to end of DATA command\)'
      delay_reason_count: 2

テストログ作成支援メモ

Host or domain name not found. Name service error for name=test1.com type=MX: Host not found, try again
Host or domain name not found. Name service error for name=test2.com type=MX: Host not found, try again
Host or domain name not found. Name service error for name=test3.com type=MX: Host not found, try again
lost connection with test1.com[1.1.1.1] while receiving the initial server greeting
lost connection with test2.com[2.2.2.2] while receiving the initial server greeting
lost connection with test3.com[3.3.3.3] while receiving the initial server greeting
connect to test1.com[1.1.1.1]:25: Connection timed out
connect to test2.com[2.2.2.2]:25: Connection timed out
connect to test3.com[3.3.3.3]:25: Connection timed out
host test1.com[1.1.1.1] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
host test2.com[2.2.2.2] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
host test3.com[3.3.3.3] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
Host or domain name not found. Name service error for name=test4.com type=MX: Host not found, try again
Host or domain name not found. Name service error for name=test5.com type=MX: Host not found, try again
lost connection with test4.com[4.4.4.4] while receiving the initial server greeting
lost connection with test5.com[5.5.5.5] while receiving the initial server greeting
connect to test4.com[4.4.4.4]:25: Connection timed out
connect to test5.com[5.5.5.5]:25: Connection timed out
host test4.com[4.4.4.4] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
host test5.com[5.5.5.5] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
Host or domain name not found. Name service error for name=test6.com type=MX: Host not found, try again
lost connection with test6.com[6.6.6.6] while receiving the initial server greeting
connect to test6.com[6.6.6.6]:25: Connection timed out
host test6.com[6.6.6.6] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0