python
エンコーディングを引数で指定可能にした
mask_mail_address.py
import argparse
import os
import sys
import re
import logging
import socket
from datetime import datetime
from multiprocessing import Pool, cpu_count
from pathlib import Path
import time
import shutil
import inspect
#===============================================================================
# ロギング設定
#===============================================================================
def setup_logging():
host_name = socket.gethostname()
program_name = os.path.basename(__file__)
log_file_path = Path(__file__).resolve().parent.parent / 'log' / 'pythonlog.log'
logger = logging.getLogger()
logger.setLevel(logging.INFO)
file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
file_handler.setFormatter(logging.Formatter(f'%(asctime)s {host_name} [{program_name}:%(levelname)s] [%(process)d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
stream_handler = logging.StreamHandler(sys.stdout)
stream_handler.setFormatter(logging.Formatter(f'%(asctime)s {host_name} [{program_name}:%(levelname)s] [%(process)d] %(message)s', datefmt='%Y-%m-%d %H:%M:%S'))
logger.addHandler(file_handler)
logger.addHandler(stream_handler)
#===============================================================================
# 引数の解析
#===============================================================================
def parse_arguments():
parser = argparse.ArgumentParser(description='メールアドレスのローカルパートをマスク化します。')
parser.add_argument('-f', '--file', required=True, help='マスク化を行うログファイルのパス。')
parser.add_argument('-o', '--output', required=True, help='マスク化されたデータを出力するファイルの名前。')
parser.add_argument('-m', '--mode', choices=['append', 'overwrite'], default='overwrite', help='出力ファイルへの書き込みモード(appendまたはoverwrite)。デフォルトはoverwriteです。')
parser.add_argument('-p', '--processes', type=int, default=1, help='使用するプロセス数。デフォルトは1です')
parser.add_argument('-s', '--start-line', type=int, default=1, help='マスク化を開始する行番号。デフォルトは1(ファイルの始めから)。')
parser.add_argument('-e', '--end-line', type=int, help='マスク化を終了する行番号。指定しない場合はファイルの末尾まで処理します。')
parser.add_argument('--read-encoding', default='utf-8', help='ファイルの読み込みに使用する文字エンコーディング。デフォルトはutf-8です。')
parser.add_argument('--write-encoding', default='utf-8', help='ファイルの書き込みに使用する文字エンコーディング。デフォルトはutf-8です。')
return parser.parse_args()
#===============================================================================
# ユーティリティ関数
#===============================================================================
def get_emails_list_path():
"""
マスク処理をしないメールアドレスのリストが含まれるファイルのパスを生成して返します。
このファイルは、実行ファイルと同じ階層にあるdefディレクトリ内に位置していることを想定しています。
"""
return Path(__file__).resolve().parent.parent / 'def' / (Path(__file__).stem + '.lst')
def load_unmasked_emails(filepath):
"""
マスク処理をしないメールアドレスのリストを含むファイルからメールアドレスを読み込み、リストとして返します。
コメント行や空行は無視されます。
"""
try:
unmasked_emails = []
with open(filepath, 'r', encoding='utf-8') as file:
for line in file:
if line.strip() and not line.startswith('#'):
unmasked_emails.append(line.strip())
return unmasked_emails
except Exception as e:
logging.error(f"{e}, 関数: {inspect.currentframe().f_code.co_name}")
sys.exit(1)
def mask_email(content, unmasked_emails):
"""
与えられたテキスト内のメールアドレスをマスクします。
ただし、指定されたメールアドレスまたはメールアドレスリストに含まれる場合はマスクされません。
"""
try:
pattern = re.compile(r'[A-Za-z0-9.!#$%&\'*+/=?^_`{|}~"-]+@[A-Za-z0-9._-]+')
message_id_pattern = 'message-id=<'
message_id_length = len(message_id_pattern)
def replace_func(match):
email = match.group(0) # メールアドレス全体を取得
local_part, domain = email.split('@')
domain_with_at = f"@{domain}" # ドメインの前に@を付ける
# メールアドレスの直前の部分をチェックして message-id=< があるかを確認
preceding_text = content[match.start()-message_id_length:match.start()]
if preceding_text.lower() == message_id_pattern:
return email
# メールアドレス全体またはドメインがリストに含まれていればマスクしない
if email in unmasked_emails or domain_with_at in unmasked_emails:
return email
else:
logging.debug(f"マスク処理されたメールアドレス: {email} -> xxxxx@{domain}")
return f'xxxxx@{domain}' # ローカルパートをマスク
return re.sub(pattern, replace_func, content)
except Exception as e:
logging.error(f"{e}, 関数: {inspect.currentframe().f_code.co_name}")
sys.exit(1)
def process_lines(args):
"""
与えられた引数に基づいて、ファイル内の特定の行範囲を処理します。
この関数はマルチプロセスから呼び出され、各プロセスが異なる行範囲を処理することで、ファイルのマスク処理を並列に実行します。
"""
try:
start_line, end_line, file_path, output_path, unmasked_emails, read_encoding, write_encoding = args
with open(file_path, 'r', encoding=read_encoding, errors='replace') as f, open(output_path, 'w', encoding=write_encoding, errors='replace') as out_f:
for i, line in enumerate(f):
if start_line <= i < end_line:
masked_line = mask_email(line, unmasked_emails)
out_f.write(masked_line)
except Exception as e:
logging.error(f"{e}, 関数: {inspect.currentframe().f_code.co_name}")
def create_tasks(total_lines, processes, file_path, tmp_dir, unmasked_emails, read_encoding, write_encoding, start_line=0, end_line=None):
'''
説明:
全てのプロセスに対して処理すべき行範囲を計算し、それぞれのプロセスに割り当てるタスクリストを生成します。
各タスクは、処理する行の開始と終了のインデックス、入力ファイルパス、出力先の一時ファイルパス、許可されたメールアドレスリストを含むタプルです。
処理例:
例)
total_lines = 301
process = 3
・各プロセスが処理する行数
(301 + 3 - 1) // 3 = 303 // 3 = 101
しかし、総行数が301行なので、最後のプロセスが処理する行数は全ての行が均等に分配できないため、
最後のプロセスだけが少ない行数を処理を行う。
その為、最後のプロセスは、101 * 3 = 303行目まで処理を行うのではなく、
total_lines = 301行目まで処理を行う事とする。
・配列
tasks = [
(0, 101, args.file, tmp_dir / "part_0.txt", unmasked_emails), # 1つ目のプロセスのタスク
(101, 202, args.file, tmp_dir / "part_1.txt", unmasked_emails), # 2つ目のプロセスのタスク
(202, 301, args.file, tmp_dir / "part_2.txt", unmasked_emails) # 3つ目のプロセスのタスク
]
・line_per_processの計算式の理由
(total_lines // args.processes)だと(10 // 3) == 3になり、
3つ目のプロセスのタスクが9行目までの処理で終わってしまう。
よって、(total_lines + args.processes - 1) // args.processesとしている。
'''
try:
start_index = start_line - 1
if end_line is None:
end_index = total_lines
else:
end_index = min(end_line, total_lines)
if start_index >= end_index:
return []
adjusted_total_lines = end_index - start_index
lines_per_process = (adjusted_total_lines + processes - 1) // processes
tasks = [
(
max(start_index, i * lines_per_process + start_index),
min(end_index, (i + 1) * lines_per_process + start_index),
file_path,
tmp_dir / f"part_{i}.txt",
unmasked_emails,
read_encoding,
write_encoding
) for i in range(processes)
]
return tasks
except Exception as e:
logging.error(f"{e}, 関数: {inspect.currentframe().f_code.co_name}")
sys.exit(1)
def execute_tasks(tasks):
"""
multiprocessing.Poolを使用して、与えられたタスクリストを並列に実行します。
各プロセスはprocess_lines関数を用いて、指定された行範囲のマスク処理を行います。
"""
try:
if tasks and len(tasks) > 0: # タスクリストが空でないことを確認
with Pool(len(tasks)) as pool:
pool.map(process_lines, tasks)
else:
logging.info("実行するタスクがありません。")
except Exception as e:
logging.error(f"{e}, 関数: {inspect.currentframe().f_code.co_name}")
sys.exit(1)
#===============================================================================
# メイン関数
#===============================================================================
def main():
setup_logging()
args = parse_arguments()
logging.info(f"プログラムを開始します。オプション: {args}")
unmasked_emails = load_unmasked_emails(get_emails_list_path())
logging.info(f"マスク対象外メールアドレスリストが読み込まれました。メールアドレスリスト: {unmasked_emails}")
total_lines = sum(1 for line in open(args.file, 'r', encoding=args.read_encoding, errors='replace'))
logging.info(f"入力ファイルが読み込まれました。総行数: {total_lines}")
pid = os.getpid()
timestamp = datetime.now().strftime('%Y%m%d%H%M%S')
tmp_dir = Path(__file__).resolve().parent.parent / 'tmp' / f"{Path(__file__).stem}/{timestamp}_{pid}"
tmp_dir.mkdir(parents=True, exist_ok=True)
logging.info(f"tmpディレクトリが作成されました。PATH: {tmp_dir}")
tasks = create_tasks(total_lines, args.processes, args.file, tmp_dir, unmasked_emails, args.read_encoding, args.write_encoding, args.start_line, args.end_line)
logging.info(f"各プロセスに対するタスクが割り当てられました。タスク数: {len(tasks)}")
execute_tasks(tasks)
# マージ
if tasks:
file_open_mode = 'a' if args.mode == 'append' else 'w'
with open(args.output, file_open_mode, encoding=args.write_encoding, errors='replace') as outfile:
for i in range(args.processes):
part_path = tmp_dir / f"part_{i}.txt"
with open(part_path, 'r', encoding=args.write_encoding, errors='replace') as infile:
shutil.copyfileobj(infile, outfile)
os.remove(part_path)
logging.info(f"出力ファイルが保存されました。PATH: {args.output}")
else:
logging.info("タスクが実行されなかったため、マージ処理はスキップされました。")
if __name__ == '__main__':
start_time = time.time()
try:
main()
except Exception as e:
logging.error(f"{e}")
finally:
end_time = time.time()
execution_time = end_time - start_time
logging.info(f"プログラムを終了します。実行時間: {execution_time}秒")