import json
import yaml
import csv
from pathlib import Path
from datetime import datetime
import re
import subprocess
import sys
import os
import socket
import logging
import argparse
from collections import Counter
from shutil import copy2
# Project root: the parent of the directory that contains this script.
_SCRIPT_DIR = Path(__file__).resolve().parent
BASE_DIR = _SCRIPT_DIR.parent

# Fixed sub-directories of the project root.
LOG_DIR, DEF_DIR, TMP_DIR = (BASE_DIR / name for name in ('log', 'def', 'tmp'))

# Names of the files this tool writes into its output directory.
POSTQUEUE_J_01_FILENAME = "postqueue-j_01.json"
POSTQUEUE_J_02_FILENAME = "postqueue-j_02.json"
QUEUE_SUMMARY_FILENAME = "queue_summary_result.prf"
# Logging configuration
def setup_logging(level=logging.INFO):
    """Configure the root logger to write to ``LOG_DIR/pythonlog.log`` and stdout.

    Both handlers share one format: timestamp, host name, script name with
    level, process id, message.

    :param level: threshold applied to the root logger.
        NOTE(review): the level used originally is not visible in this file;
        INFO is the minimum needed for the tool's ``logging.info`` progress
        messages to appear. Pass ``logging.DEBUG`` to see per-entry traces.
    :return: the configured root logger.
    """
    # The file handler cannot be created if the log directory is missing.
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    log_file_path = LOG_DIR / 'pythonlog.log'

    formatter = logging.Formatter(
        f'%(asctime)s {socket.gethostname()} [{Path(__file__).name}:%(levelname)s] [%(process)d] %(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    file_handler = logging.FileHandler(str(log_file_path), encoding='utf-8')
    stream_handler = logging.StreamHandler(sys.stdout)

    logger = logging.getLogger()
    logger.setLevel(level)
    for handler in (file_handler, stream_handler):
        handler.setFormatter(formatter)
        logger.addHandler(handler)
    return logger
def parse_arguments(argv=None):
    """Parse the command-line options of the Postfix queue monitoring tool.

    Exactly one of ``--use-log-file PATH`` or ``--config-path PATH`` must be
    given; ``--output-dir`` is optional.

    :param argv: argument list to parse; ``None`` (the default) makes argparse
        read ``sys.argv[1:]``.
    :return: ``argparse.Namespace`` with ``use_log_file``, ``config_path`` and
        ``output_dir`` (each ``None`` when not supplied).
    """
    parser = argparse.ArgumentParser(description='Postfixキュー監視ツール')

    source = parser.add_mutually_exclusive_group(required=True)
    # The help text asks for a path, so the value is mandatory (no nargs='?'):
    # a bare "--use-log-file" is rejected with a clear "expected one argument".
    source.add_argument(
        '--use-log-file',
        default=None,
        help='`postqueue -j` コマンドの出力の代わりに指定したログファイルを使用します。パスを指定してください。',
    )
    source.add_argument(
        '--config-path',
        help='Postfixキュー設定ディレクトリへのパス。このオプションが指定された場合、`postqueue -j -c {config_path}` コマンドが実行されます。',
    )

    parser.add_argument(
        '--output-dir',
        default=None,
        help='結果を保存するディレクトリのパス。指定されない場合、デフォルトの出力ディレクトリが使用されます。',
    )
    return parser.parse_args(argv)
# Utility functions
def load_json_objects_from_file(file_path):
    """Read a file holding one or more concatenated JSON values.

    The values may be separated (and surrounded) by spaces, tabs or newlines.

    :param file_path: path of the file to read.
    :return: list of decoded values in file order. On any error the problem is
        logged and the values decoded so far (possibly none) are returned.
    """
    logging.debug(f"{file_path} からJSONオブジェクトを読み込み開始します。")
    data = []
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            text = file.read()

        decoder = json.JSONDecoder()
        pos, end = 0, len(text)
        while True:
            # raw_decode() does not tolerate leading whitespace, so skip it
            # before every value (including the very first one).
            while pos < end and text[pos] in ' \n\t\r':
                pos += 1
            if pos >= end:
                break
            obj, pos = decoder.raw_decode(text, pos)
            data.append(obj)
        logging.info(f"{file_path} から{len(data)}件のJSONオブジェクトを読み込みました。")
    except Exception:
        # Best effort: report the failure and hand back what was decoded.
        logging.error(f"{file_path} の読み込み中にエラーが発生しました。", exc_info=True)
    return data
def split_recipients(json_data, output_dir):
    """Split each queue entry into one entry per recipient and save the result.

    Every output entry is a shallow copy of its source entry whose
    ``"recipients"`` list holds exactly one recipient. The full list is written
    as JSON to ``<output_dir>/POSTQUEUE_J_02_FILENAME``.

    :param json_data: original JSON data (iterable of dicts with a
        ``"recipients"`` list).
    :param output_dir: path of the directory the result is saved in.
    :return: tuple ``(split_data, total_queue_count)`` where the count is
        ``len(split_data)``.
    """
    split_data = [
        {**item, "recipients": [recipient]}
        for item in json_data
        for recipient in item["recipients"]
    ]

    # Save the result as a JSON file.
    output_file_path = Path(output_dir) / POSTQUEUE_J_02_FILENAME
    with open(output_file_path, 'w', encoding='utf-8') as f:
        json.dump(split_data, f, ensure_ascii=False, indent=4)

    total_queue_count = len(split_data)
    logging.info(f"ファイルが保存されました: {output_file_path}")
    logging.info(f"分割結果のJSONオブジェクト数: {len(split_data)}件")
    return split_data, total_queue_count
def fetch_postqueue_data(use_log_file=None, output_dir=None, config_path=None):
    """Obtain queue data from a saved log file or from a `postqueue -j` run.

    ``use_log_file`` takes precedence over ``config_path``; when neither is
    truthy the result is ``None``.
    """
    if use_log_file:
        queue_data = process_log_file(use_log_file, output_dir)
    elif config_path:
        queue_data = execute_postqueue_command(config_path, output_dir)
    else:
        queue_data = None
    return queue_data
def process_log_file(log_file, output_dir):
    """Copy a saved `postqueue -j` output into the output directory and load it.

    :param log_file: path of the file to use instead of running postqueue.
    :param output_dir: directory that receives the copy, stored under
        ``POSTQUEUE_J_01_FILENAME``.
    :return: list of decoded JSON objects, or ``None`` when the copy (or
        anything else in this step) fails; the failure is logged.
    """
    try:
        output_file_path = Path(output_dir) / POSTQUEUE_J_01_FILENAME
        copy2(log_file, output_file_path)
        logging.info(f"ログファイルをコピーしました: {log_file} -> {output_file_path}")
        return load_json_objects_from_file(output_file_path)
    except Exception as e:
        logging.error(f"ログファイルのコピー中にエラーが発生しました: {e}", exc_info=True)
        return None
def execute_postqueue_command(config_path, output_dir):
    """Run `postqueue -j -c <config_path>` and load its output.

    The raw output is saved to ``<output_dir>/POSTQUEUE_J_01_FILENAME`` and
    then parsed from that file.

    :param config_path: Postfix configuration directory passed to ``-c``.
    :param output_dir: directory that receives the raw command output.
    :return: list of decoded JSON objects; ``[]`` when the command succeeded
        but printed nothing; ``None`` when the command failed or could not be
        run (the failure is logged).
    """
    try:
        output_file_path = Path(output_dir) / POSTQUEUE_J_01_FILENAME
        result = subprocess.run(
            ['postqueue', '-j', '-c', config_path],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            universal_newlines=True,
        )
        if result.returncode != 0:
            # A failed command must not be reported as "queue is empty".
            # NOTE(review): the original handling of this case is not visible;
            # returning None routes it through the caller's error path.
            logging.error(
                f"`postqueue -j -c {config_path}` コマンドが失敗しました。"
                f" returncode: {result.returncode}, stderr: {result.stderr.strip()}"
            )
            return None
        if not result.stdout:
            return []

        logging.info(f"`postqueue -j -c {config_path}` コマンドの実行に成功しました。")
        with open(output_file_path, 'w') as f:
            f.write(result.stdout)
        return load_json_objects_from_file(output_file_path)
    except Exception:
        logging.error("`postqueue -j -c` コマンドの実行中にエラーが発生しました。", exc_info=True)
        return None
def match_entries(data, rule, output_dir, rule_type):
    """Return the queue entries selected by a single filter rule.

    An entry passes the rule when all of the following hold:

    * it arrived more than ``rule["arrival_time_thresholdMinutes"]`` minutes ago;
    * ``queue_name``, ``sender``, the recipient domain and the delay reason each
      match the rule's regular expression of the same name (``re.match``, i.e.
      anchored at the start; ``sender`` is additionally anchored at the end and
      ``"@"`` is stripped from the ``recipient_domain`` pattern).

    The recipient domain is taken from the first recipient whose address
    contains ``"@"`` and the delay reason from the first recipient; a missing
    value is treated as the empty string.

    Passing entries are grouped per recipient domain when
    ``rule["recipient_domain_specific_filter"]`` is truthy, otherwise into one
    group. A group is selected depending on ``rule_type``:

    * ``"match_filters"``: group size ``>= rule["delay_reason_count"]``
    * ``"exception_filters"``: group size ``<= rule["delay_reason_count"]``
    * any other value: nothing is selected.

    Non-empty results are also written as JSON to
    ``<output_dir>/<rule_type>_<Num>.json``.

    :param data: iterable of queue entry dicts.
    :param rule: rule dict (keys as used above, plus ``"Num"`` for logging).
    :param output_dir: directory for the per-rule JSON dump.
    :param rule_type: ``"match_filters"`` or ``"exception_filters"``.
    :return: list of selected entries (the same objects as in ``data``).
    """
    rule_num = rule['Num']
    per_domain = rule.get("recipient_domain_specific_filter", False)

    # Loop invariants: the arrival cutoff and the compiled patterns.
    cutoff_timestamp = datetime.now().timestamp() - rule["arrival_time_thresholdMinutes"] * 60
    patterns = (
        ("queue_name", re.compile(rule["queue_name"])),
        ("sender", re.compile(rule["sender"] + "$")),
        ("recipient_domain", re.compile(rule["recipient_domain"].replace("@", ""))),
        ("delay_reason", re.compile(rule["delay_reason"])),
    )

    domain_matched_entries = {}   # recipient domain -> entries that passed
    total_matched_entries = []    # entries that passed (all-domains mode)

    for index, entry in enumerate(data, start=1):
        recipients = entry["recipients"]
        # "or ''" keeps re.match from receiving None when there is no
        # recipient, no "@" in any address, or no delay_reason.
        recipient_domain = next(
            (r["address"].split('@')[-1] for r in recipients if "@" in r["address"]),
            None,
        ) or ""
        delay_reason = next((r.get("delay_reason", "") for r in recipients), None) or ""

        # arrival_time check: entries newer than the cutoff are skipped.
        arrival_timestamp = float(int(entry["arrival_time"]))
        if cutoff_timestamp <= arrival_timestamp:
            logging.debug(f"{rule_type}, Num: {rule_num}, index: {index}, not append, 理由: arrival_time")
            logging.debug(f"{cutoff_timestamp} <= {arrival_timestamp}")
            continue

        # Regex checks, evaluated in order; the first failure rejects the entry.
        values = {
            "queue_name": entry["queue_name"],
            "sender": entry["sender"],
            "recipient_domain": recipient_domain,
            "delay_reason": delay_reason,
        }
        failed = next((name for name, pattern in patterns if not pattern.match(values[name])), None)
        if failed is not None:
            logging.debug(f"{rule_type}, Num: {rule_num}, index: {index}, not append, 理由: {failed}")
            continue

        if per_domain:
            # Count per recipient domain.
            bucket = domain_matched_entries.setdefault(recipient_domain, [])
            bucket.append(entry)
            action = "initialized" if len(bucket) == 1 else "incremented"
            logging.debug(f"{rule_type}, Num: {rule_num}, index: {index}, domain: {recipient_domain}, action: {action}, count: {len(bucket)}")
        else:
            # Count across all domains.
            total_matched_entries.append(entry)
        logging.debug(f"{rule_type}, Num: {rule_num}, index: {index}, append")

    # The meaning of delay_reason_count is reversed between match filters
    # (at least N) and exception filters (at most N).
    comparison = {'match_filters': ('>=', '<'), 'exception_filters': ('<=', '>')}.get(rule_type)

    matched_entries = []
    if comparison is not None:
        keep_op, drop_op = comparison
        threshold = rule["delay_reason_count"]
        if per_domain:
            groups = [(f"domain: {domain}, ", entries) for domain, entries in domain_matched_entries.items()]
        else:
            groups = [("", total_matched_entries)]

        for label, entries in groups:
            count = len(entries)
            selected = count >= threshold if keep_op == '>=' else count <= threshold
            if selected:
                matched_entries.extend(entries)
                logging.debug(f"{rule_type}, Num: {rule_num}, {label}count: {count} {keep_op} delay_reason_count: {threshold}, extend")
            else:
                logging.debug(f"{rule_type}, Num: {rule_num}, {label}count: {count} {drop_op} delay_reason_count: {threshold}, not added")

    # Write the selection to the per-rule output file.
    logging.info(f"{rule_type}, Num: {rule_num}, 一致件数: {len(matched_entries)}")
    if matched_entries:
        matched_entries_file_path = Path(output_dir) / f"{rule_type}_{rule_num}.json"
        with open(matched_entries_file_path, 'w', encoding='utf-8') as f:
            json.dump(matched_entries, f, indent=2)
        logging.info(f"ファイルに書き込みました: {matched_entries_file_path}, {rule_type}, Num: {rule_num}")
    return matched_entries
def setup_directories(args):
    """Create and return the output directory; also ensure ``LOG_DIR`` exists.

    :param args: parsed arguments; ``args.output_dir`` is used when given.
    :return: ``Path`` of the (now existing) output directory. Without
        ``--output-dir`` this is
        ``TMP_DIR/<script stem>/<YYYYmmddHHMMSS>_<pid>``.
    """
    if args.output_dir:
        output_dir = Path(args.output_dir).resolve()
    else:
        # Default directory when no output directory was given on the
        # command line: unique per run via timestamp and process id.
        current_time_formatted = datetime.now().strftime("%Y%m%d%H%M%S")
        pid = os.getpid()
        output_dir = TMP_DIR / Path(__file__).stem / f'{current_time_formatted}_{pid}'
    output_dir.mkdir(parents=True, exist_ok=True)

    # Create LOG_DIR if it does not exist.
    LOG_DIR.mkdir(parents=True, exist_ok=True)
    return output_dir
def filter_data(json_data, config, output_dir):
    """Apply the configured match filters, then remove exception-filter hits.

    :param json_data: list of queue entries (one recipient per entry is what
        the caller in this file passes).
    :param config: mapping with optional ``"match_filters"`` and
        ``"exception_filters"`` rule lists; ``None`` is treated as empty.
    :param output_dir: directory passed through to ``match_entries``.
    :return: tuple ``(matched_data, matched_count, remaining_count)`` where
        ``matched_count`` is the number of entries after the match filters and
        ``remaining_count`` the number left after the exception filters.
        ``([], 0, 0)`` when no match filter selected anything.
    """
    config = config or {}
    candidates = list(json_data)

    # Apply the match filters; an entry selected by several rules is kept once.
    matched_data = []
    for rule in config.get('match_filters') or []:
        for entry in match_entries(candidates, rule, output_dir, "match_filters"):
            if entry not in matched_data:
                matched_data.append(entry)

    # Number of entries after the match filters.
    mached_after_filter_count = len(matched_data)

    # Nothing matched: the exception filters are not applied and an empty
    # result is returned.
    if not matched_data:
        return [], 0, 0

    # Exception filters are evaluated against the full data set and their hits
    # are removed from the matched entries.
    for rule in config.get('exception_filters') or []:
        exception_matched = match_entries(candidates, rule, output_dir, "exception_filters")
        matched_data = [entry for entry in matched_data if entry not in exception_matched]

    # Number of entries after the exception filters.
    exclude_after_filter_count = len(matched_data)
    return matched_data, mached_after_filter_count, exclude_after_filter_count
def write_csv(data, csv_file_path):
    """Write one CSV row per (queue entry, recipient) pair.

    Columns: date, queue_name, queue_id, arrival_time, message_size,
    forced_expire, sender, recipient_domain, delay_reason. ``date`` is the time
    this function runs; ``arrival_time`` is rendered in local time.

    NOTE(review): the row literal was truncated in the source; the fields are
    rebuilt from the header order — confirm against the intended output.

    :param data: iterable of queue entry dicts.
    :param csv_file_path: destination file (overwritten).
    """
    with open(csv_file_path, 'w', newline='') as file:
        writer = csv.writer(file, lineterminator='\n')
        # Write the CSV header.
        writer.writerow(['date', 'queue_name', 'queue_id', 'arrival_time', 'message_size', 'forced_expire', 'sender', 'recipient_domain', 'delay_reason'])

        current_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        for item in data:
            # int() mirrors how match_entries reads arrival_time.
            human_readable_arrival_time = datetime.fromtimestamp(int(item['arrival_time'])).strftime('%Y-%m-%d %H:%M:%S')
            for recipient in item['recipients']:
                address = recipient['address']
                recipient_domain = "@" + address.split('@')[-1] if '@' in address else ''
                # Prepare the row to write.
                row = [
                    current_time,
                    item['queue_name'],
                    f'="{item["queue_id"]}"',  # formatted so Excel keeps the id as text
                    human_readable_arrival_time,
                    item.get('message_size', ''),
                    item.get('forced_expire', ''),
                    item['sender'],
                    recipient_domain,
                    recipient.get("delay_reason", ""),
                ]
                writer.writerow(row)
def load_yaml_config(config_path):
    """Load the YAML filter configuration.

    :param config_path: path of the YAML file.
    :return: the parsed document; an empty file yields ``{}`` instead of
        ``None`` so callers can use mapping operations on it.
    :raises FileNotFoundError: when the file does not exist (also logged).
    """
    config_path = Path(config_path)
    if not config_path.exists():
        message = f"設定ファイルが見つかりません: {config_path}"
        logging.error(message)
        # NOTE(review): what followed the log call originally is not visible;
        # raising here matches what the subsequent open() would do anyway.
        raise FileNotFoundError(message)

    with config_path.open('r', encoding='utf-8') as file:
        config = yaml.safe_load(file)
    return {} if config is None else config
def get_and_process_data(args, output_dir):
    """Fetch the queue data and split it into one entry per recipient.

    :param args: parsed arguments (``use_log_file`` and ``config_path`` are read).
    :param output_dir: directory for intermediate files.
    :return: tuple ``(split_data, total_queue_count)``; ``([], 0)`` when the
        queue is empty.
    """
    json_data = fetch_postqueue_data(args.use_log_file, output_dir, args.config_path)
    if json_data is None:
        # None means the data could not be obtained at all (as opposed to an
        # empty queue).
        # NOTE(review): the original body of this branch is not visible;
        # terminating with a non-zero status is assumed — confirm.
        logging.error("postqueueデータの取得に失敗したため、処理を終了します。")
        sys.exit(1)
    if not json_data:
        return [], 0
    return split_recipients(json_data, output_dir)
def filter_and_output_csv(data, config, output_dir, csv_file_path):
    """Filter the queue entries and write the remaining ones to a CSV file.

    :param data: queue entries to filter.
    :param config: filter configuration passed to ``filter_data``.
    :param output_dir: directory for per-rule output files.
    :param csv_file_path: destination of the CSV report.
    :return: tuple ``(matched_count, remaining_count)`` as returned by
        ``filter_data``.
    """
    filtered_data, mached_after_filter_count, exclude_after_filter_count = filter_data(data, config, output_dir)
    if not filtered_data:
        # NOTE(review): the original body of this branch is not visible;
        # skipping the CSV when nothing remains is assumed — confirm.
        logging.info("フィルタ後の対象データがないため、CSVファイルは出力しません。")
        return mached_after_filter_count, exclude_after_filter_count

    write_csv(filtered_data, csv_file_path)
    logging.info(f"CSVファイルが出力されました: {csv_file_path}")
    return mached_after_filter_count, exclude_after_filter_count
def initialize_program(args):
    """Prepare the output directory and derive the per-program file paths.

    The CSV and YAML file names are both based on this script's file stem.

    :return: tuple ``(output_dir, csv_file_path, config_path)``.
    """
    output_dir = setup_directories(args)
    stem = Path(__file__).stem
    return (
        output_dir,
        output_dir / f'{stem}.csv',
        DEF_DIR / f'{stem}.yaml',
    )
def main():
    """Run the tool: load rules, fetch the queue, filter it, write the reports."""
    # Logging setup.
    setup_logging()

    # Argument parsing.
    args = parse_arguments()
    logging.info(f"プログラムを開始します。オプション: {args}")

    # Initial setup and path resolution.
    output_dir, csv_file_path, config_path = initialize_program(args)

    # Load the configuration file.
    config = load_yaml_config(config_path)

    # Fetch and pre-process the data.
    processed_data, total_queue_count = get_and_process_data(args, output_dir)

    # Filter the data and write the CSV.
    if processed_data:
        mached_after_filter_count, exclude_after_filter_count = filter_and_output_csv(processed_data, config, output_dir, csv_file_path)
    else:
        mached_after_filter_count, exclude_after_filter_count = 0, 0

    logging.info(f"キュー滞留件数: {total_queue_count}, 一致filter後件数: {mached_after_filter_count}, 除外filter後件数: {exclude_after_filter_count}")

    queue_summary_file = Path(output_dir) / QUEUE_SUMMARY_FILENAME
    with open(queue_summary_file, 'w', encoding='utf-8') as f:
        # NOTE(review): the original content of the summary file is not
        # visible. The three counts are written comma-separated on one line
        # as a placeholder — confirm the format expected by its consumer.
        f.write(f"{total_queue_count},{mached_after_filter_count},{exclude_after_filter_count}\n")
    # Report success only after the file has actually been written.
    logging.info(f"サマリファイルが出力されました: {queue_summary_file}")
# Run the tool only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
# NOTE(review): top-level keys and nesting were lost; they are restored from
# the keys the script reads ('match_filters', 'exception_filters') and from
# the fact that Num '0' appears twice. Confirm the assignment of rules.
match_filters:
  - Num: '0'
    queue_name: '.*'
    arrival_time_thresholdMinutes: 60
    sender: '.*'
    recipient_domain: '.*'
    delay_reason: '.*'
    delay_reason_count: 1
exception_filters:
  - Num: '0'
    queue_name: '.*'
    arrival_time_thresholdMinutes: 0
    sender: '.*'
    recipient_domain: '.*'
    delay_reason: 'Host or domain name not found. Name service error for name=.* type=MX: Host not found, try again'
    delay_reason_count: 10000
  - Num: '1'
    queue_name: '.*'
    arrival_time_thresholdMinutes: 60
    sender: '.*'
    recipient_domain: '.*'
    'recipient_domain_specific_filter': true
    delay_reason: 'lost connection with .*\[.*\] while receiving the initial server greeting'
    delay_reason_count: 2
  - Num: '2'
    queue_name: '.*'
    arrival_time_thresholdMinutes: 60
    sender: '.*'
    recipient_domain: '.*'
    'recipient_domain_specific_filter': true
    delay_reason: 'connect to .*\[.*\]:25: Connection timed out'
    delay_reason_count: 2
  - Num: '3'
    queue_name: '.*'
    arrival_time_thresholdMinutes: 60
    sender: '.*'
    recipient_domain: '.*'
    'recipient_domain_specific_filter': true
    delay_reason: 'host .*\[.*\] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 \[.*\] \(in reply to end of DATA command\)'
    delay_reason_count: 2
Host or domain name not found. Name service error for name=test1.com type=MX: Host not found, try again
Host or domain name not found. Name service error for name=test2.com type=MX: Host not found, try again
Host or domain name not found. Name service error for name=test3.com type=MX: Host not found, try again
lost connection with test1.com[] while receiving the initial server greeting
lost connection with test2.com[] while receiving the initial server greeting
lost connection with test3.com[] while receiving the initial server greeting
connect to test1.com[]:25: Connection timed out
connect to test2.com[]:25: Connection timed out
connect to test3.com[]:25: Connection timed out
host test1.com[] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
host test2.com[] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
host test3.com[] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
Host or domain name not found. Name service error for name=test4.com type=MX: Host not found, try again
Host or domain name not found. Name service error for name=test5.com type=MX: Host not found, try again
lost connection with test4.com[] while receiving the initial server greeting
lost connection with test5.com[] while receiving the initial server greeting
connect to test4.com[]:25: Connection timed out
connect to test5.com[]:25: Connection timed out
host test4.com[] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
host test5.com[] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)
Host or domain name not found. Name service error for name=test6.com type=MX: Host not found, try again
lost connection with test6.com[] while receiving the initial server greeting
connect to test6.com[]:25: Connection timed out
host test6.com[] said: 451 4.4.4 Mail received as unauthenticated, incoming to a recipient domain configured in a hosted tenant which has no mail-enabled subscriptions. ATTR5 [TY1PEPF0000BAD7] (in reply to end of DATA command)