import json
import logging
import re
import socket
import sys
from pathlib import Path
BASE_DIR = Path(__file__).resolve().parent.parent
LOG_DIR = BASE_DIR / 'log'
DEF_DIR = BASE_DIR / 'def'
TMP_DIR = BASE_DIR / 'tmp'
def setup_logging():
"""ロギングの設定を行う。"""
log_file_path = LOG_DIR / 'pythonlog.log'
logger = logging.getLogger()
logger.setLevel(logging.INFO)
formatter = logging.Formatter(f'%(asctime)s {socket.gethostname()} [{Path(__file__).name}:%(levelname)s] [%(process)d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
file_handler = logging.FileHandler(str(log_file_path), encoding='utf-8')
file_handler.setFormatter(formatter)
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(formatter)
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
logging.info("ロギングが設定されました。")
def parse_arguments():
"""コマンドライン引数を解析する。"""
parser = argparse.ArgumentParser(description='Postfixキュー監視ツール')
parser.add_argument('--input-file', required=True, help='処理するJSONファイルのパスを指定します。')
parser.add_argument('--output-dir', required=True, help='結果を保存するディレクトリのパスを指定します。')
return parser.parse_args()
def load_json(file_path):
"""JSONファイルを読み込む。"""
with open(file_path, 'r', encoding='utf-8') as file:
data = json.load(file)
logging.info(f"ファイル {file_path} が正常に読み込まれました。")
return data
def is_monitored_func(reason, predefined_patterns):
"""理由が事前定義されたパターンに一致するかどうかをチェックする。"""
for pattern in predefined_patterns:
if pattern.match(reason):
return False
return True
def process_data(data, predefined_patterns):
"""データを処理してキューごとに分け、結果を辞書形式で返す。"""
queue_data = {}
true_counts = {}
false_counts = {}
for item in data:
queue_name = item["queue_name"]
if queue_name not in queue_data:
queue_data[queue_name] = []
true_counts[queue_name] = 0
false_counts[queue_name] = 0
is_monitored = is_monitored_func(item["delay_reason"], predefined_patterns)
converted_item = f'"{item["current_datetime"]}","{item["arriva_time_jst"]}","{item["queue_name"]}","{item["queue_id"]}",{item["sender"]},{item["address"]},{is_monitored},"{item["delay_reason"]}"'
queue_data[queue_name].append(converted_item)
if is_monitored:
true_counts[queue_name] += 1
else:
false_counts[queue_name] += 1
logging.info("データが正常に処理されました。")
return queue_data, true_counts, false_counts
def save_data(data, output_dir):
"""処理したデータをファイルに保存する。"""
for queue_name, items in data.items():
output_file_path = output_dir / f'02_{queue_name}.csv'
with open(output_file_path, 'w', encoding='utf-8') as output_file:
for line in items:
output_file.write(line + '\n')
logging.info(f"ファイル {output_file_path} にデータが保存されました。")
def save_summary(summary, output_dir):
"""集計結果をJSON形式で保存する。"""
summary_file_path = output_dir / 'summary.json'
with open(summary_file_path, 'w', encoding='utf-8') as summary_file:
json.dump(summary, summary_file, ensure_ascii=False, indent=4)
logging.info(f"集計結果がファイル {summary_file_path} に保存されました。")
def main():
setup_logging()
args = parse_arguments()
input_file_path = Path(args.input_file)
output_dir = Path(args.output_dir)
if not output_dir.exists():
logging.error(f"指定されたディレクトリが存在しません: {output_dir}")
sys.exit(1)
data = load_json(input_file_path)
predefined_patterns = [
re.compile(r"reason\d+"), # 例: "reason1", "reason2", "reason3" にマッチ
re.compile(r"delivery temp2"), # 追加のパターン
]
queue_data, true_counts, false_counts = process_data(data, predefined_patterns)
save_data(queue_data, output_dir)
summary = []
for queue_name in queue_data.keys():
summary.append({
"queue_name": queue_name,
"total_count": len(queue_data[queue_name]),
"monitored_count": true_counts[queue_name],
"unmonitored_count": false_counts[queue_name]
})
save_summary(summary, output_dir)
if __name__ == "__main__":
main()
Register as a new user and use Qiita more conveniently
- You get articles that match your needs
- You can efficiently read back useful information
- You can use dark theme