1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

CSVデータマスキングツールをつくった

Posted at

概要

このツールは、CSVファイル内の個人情報や機密データを安全にマスキングするためのGUIアプリケーションです。
Fakerライブラリを使用して元のデータと同じフォーマットを保ちながら、偽のデータに置き換えることができます。

主な機能

  • 複数のCSVファイルを一括処理できる
  • カラム名に基づく自動マスキング項目の検出してサジェストしてくれる
  • データ増幅機能がある
  • 日本語データ(漢字、ひらがな、カタカナ)のマスキングに対応
  • 電話番号やクレジットカード番号など、特定のフォーマットを保持したマスキング

image.png

Fakerライブラリについて

Fakerライブラリは、開発やテスト目的でダミーデータを簡単に生成するためのツールです。様々なプログラミング言語で実装されており、特にPythonやJavaScript、PHPなどで広く使用されています。

インストール方法

依存ライブラリのインストール

pip install faker pandas jaconv chardet psutil

実行方法

python csv_masking_faker_app.py

使用方法

  1. 「ファイルを選択」ボタンをクリックし、マスキングしたいCSVファイルを選択します。

    • 複数のファイルを選択することも可能です。
  2. 必要に応じてデータ増幅倍率を設定します。

    • デフォルトは1(増幅なし)です。整数値を入力してください。
  3. 各ファイルごとにタブが作成されます。

    • 各タブでマスキング対象のカラムと使用するFaker関数を確認・編集できます。
  4. マスキングしたいカラムにチェックを入れ、適切なFaker関数を選択します。

    • デフォルトでカラム名に基づいて自動的に選択されています。
  5. 「マスキングを実行」ボタンをクリックすると、マスキング処理が開始されます。

  6. マスキング処理が完了すると、元のファイル名に日時が付加された新しいCSVファイルが生成されます。

    • 例:test_data_masked_20231005_153045.csv

コード

import tkinter as tk
from tkinter import filedialog, messagebox, ttk
from faker import Faker
import pandas as pd
import os
import re
import traceback
import logging
from datetime import datetime
import jaconv
import threading
import queue
import chardet
import gc
import sys

# ロギング設定
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    filename='masking_app.log',
    filemode='a'
)
logger = logging.getLogger('MaskingApp')

class MaskingApp:
    def __init__(self, root):
        self.root = root
        self.root.title("CSVデータマスキングツール")
        self.root.geometry("900x700")  # ウィンドウサイズを拡大
        
        # エラー回復メカニズム
        self.error_queue = queue.Queue()
        
        # ロケール設定
        self.locale = 'ja_JP'
        
        try:
            self.fake = Faker(self.locale)
        except Exception as e:
            logger.error(f"Faker初期化エラー: {str(e)}")
            self.fake = Faker()  # フォールバック

        # 最大許容データサイズ (MB)
        self.max_data_size_mb = 500
        
        # キーワードとFaker関数のマッピング
        self.keyword_to_faker = {
            ('id', 'identifier', '番号', 'ナンバー', 'no', 'number', 'code', 'コード', '識別子', 'uid', '管理番号'): 'unique_id',
            ('first_name', '', 'first', 'firstname', 'given_name', 'given', '下の名前', 'ファースト', '名前'): 'first_name',
            ('last_name', '', '苗字', 'surname', 'lastname', 'family_name', 'family', 'ファミリーネーム', '', 'せい'): 'last_name',
            ('name', '氏名', 'フルネーム', 'full_name', 'fullname', '名義', '名称', 'なまえ', '名義人', '氏名等', '人名'): 'full_name',
            ('kana_name', 'カナ氏名', 'フリガナ', 'カナ', 'ふりがな', '読み', 'よみがな', 'kana', 'yomigana', 'フリガナ等', 'カナ名義'): 'kana_name',
            ('email', 'mail', 'メール', 'メールアドレス', 'メルアド', 'e-mail', 'emailaddress', 'mail_address', 'メアド', '電子メール'): 'email',
            ('phone', 'tel', '電話', '携帯', 'telephone', 'mobile', 'cell', 'cellphone', '電話番号', '携帯番号', '連絡先', 'phone_number', '通話番号'): 'phone_number',
            ('address', 'addr', 'street', '住所', '番地', 'location', '所在地', '現住所', '居住地', 'home_address', '場所', '詳細住所', 'full_address'): 'address',
            ('city', '', '', '', '市区町村', 'town', 'city_name', 'municipality', '自治体', '市名', '区名', '町名'): 'city',
            ('prefecture', 'state', 'province', '都道府県', 'pref', 'region', '地域', '都道府県名', '', '', '', ''): 'prefecture',
            ('zipcode', 'zip', 'postal', '郵便番号', 'postal_code', 'postcode', 'zip_code', '', 'postal_number', '郵便'): 'postcode',
            ('company', 'corporation', '会社名', '企業', '企業名', '組織', '組織名', 'firm', 'organization', '事業所', '事業者', '会社', '法人', '法人名'): 'company',
            ('ssn', 'social_security_number', 'social_security', 'マイナンバー', '個人番号', '保険証番号', '社会保障番号', '保険番号', 'insurance_number'): 'ssn',
            ('birth', 'birthday', 'dob', '生年月日', 'date_of_birth', '誕生日', '生れ', '誕生', 'born_date', '出生日', '生まれ'): 'date_of_birth',
            ('date', '日付', 'day', '年月日', '', '期日', '年月', 'calendar', 'datetime', 'データ日', '登録日', '作成日', '更新日'): 'date',
            ('credit_card_number', 'cc_number', 'クレジットカード番号', 'card_number', 'カード番号', 'クレカ番号', 'カードナンバー', 'クレジットカードナンバー'): 'credit_card_number',
            ('credit_card_provider', 'cc_provider', 'card_provider', 'カード会社', 'クレジット会社', 'クレカ会社', 'issuer', '発行会社', 'ブランド'): 'credit_card_provider',
            ('credit_card_security_code', 'cc_security_code', 'cvv', 'cvc', 'セキュリティコード', '認証番号', 'security_code', 'セキュリティ番号', 'カードセキュリティ'): 'credit_card_security_code',
            ('credit_card_expire', 'cc_expire', '有効期限', 'expiry', 'expiration', 'expire_date', 'カード有効期限', 'クレカ有効期限', '期限', 'expiry_date'): 'credit_card_expire',
            ('university', 'college', 'school', '大学', '学校', '教育機関', '学院', '学園', '高校', '中学', '小学校', '学府', '学歴'): 'university',
            ('url', 'website', 'homepage', 'web', 'サイト', 'ウェブサイト', 'ホームページ', 'サイトURL', 'web_address', 'ウェブ', 'リンク', 'link'): 'url',
            ('uuid', 'guid', 'uuid4', 'グローバルID', 'ユニークID', 'グローバル識別子', 'ユニバーサルID', 'universal_id', 'global_id'): 'uuid4',
            ('job', 'position', 'occupation', '職業', '職種', '仕事', '役職', '職名', '肩書き', '職業名', 'career', '職歴'): 'job',
            ('username', 'user_name', 'login', 'userid', 'user_id', 'アカウント', 'ユーザー名', 'ユーザID', 'ログインID', 'account'): 'user_name',
            ('password', 'pass', 'pwd', 'パスワード', '暗証番号', 'secret', 'passphrase', 'passcode', '合言葉', 'パス'): 'password',
            ('age', '年齢', '年代', 'years_old', '', 'age_in_years', '年令', 'aged'): 'random_int',
            ('gender', 'sex', '性別', '男女', 'gender_type', 'sex_type', '', 'male_female'): 'random_element',
            ('nationality', '国籍', 'country', '出身国', '', 'nation', '出身地', 'origin_country', '母国'): 'country',
            ('blood_type', '血液型', 'blood', 'abo', '血型', 'bloodtype', '血液'): 'random_element',
            ('height', '身長', 'tall', 'stature', '高さ', 'cm', 'センチ'): 'random_int',
            ('weight', '体重', 'mass', '重さ', 'kg', 'キロ'): 'random_int',
            ('color', 'colour', '', 'カラー', '色彩', '色調'): 'color_name',
            ('note', 'memo', 'comment', '備考', 'コメント', '注釈', '特記', '記事', '説明', 'description', '解説'): 'text',
        }

        # Faker関数と日本語の説明のマッピング
        self.faker_function_descriptions = {
            'unique_id': 'ユニークID',
            'full_name': '氏名',
            'kana_name': 'カナ氏名',
            'first_name': '',
            'last_name': '',
            'name': '名前',
            'email': 'メールアドレス',
            'phone_number': '電話番号',
            'address': '住所',
            'city': '市町村',
            'prefecture': '都道府県',
            'postcode': '郵便番号',
            'company': '会社名',
            'ssn': '社会保障番号',
            'date_of_birth': '生年月日',
            'date': '日付',
            'credit_card_number': 'クレジットカード番号',
            'credit_card_provider': 'クレジットカード会社',
            'credit_card_security_code': 'クレジットカードセキュリティコード',
            'credit_card_expire': 'クレジットカード有効期限',
            'university': '大学名',
            'url': 'URL',
            'uuid4': 'UUID',
        }
        
        # 利用可能なFaker関数のリスト(日本語の説明を含む)
        self.available_faker_functions = sorted(set(self.faker_function_descriptions.keys()))
        self.available_faker_functions_jp = [''] + [self.faker_function_descriptions.get(func, func) for func in self.available_faker_functions]

        # 中断フラグ
        self.cancel_processing = False
        
        self.create_widgets()
        
        # アプリケーション終了時の処理を設定
        self.root.protocol("WM_DELETE_WINDOW", self.on_closing)
        
        # 定期的なエラーチェック
        self.check_error_queue()

    def check_error_queue(self):
        """エラーキューを定期的にチェックする"""
        try:
            while not self.error_queue.empty():
                error = self.error_queue.get_nowait()
                messagebox.showerror("エラー", error)
        except Exception as e:
            logger.error(f"エラーキュー処理中のエラー: {str(e)}")
        finally:
            # 再スケジュール
            self.root.after(100, self.check_error_queue)

    def on_closing(self):
        """アプリケーション終了時の処理"""
        try:
            # 実行中のタスクがあれば中断
            self.cancel_processing = True
            # メモリクリーンアップ
            self.cleanup_resources()
            # アプリケーション終了
            self.root.destroy()
        except Exception as e:
            logger.error(f"アプリケーション終了時のエラー: {str(e)}")
            self.root.destroy()
    
    def cleanup_resources(self):
        """メモリリソースのクリーンアップ"""
        try:
            # 大きなデータ構造の参照を解放
            if hasattr(self, 'file_data'):
                for file_info in self.file_data:
                    if 'df' in file_info:
                        del file_info['df']
                    if 'id_mappings' in file_info:
                        file_info['id_mappings'].clear()
            
            # ガベージコレクションを明示的に実行
            gc.collect()
        except Exception as e:
            logger.error(f"リソースクリーンアップエラー: {str(e)}")

    def create_widgets(self):
        # メインフレーム
        main_frame = tk.Frame(self.root)
        main_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=10)

        # タイトルラベル
        title_label = tk.Label(main_frame, text="CSVデータマスキングツール", font=("Arial", 16))
        title_label.pack(pady=10)

        # CSVファイル選択ボタンとデータ増幅倍率の入力フィールド
        select_frame = tk.Frame(main_frame)
        select_frame.pack(pady=5)

        self.csv_file_label = tk.Label(select_frame, text="CSVファイルを選択してください。", font=("Arial", 12))
        self.csv_file_label.pack(side=tk.LEFT, padx=5)

        self.select_csv_button = tk.Button(select_frame, text="ファイルを選択", command=self.select_csv_files)
        self.select_csv_button.pack(side=tk.LEFT, padx=5)

        # データ増幅倍率の入力フィールド
        amplify_frame = tk.Frame(main_frame)
        amplify_frame.pack(pady=5)

        amplify_label = tk.Label(amplify_frame, text="データ増幅倍率(整数):", font=("Arial", 12))
        amplify_label.pack(side=tk.LEFT, padx=5)

        self.amplify_entry = tk.Entry(amplify_frame, width=5)
        self.amplify_entry.insert(0, "1")  # デフォルト値は1(増幅なし)
        self.amplify_entry.pack(side=tk.LEFT, padx=5)

        # Notebookを使用して、各ファイルのタブを作成
        self.notebook = ttk.Notebook(main_frame)
        self.notebook.pack(fill=tk.BOTH, expand=True, padx=5, pady=10)

        # マスキング実行ボタン
        button_frame = tk.Frame(main_frame)
        button_frame.pack(pady=10)

        self.mask_button = tk.Button(button_frame, text="マスキングを実行", command=self.mask_data, state=tk.DISABLED, width=15, bg="#4CAF50", font=("Arial", 12))
        self.mask_button.pack(side=tk.LEFT, padx=5)

        # Faker関数の説明ボタン
        self.help_button = tk.Button(button_frame, text="Faker関数の説明", command=self.show_faker_help, width=15, font=("Arial", 12))
        self.help_button.pack(side=tk.LEFT, padx=5)

        # **使用方法ボタンの追加**
        self.usage_button = tk.Button(button_frame, text="使用方法", command=self.show_usage, width=15, font=("Arial", 12))
        self.usage_button.pack(side=tk.LEFT, padx=5)

        # ファイルごとのデータを保持
        self.file_data = []
        
        # キャンセルボタンの追加
        self.cancel_button = tk.Button(button_frame, text="処理中断", command=self.cancel_operation, state=tk.DISABLED, width=15, font=("Arial", 12))
        self.cancel_button.pack(side=tk.LEFT, padx=5)
        
        # プログレスバーの追加
        self.progress_frame = tk.Frame(main_frame)
        self.progress_frame.pack(pady=5, fill=tk.X)
        self.progress_var = tk.DoubleVar()
        self.progress_bar = ttk.Progressbar(self.progress_frame, variable=self.progress_var, maximum=100)
        self.progress_bar.pack(fill=tk.X, padx=10)
        self.progress_label = tk.Label(self.progress_frame, text="準備完了")
        self.progress_label.pack(pady=2)
        
        # ファイルごとのデータを保持
        self.file_data = []

    def cancel_operation(self):
        """処理を中断する"""
        self.cancel_processing = True
        self.cancel_button.config(state=tk.DISABLED)
        self.progress_label.config(text="処理を中断しています...")

    def select_csv_files(self):
        """CSVファイルを選択する"""
        try:
            self.csv_file_paths = filedialog.askopenfilenames(filetypes=[("CSV Files", "*.csv"), ("All Files", "*.*")])
            if self.csv_file_paths:
                self.mask_button.config(state=tk.NORMAL)
                self.csv_file_label.config(text=f"{len(self.csv_file_paths)} 個のファイルが選択されました。")
                # 別スレッドでファイル読み込み
                threading.Thread(target=self.load_files_thread, daemon=True).start()
                self.progress_label.config(text="ファイル読み込み中...")
                self.cancel_button.config(state=tk.NORMAL)
        except Exception as e:
            logger.error(f"ファイル選択エラー: {str(e)}")
            messagebox.showerror("エラー", f"ファイル選択中にエラーが発生しました:\n{str(e)}")

    def load_files_thread(self):
        """別スレッドでファイルを読み込む"""
        try:
            self.cancel_processing = False
            # 既存のタブを削除
            self.root.after(0, self.clear_tabs)
            
            self.file_data = []
            total_files = len(self.csv_file_paths)
            
            for i, file_path in enumerate(self.csv_file_paths):
                if self.cancel_processing:
                    self.root.after(0, lambda: self.progress_label.config(text="ファイル読み込みが中断されました"))
                    self.root.after(0, lambda: self.cancel_button.config(state=tk.DISABLED))
                    return
                
                # ファイルサイズチェック
                file_size_mb = os.path.getsize(file_path) / (1024 * 1024)
                if file_size_mb > self.max_data_size_mb:
                    error_msg = f"ファイルサイズが大きすぎます: {file_path} ({file_size_mb:.2f}MB > {self.max_data_size_mb}MB)"
                    self.error_queue.put(error_msg)
                    logger.warning(error_msg)
                    continue
                
                # エンコーディング検出
                encoding = self.detect_file_encoding(file_path)
                
                try:
                    # データフレーム読み込み
                    df = pd.read_csv(file_path, encoding=encoding)
                    
                    # NaNを適切に処理
                    df = df.fillna('')
                    
                    file_info = {
                        'file_path': file_path,
                        'df': df,
                        'column_vars': {},
                        'function_vars': {},
                        'tab': None,
                        'id_mappings': {},
                        'encoding': encoding
                    }
                    
                    self.file_data.append(file_info)
                    
                    # UI更新はメインスレッドで
                    self.root.after(0, lambda f=file_info: self.create_file_tab(f))
                    
                except Exception as e:
                    error_msg = f"ファイル読み込みエラー ({file_path}): {str(e)}"
                    self.error_queue.put(error_msg)
                    logger.error(f"{error_msg}\n{traceback.format_exc()}")
                
                # 進捗更新
                progress = (i + 1) / total_files * 100
                self.root.after(0, lambda p=progress: self.update_progress(p, f"ファイル読み込み中... ({i+1}/{total_files})"))
            
            self.root.after(0, lambda: self.progress_label.config(text="ファイル読み込みが完了しました"))
            self.root.after(0, lambda: self.cancel_button.config(state=tk.DISABLED))
            
        except Exception as e:
            error_msg = f"ファイル読み込み処理エラー: {str(e)}"
            self.error_queue.put(error_msg)
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            self.root.after(0, lambda: self.cancel_button.config(state=tk.DISABLED))
    
    def detect_file_encoding(self, file_path):
        """ファイルのエンコーディングを検出する"""
        try:
            with open(file_path, 'rb') as f:
                result = chardet.detect(f.read(10000))  # 最初の10000バイトでエンコーディングを推測
            
            encoding = result['encoding']
            confidence = result['confidence']
            
            if confidence < 0.7:  # 信頼度が低い場合
                logger.warning(f"エンコーディング検出の信頼度が低い: {file_path}, {encoding}, 信頼度: {confidence}")
                return 'utf-8'  # デフォルトはUTF-8
            
            return encoding
        except Exception as e:
            logger.error(f"エンコーディング検出エラー: {str(e)}")
            return 'utf-8'  # エラーの場合はUTF-8をデフォルトとする

    def clear_tabs(self):
        """既存のタブをすべて削除"""
        for tab in self.notebook.tabs():
            self.notebook.forget(tab)

    def create_file_tab(self, file_info):
        """ファイルタブを作成"""
        df = file_info['df']
        tab = tk.Frame(self.notebook)
        self.notebook.add(tab, text=os.path.basename(file_info['file_path']))
        file_info['tab'] = tab
        self.show_masking_options(file_info)

    def update_progress(self, value, text="処理中..."):
        """プログレスバーと進捗ラベルを更新"""
        self.progress_var.set(value)
        self.progress_label.config(text=text)

    def show_masking_options(self, file_info):
        df = file_info['df']
        tab = file_info['tab']

        # スクロール可能なキャンバス
        canvas = tk.Canvas(tab)
        canvas.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

        scrollbar = ttk.Scrollbar(tab, orient="vertical", command=canvas.yview)
        scrollbar.pack(side=tk.RIGHT, fill="y")

        canvas.configure(yscrollcommand=scrollbar.set)
        canvas.bind('<Configure>', lambda e: canvas.configure(scrollregion=canvas.bbox('all')))

        inner_frame = tk.Frame(canvas)
        canvas.create_window((0, 0), window=inner_frame, anchor='nw')

        masking_columns = self.find_columns_to_mask(df.columns)

        header_frame = tk.Frame(inner_frame)
        header_frame.pack(fill=tk.X, padx=5, pady=5)

        tk.Label(header_frame, text="マスク", width=5, anchor='w', font=("Arial", 10, "bold")).grid(row=0, column=0)
        tk.Label(header_frame, text="カラム名", width=30, anchor='w', font=("Arial", 10, "bold")).grid(row=0, column=1)
        tk.Label(header_frame, text="Faker関数", width=25, anchor='w', font=("Arial", 10, "bold")).grid(row=0, column=2)

        separator = ttk.Separator(inner_frame, orient='horizontal')
        separator.pack(fill=tk.X, padx=5, pady=5)

        for idx, column in enumerate(df.columns):
            var = tk.BooleanVar()
            function_var = tk.StringVar()
            initial_function = masking_columns.get(column, '')
            function_var.set(self.faker_function_descriptions.get(initial_function, ''))

            frame = tk.Frame(inner_frame)
            frame.pack(fill=tk.X, padx=5, pady=2)

            checkbox = tk.Checkbutton(frame, variable=var)
            checkbox.grid(row=idx, column=0, sticky='w')

            label = tk.Label(frame, text=column, width=30, anchor='w')
            label.grid(row=idx, column=1, sticky='w')

            function_menu = ttk.Combobox(frame, textvariable=function_var, values=self.available_faker_functions_jp, state='readonly', width=25)
            function_menu.grid(row=idx, column=2, sticky='w')

            if column in masking_columns:
                var.set(True)

            file_info['column_vars'][column] = var
            file_info['function_vars'][column] = function_var

    def mask_data(self):
        """データマスキング処理を実行"""
        try:
            # 増幅倍率のバリデーション
            try:
                amplify_factor = int(self.amplify_entry.get())
                if amplify_factor < 1 or amplify_factor > 100:  # 上限を設定
                    raise ValueError("範囲外")
            except ValueError:
                messagebox.showwarning("警告", "データ増幅倍率には1以上100以下の整数を入力してください。")
                return
            
            # 選択されたカラムのチェック
            has_selected_columns = False
            for file_info in self.file_data:
                column_vars = file_info['column_vars']
                selected_columns = [column for column, var in column_vars.items() if var.get()]
                if selected_columns:
                    has_selected_columns = True
                    break
            
            if not has_selected_columns:
                messagebox.showwarning("警告", "マスキングするカラムが選択されていません。")
                return
            
            # 処理開始
            self.cancel_processing = False
            self.cancel_button.config(state=tk.NORMAL)
            self.mask_button.config(state=tk.DISABLED)
            
            # 別スレッドで処理実行
            threading.Thread(target=self.mask_data_thread, args=(amplify_factor,), daemon=True).start()
            
        except Exception as e:
            logger.error(f"マスキング前処理エラー: {str(e)}\n{traceback.format_exc()}")
            messagebox.showerror("エラー", f"マスキング処理の準備中にエラーが発生しました:\n{str(e)}")
            self.cancel_button.config(state=tk.DISABLED)
            self.mask_button.config(state=tk.NORMAL)

    def mask_data_thread(self, amplify_factor):
        """別スレッドでマスキング処理を実行"""
        try:
            total_files = len(self.file_data)
            processed_files = 0
            
            for file_info in self.file_data:
                if self.cancel_processing:
                    self.root.after(0, lambda: self.update_progress(0, "処理が中断されました"))
                    break
                
                try:
                    df = file_info['df']
                    column_vars = file_info['column_vars']
                    function_vars = file_info['function_vars']
                    selected_columns = [column for column, var in column_vars.items() if var.get()]
                    
                    if not selected_columns:
                        processed_files += 1
                        self.root.after(0, lambda p=processed_files/total_files*100: 
                                       self.update_progress(p, f"処理中... ({processed_files}/{total_files})"))
                        continue
                    
                    # メモリ使用量を考慮したチャンク処理
                    chunk_size = self.calculate_optimal_chunk_size(df, amplify_factor)
                    total_rows = len(df) * amplify_factor
                    processed_rows = 0
                    
                    # 最終的なデータフレームを格納するリスト
                    df_chunks = []
                    
                    for chunk_idx in range(0, amplify_factor, chunk_size):
                        if self.cancel_processing:
                            break
                        
                        # 現在のチャンクサイズを計算
                        current_chunk_size = min(chunk_size, amplify_factor - chunk_idx)
                        
                        # チャンクを作成
                        df_chunk = pd.concat([df.copy() for _ in range(current_chunk_size)], ignore_index=True)
                        
                        # IDマッピングの初期化(必要な場合のみ)
                        if not file_info['id_mappings']:
                            file_info['id_mappings'] = {}
                        
                        # 各カラムに対して処理
                        for column in selected_columns:
                            if self.cancel_processing:
                                break
                                
                            faker_function_jp = function_vars[column].get()
                            faker_function = self.get_faker_function_by_description(faker_function_jp)
                            
                            if faker_function:
                                # カラムをマスク処理
                                self.process_column(df_chunk, column, faker_function, file_info)
                            else:
                                logger.warning(f"Faker関数が選択されていません: {column}")
                        
                        # 処理したチャンクを保存
                        df_chunks.append(df_chunk)
                        
                        # 進捗更新
                        processed_rows += len(df_chunk)
                        file_progress = processed_rows / total_rows
                        total_progress = (processed_files + file_progress) / total_files * 100
                        self.root.after(0, lambda p=total_progress: 
                                       self.update_progress(p, f"ファイル {processed_files+1}/{total_files} 処理中..."))
                    
                    if not self.cancel_processing:
                        # すべてのチャンクを結合
                        if df_chunks:
                            df_amplified = pd.concat(df_chunks, ignore_index=True)
                            
                            # 結果の保存
                            output_file_path = self.get_output_file_path(file_info['file_path'])
                            
                            # エンコーディングを維持
                            df_amplified.to_csv(output_file_path, index=False, encoding=file_info.get('encoding', 'utf-8-sig'))
                            logger.info(f"ファイル保存完了: {output_file_path}")
                    
                except Exception as e:
                    error_msg = f"ファイル処理エラー ({os.path.basename(file_info['file_path'])}): {str(e)}"
                    self.error_queue.put(error_msg)
                    logger.error(f"{error_msg}\n{traceback.format_exc()}")
                
                # メモリ解放
                self.cleanup_file_processing(file_info)
                processed_files += 1
            
            # 処理完了
            if not self.cancel_processing:
                self.root.after(0, lambda: self.update_progress(100, "すべての処理が完了しました"))
                self.root.after(0, lambda: messagebox.showinfo("完了", "全てのデータのマスキングが完了しました。"))
            
            # UI状態を更新
            self.root.after(0, lambda: self.cancel_button.config(state=tk.DISABLED))
            self.root.after(0, lambda: self.mask_button.config(state=tk.NORMAL))
            
        except Exception as e:
            error_msg = f"マスキング処理全体エラー: {str(e)}"
            self.error_queue.put(error_msg)
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            
            self.root.after(0, lambda: self.cancel_button.config(state=tk.DISABLED))
            self.root.after(0, lambda: self.mask_button.config(state=tk.NORMAL))
            self.root.after(0, lambda: self.update_progress(0, "エラーが発生しました"))

    def calculate_optimal_chunk_size(self, df, amplify_factor):
        """最適なチャンクサイズを計算"""
        try:
            # データフレームのメモリ使用量を推定 (バイト)
            memory_usage = df.memory_usage(deep=True).sum()
            
            # 利用可能なメモリの20%を使用する (単位: バイト)
            available_memory = 0.2 * psutil.virtual_memory().available
            
            # 1チャンクあたりの最大行数を計算
            max_rows_per_chunk = max(1, int(available_memory / (memory_usage / len(df))))
            
            # 安全係数を適用 (0.8)
            safe_rows_per_chunk = int(max_rows_per_chunk * 0.8)
            
            # 増幅倍率とチャンク数の関係
            chunk_size = min(10, max(1, safe_rows_per_chunk // len(df)))
            
            logger.info(f"最適チャンクサイズ: {chunk_size}, DF行数: {len(df)}, 増幅倍率: {amplify_factor}")
            return chunk_size
            
        except Exception as e:
            logger.error(f"チャンクサイズ計算エラー: {str(e)}")
            return 1  # エラー時は最小サイズ

    def cleanup_file_processing(self, file_info):
        """ファイル処理後のメモリクリーンアップ"""
        try:
            # 大きなデータ構造をクリア
            if 'temp_df' in file_info:
                del file_info['temp_df']
            
            # ガベージコレクション
            gc.collect()
        except Exception as e:
            logger.error(f"ファイル処理後のクリーンアップエラー: {str(e)}")

    def process_column(self, df, column, faker_function, file_info):
        """1つのカラムに対するマスキング処理"""
        try:
            # 日付フォーマットの処理
            if faker_function in ['date', 'date_of_birth']:
                date_format = self.infer_date_format(df[column])
                if date_format:
                    df[column] = df[column].apply(
                        lambda x: self.safe_generate_fake_data(faker_function, x, date_format=date_format))
                else:
                    df[column] = df[column].apply(
                        lambda x: self.safe_generate_fake_data(faker_function, x))
                        
            # フォーマットを保持する処理
            elif faker_function in ['phone_number', 'postcode', 'credit_card_number']:
                df[column] = df[column].apply(
                    lambda x: self.safe_formatted_data(faker_function, x))
                    
            # カナ氏名の処理
            elif faker_function == 'kana_name':
                df[column] = df[column].apply(
                    lambda x: self.safe_generate_kana_name(x))
                    
            # 氏名の処理
            elif faker_function in ['full_name']:
                df[column] = df[column].apply(
                    lambda x: self.safe_generate_full_name(x))
                    
            # 名・姓の処理
            elif faker_function in ['first_name', 'last_name']:
                df[column] = df[column].apply(
                    lambda x: self.safe_generate_name_with_script(x, faker_function))
                    
            # ユニークIDの処理
            elif faker_function == 'unique_id':
                df[column] = self.safe_generate_unique_ids(df[column], file_info)
                
            else:
                df[column] = df[column].apply(
                    lambda x: self.safe_generate_fake_data(faker_function, x))
                    
        except Exception as e:
            error_msg = f"カラム処理エラー ({column}, {faker_function}): {str(e)}"
            self.error_queue.put(error_msg)
            logger.error(f"{error_msg}\n{traceback.format_exc()}")
            # エラー時は元の値を保持
            pass


    def find_columns_to_mask(self, columns):
        """
        カラム名を分析して、マスキングすべきカラムとそれに対応するFaker関数を特定する
        
        Args:
            columns (Index): データフレームのカラム一覧
            
        Returns:
            dict: カラム名とそれに対応するFaker関数のマッピング
        """
        masking_columns = {}
        
        # 入力バリデーション
        if columns is None or len(columns) == 0:
            logger.warning("空のカラムリストが渡されました")
            return masking_columns
            
        try:
            # キーワードマッチングの効率向上のためにキャッシュを作成
            # タプルキーを文字列リストに変換して検索を最適化
            keyword_cache = {}
            for keywords_tuple, faker_function in self.keyword_to_faker.items():
                for keyword in keywords_tuple:
                    keyword_cache[keyword] = faker_function
            
            for column in columns:
                # NoneやNaN値などの非文字列値を安全に処理
                if column is None:
                    continue
                    
                try:
                    if pd.isna(column):
                        continue
                        
                    # 最初から文字列として扱い、変換に失敗した場合はスキップ
                    try:
                        column_lower = str(column).lower().strip()
                        if not column_lower:  # 空文字列チェック
                            continue
                    except (TypeError, ValueError):
                        logger.warning(f"カラム名を文字列に変換できません: {column}")
                        continue
                    
                    # 各キーワードカテゴリに対して一致するか確認(優先度順)
                    matched = False
                    matched_function = None
                    match_score = 0  # マッチスコア(完全一致>部分一致)
                    
                    # まず完全一致をチェック
                    if column_lower in keyword_cache:
                        matched = True
                        matched_function = keyword_cache[column_lower]
                        match_score = 100  # 完全一致の場合は高いスコア
                        logger.debug(f"カラム '{column}' は完全一致しました: '{matched_function}'")
                    
                    # 完全一致しない場合、部分一致をチェック
                    if not matched:
                        for keywords_tuple, faker_function in self.keyword_to_faker.items():
                            try:
                                # 最も長いキーワードが含まれるかチェック(より具体的なマッチを優先)
                                for keyword in sorted(keywords_tuple, key=len, reverse=True):
                                    if keyword in column_lower:
                                        new_score = len(keyword)  # キーワードの長さをスコアとする
                                        if new_score > match_score:
                                            matched = True
                                            matched_function = faker_function
                                            match_score = new_score
                            except Exception as e:
                                logger.error(f"キーワードマッチング中にエラー: {column}, {keywords_tuple}, {str(e)}")
                                continue
                    
                    # マッチング結果の適用
                    if matched and matched_function:
                        masking_columns[column] = matched_function
                        logger.debug(f"カラム '{column}' に対して '{matched_function}' 関数が割り当てられました (スコア: {match_score})")
                    else:
                        logger.debug(f"カラム '{column}' に対して該当するマスキング関数が見つかりませんでした")
                        
                except Exception as e:
                    # 個別のカラム処理中のエラーを捕捉し、他のカラムの処理を続行
                    logger.error(f"カラム '{column}' の処理中にエラー: {str(e)}")
                    continue
                    
        except Exception as e:
            # 全体的なエラーを捕捉
            logger.error(f"カラム分析中のエラー: {str(e)}\n{traceback.format_exc()}")
            # エラーが発生しても、処理済みの結果は返す
        
        # 結果のサマリをログに記録
        logger.info(f"検出されたマスキング対象カラム数: {len(masking_columns)}/{len(columns)}")
        
        return masking_columns

    def get_faker_function_by_description(self, description_jp):
        """
        日本語の説明からFaker関数名を取得する
        
        Args:
            description_jp (str): Faker関数の日本語説明
            
        Returns:
            str: Faker関数名(見つからない場合はNone)
        """
        if not description_jp:
            return None
            
        try:
            # 説明から関数名へのマッピングを作成
            desc_to_func = {desc: func for func, desc in self.faker_function_descriptions.items()}
            
            # 直接マッチする場合
            if description_jp in desc_to_func:
                return desc_to_func[description_jp]
                
            # 部分一致を試行
            for desc, func in desc_to_func.items():
                if desc in description_jp or description_jp in desc:
                    logger.debug(f"部分一致: '{description_jp}' => '{func}'")
                    return func
                    
            # 見つからない場合はログに記録
            logger.warning(f"Faker関数が見つかりません: '{description_jp}'")
            return None
            
        except Exception as e:
            logger.error(f"Faker関数検索エラー: {str(e)}, 入力: {description_jp}")
            return None


    def generate_fake_data(self, faker_function, **kwargs):
        """
        指定されたFaker関数を使用してフェイクデータを生成する
        
        Args:
            faker_function (str): 使用するFaker関数名
            **kwargs: 追加パラメータ(例:date_formatなど)
            
        Returns:
            any: 生成されたフェイクデータ
        """
        try:
            # 日付関連の特殊処理
            if faker_function == 'date':
                if 'date_format' in kwargs:
                    date_obj = self.fake.date_object()
                    return date_obj.strftime(kwargs['date_format'])
                else:
                    return str(self.fake.date())
                    
            elif faker_function == 'date_of_birth':
                if 'date_format' in kwargs:
                    date_obj = self.fake.date_of_birth()
                    return date_obj.strftime(kwargs['date_format'])
                else:
                    return str(self.fake.date_of_birth())
            
            # その他の一般的なFaker関数
            faker_method = getattr(self.fake, faker_function, None)
            if faker_method:
                return faker_method()
            else:
                logger.warning(f"未定義のFaker関数: {faker_function}")
                return "Unknown_Function"
                
        except Exception as e:
            logger.error(f"フェイクデータ生成エラー: {str(e)}, 関数: {faker_function}")
            return f"Error_{faker_function}"

    def generate_unique_ids(self, id_series, file_info):
        """
        ユニークIDを生成し、一貫性を保持する
        
        Args:
            id_series (Series): 元のID列
            file_info (dict): ファイル情報辞書
            
        Returns:
            Series: 新しいユニークID列
        """
        try:
            # IDマッピングを取得
            id_mappings = file_info.get('id_mappings', {})
            
            # 新しいIDを割り当てる関数
            def assign_id(old_id):
                if pd.isna(old_id):
                    return old_id
                
                str_id = str(old_id)
                
                # マッピングがすでに存在する場合は再利用
                if str_id in id_mappings:
                    return id_mappings[str_id]
                    
                # 新しいIDを生成
                try:
                    new_id = self.fake.uuid4()
                    id_mappings[str_id] = new_id
                    return new_id
                except Exception as e:
                    logger.error(f"ユニークID生成エラー: {str(e)}, 元のID: {str_id}")
                    new_id = f"id_{len(id_mappings)}"
                    id_mappings[str_id] = new_id
                    return new_id
            
            # 各IDに対して新しいIDを割り当て
            return id_series.apply(assign_id)
            
        except Exception as e:
            logger.error(f"ユニークID生成全体エラー: {str(e)}")
            # エラー時は元のシリーズを返す
            return id_series

    def infer_date_format(self, date_series):
        """
        データ列から日付フォーマットを推測する
        
        Args:
            date_series (Series): 日付データが含まれている列
            
        Returns:
            str: 推測された日付フォーマット(推測できない場合はNone)
        """
        try:
            # NaNを除外
            valid_dates = date_series.dropna()
            
            if len(valid_dates) == 0:
                return None
                
            # 最初の数個のサンプルを取得
            samples = valid_dates.head(min(5, len(valid_dates)))
            
            # 各サンプルで日付フォーマットを検出
            formats = []
            for date_str in samples:
                fmt = self.detect_date_format(date_str)
                if fmt:
                    formats.append(fmt)
            
            # 最も頻繁に検出されたフォーマットを返す
            if formats:
                from collections import Counter
                most_common = Counter(formats).most_common(1)[0][0]
                logger.info(f"推測された日付フォーマット: {most_common}")
                return most_common
                
            return None
            
        except Exception as e:
            logger.error(f"日付フォーマット推論エラー: {str(e)}")
            return None
 
    # 以下、各種安全な生成メソッド
    def safe_generate_fake_data(self, faker_function, original_value, **kwargs):
        """例外処理を含むフェイクデータ生成"""
        try:
            if pd.isna(original_value) or original_value == '':
                return original_value
            return self.generate_fake_data(faker_function, **kwargs)
        except Exception as e:
            logger.error(f"フェイクデータ生成エラー ({faker_function}): {str(e)}")
            return str(original_value)  # エラー時は元の値を文字列として返す
    
    def safe_formatted_data(self, faker_function, original_value):
        """例外処理を含むフォーマット付きデータ生成"""
        try:
            if pd.isna(original_value) or original_value == '':
                return original_value
            return self.generate_formatted_data(faker_function, original_value)
        except Exception as e:
            logger.error(f"フォーマット付きデータ生成エラー ({faker_function}): {str(e)}")
            return str(original_value)

    def safe_generate_kana_name(self, original_value):
        """例外処理を含むカナ氏名生成"""
        try:
            if pd.isna(original_value) or original_value == '':
                return original_value
            return self.generate_kana_name(original_value)
        except Exception as e:
            logger.error(f"カナ氏名生成エラー: {str(e)}")
            return str(original_value)

    def safe_generate_full_name(self, original_value):
        """例外処理を含む氏名生成"""
        try:
            if pd.isna(original_value) or original_value == '':
                return original_value
            return self.generate_full_name(original_value)
        except Exception as e:
            logger.error(f"氏名生成エラー: {str(e)}")
            return str(original_value)

    def safe_generate_name_with_script(self, original_value, faker_function):
        """例外処理を含む名前生成"""
        try:
            if pd.isna(original_value) or original_value == '':
                return original_value
            return self.generate_name_with_script(original_value, faker_function)
        except Exception as e:
            logger.error(f"名前生成エラー ({faker_function}): {str(e)}")
            return str(original_value)

    def safe_generate_unique_ids(self, id_series, file_info):
        """例外処理を含むユニークID生成"""
        try:
            return self.generate_unique_ids(id_series, file_info)
        except Exception as e:
            logger.error(f"ユニークID生成エラー: {str(e)}")
            return id_series.apply(lambda x: str(x) if not pd.isna(x) else x)

    def show_faker_help(self):
        help_text = "Faker関数の説明:\n\n"
        faker_descriptions = {
        'unique_id': 'ユニークID: ランダムな一意の識別子を生成します。',
        'full_name': '氏名: ランダムな姓と名を組み合わせたフルネームを生成します。',
        'kana_name': 'カナ氏名: ランダムな氏名をカタカナまたはひらがなで生成します。',
        'first_name': '名: ランダムな名前(名)を生成します。',
        'last_name': '姓: ランダムな名字(姓)を生成します。',
        'email': 'メールアドレス: ランダムなメールアドレスを生成します。',
        'phone_number': '電話番号: ランダムな電話番号を生成します。',
        'address': '住所: ランダムな住所を生成します。',
        'city': '市町村: ランダムな市町村名を生成します。',
        'prefecture': '都道府県: ランダムな都道府県名を生成します。',
        'postcode': '郵便番号: ランダムな郵便番号を生成します。',
        'company': '会社名: ランダムな会社名を生成します。',
        'ssn': '社会保障番号: ランダムな社会保障番号(日本ではマイナンバーなど)を生成します。',
        'date_of_birth': '生年月日: ランダムな生年月日を生成します。',
        'date': '日付: ランダムな日付を生成します。',
        'credit_card_number': 'クレジットカード番号: ランダムなクレジットカード番号を生成します。',
        'credit_card_provider': 'クレジットカード会社: ランダムなクレジットカード発行会社名を生成します。',
        'credit_card_security_code': 'クレジットカードセキュリティコード: ランダムなCVC/CVVコードを生成します。',
        'credit_card_expire': 'クレジットカード有効期限: ランダムなクレジットカードの有効期限を生成します。',
        'university': '大学名: ランダムな大学名を生成します。',
        'url': 'URL: ランダムなウェブサイトのURLを生成します。',
        'uuid4': 'UUID: ランダムなUUID4形式の識別子を生成します。',
        }

        for func in self.available_faker_functions:
            desc = faker_descriptions.get(func, func)
            help_text += f"{desc}\n\n"

        messagebox.showinfo("Faker関数の説明", help_text)


    def mask_data(self):
        try:
            amplify_factor = int(self.amplify_entry.get())
            if amplify_factor < 1:
                raise ValueError
        except ValueError:
            messagebox.showwarning("警告", "データ増幅倍率には1以上の整数を入力してください。")
            return

        progress_window = tk.Toplevel(self.root)
        progress_window.title("マスキング処理中")
        progress_label = tk.Label(progress_window, text="マスキング処理を実行中...", font=("Arial", 12))
        progress_label.pack(padx=20, pady=20)

        self.root.update_idletasks()

        for file_info in self.file_data:
            df = file_info['df']
            column_vars = file_info['column_vars']
            function_vars = file_info['function_vars']
            selected_columns = [column for column, var in column_vars.items() if var.get()]

            if not selected_columns:
                continue  # このファイルではマスキングするカラムがない

            # データ増幅用のデータフレームを作成
            df_list = [df.copy() for _ in range(amplify_factor)]
            df_amplified = pd.concat(df_list, ignore_index=True)

            # IDマッピングの初期化
            file_info['id_mappings'] = {}

            for column in selected_columns:
                faker_function_jp = function_vars[column].get()
                faker_function = self.get_faker_function_by_description(faker_function_jp)
                if faker_function:
                    # 日付フォーマットの処理
                    if faker_function in ['date', 'date_of_birth']:
                        date_format = self.infer_date_format(df_amplified[column])
                        if date_format:
                            df_amplified[column] = df_amplified[column].apply(lambda x: self.generate_fake_data(faker_function, date_format=date_format))
                        else:
                            df_amplified[column] = df_amplified[column].apply(lambda x: self.generate_fake_data(faker_function))
                    # フォーマットを保持する処理(電話番号、郵便番号、クレジットカード番号など)
                    elif faker_function in ['phone_number', 'postcode', 'credit_card_number']:
                        df_amplified[column] = df_amplified[column].apply(lambda x: self.generate_formatted_data(faker_function, x))
                    # カナ氏名の処理
                    elif faker_function == 'kana_name':
                        df_amplified[column] = df_amplified[column].apply(lambda x: self.generate_kana_name(x))
                    # 氏名の処理
                    elif faker_function in ['full_name']:
                        df_amplified[column] = df_amplified[column].apply(lambda x: self.generate_full_name(x))
                    # 名・姓の処理
                    elif faker_function in ['first_name', 'last_name']:
                        df_amplified[column] = df_amplified[column].apply(lambda x: self.generate_name_with_script(x, faker_function))
                    # ユニークIDの処理
                    elif faker_function == 'unique_id':
                        df_amplified[column] = self.generate_unique_ids(df_amplified[column], file_info)
                    else:
                        df_amplified[column] = df_amplified[column].apply(lambda x: self.generate_fake_data(faker_function))
                else:
                    messagebox.showwarning("警告", f"{column} のFaker関数が選択されていません。")
                    progress_window.destroy()
                    return

            # 結果の保存
            output_file_path = self.get_output_file_path(file_info['file_path'])
            df_amplified.to_csv(output_file_path, index=False, encoding='utf-8-sig')

        progress_window.destroy()
        messagebox.showinfo("完了", f"全てのデータのマスキングが完了しました。")

    def generate_formatted_data(self, faker_function, original_value):
        """
        元のフォーマットを保持しつつ、指定されたFaker関数を使用して偽データを生成する
        
        Args:
            faker_function (str): 使用するFaker関数名('phone_number', 'postcode', 'credit_card_number'など)
            original_value (any): 元のデータ値
            
        Returns:
            str: 生成された偽データ(元のフォーマットを保持)
        """
        # 入力値のバリデーション - None/NaN値のチェック
        if pd.isnull(original_value):
            return original_value
            
        # 文字列への変換
        try:
            original_value = str(original_value).strip()
            if not original_value:  # 空文字列チェック
                return original_value
        except Exception as e:
            logger.error(f"フォーマットデータの文字列変換エラー: {str(e)}, 値: {original_value}")
            return str(original_value) if original_value is not None else ''
        
        try:
            # 数字以外の文字(ハイフン、スペースなど)を抽出して保存
            non_digit_chars = re.findall(r'\D+', original_value)
            
            # 数字部分の長さを配列で保存(各数字ブロックの桁数を維持するため)
            digit_blocks = re.findall(r'\d+', original_value)
            if not digit_blocks:  # 数字が含まれない場合は元の値を返す
                return original_value
                
            digit_counts = [len(s) for s in digit_blocks]
            
            # Faker関数に基づいてフェイクデータを生成
            try:
                if faker_function == 'phone_number':
                    fake_number = self.fake.phone_number()
                elif faker_function == 'postcode':
                    fake_number = self.fake.postcode()
                elif faker_function == 'credit_card_number':
                    fake_number = self.fake.credit_card_number()
                else:
                    # サポートされていない関数の場合、ログに記録して空の文字列を使用
                    logger.warning(f"サポート外のフォーマット関数: {faker_function}")
                    fake_number = ''.join(['0' * count for count in digit_counts])
            except Exception as e:
                logger.error(f"Faker関数実行エラー: {str(e)}, 関数: {faker_function}")
                # エラー時は0埋めの数字を生成
                fake_number = ''.join(['0' * count for count in digit_counts])
    
            # フェイクデータから数字部分のみを抽出
            fake_digits = re.findall(r'\d+', fake_number)
            if not fake_digits:  # 生成されたデータに数字が含まれない場合
                fake_digits = ['0' * count for count in digit_counts]
            
            # 元のフォーマットに合わせて結果を組み立て
            result = ''
            idx = 0
            
            # 各数字ブロックと区切り文字を交互に追加
            for i, count in enumerate(digit_counts):
                # 数字ブロックの追加
                if idx < len(fake_digits):
                    # 必要な桁数だけ取得(足りない場合は0埋め)
                    current_digits = fake_digits[idx][:count]
                    if len(current_digits) < count:
                        current_digits = current_digits.ljust(count, '0')
                    result += current_digits
                    idx += 1
                else:
                    # フェイクデータの数字ブロックが不足している場合は0埋め
                    result += '0' * count
                
                # 最後の数字ブロック以外の後に区切り文字を追加
                if i < len(digit_counts) - 1 and non_digit_chars:
                    result += non_digit_chars.pop(0)
                elif i == len(digit_counts) - 1 and non_digit_chars:
                    # 最後の数字ブロックの後にも区切り文字がある場合
                    result += non_digit_chars.pop(0)
    
            return result
            
        except Exception as e:
            # 想定外の例外をキャッチして処理を継続
            logger.error(f"generate_formatted_data全体エラー: {str(e)}, 値: {original_value}")
            return str(original_value)  # 最終的なフォールバックとして元の値を返す

    def generate_formatted_data(self, faker_function, original_value):
        """
        元のフォーマットを保持しつつ、指定されたFaker関数を使用して偽データを生成する
        
        Args:
            faker_function (str): 使用するFaker関数名('phone_number', 'postcode', 'credit_card_number'など)
            original_value (any): 元のデータ値
            
        Returns:
            str: 生成された偽データ(元のフォーマットを保持)
        """
        # 入力値のバリデーション - None/NaN値のチェック
        if pd.isnull(original_value):
            return original_value
            
        # 文字列への変換
        try:
            original_value = str(original_value).strip()
            if not original_value:  # 空文字列チェック
                return original_value
        except Exception as e:
            logger.error(f"フォーマットデータの文字列変換エラー: {str(e)}, 値: {original_value}")
            return str(original_value) if original_value is not None else ''
        
        try:
            # 数字以外の文字(ハイフン、スペースなど)を抽出して保存
            non_digit_chars = re.findall(r'\D+', original_value)
            
            # 数字部分の長さを配列で保存(各数字ブロックの桁数を維持するため)
            digit_blocks = re.findall(r'\d+', original_value)
            if not digit_blocks:  # 数字が含まれない場合は元の値を返す
                return original_value
                
            digit_counts = [len(s) for s in digit_blocks]
            
            # Faker関数に基づいてフェイクデータを生成
            try:
                if faker_function == 'phone_number':
                    fake_number = self.fake.phone_number()
                elif faker_function == 'postcode':
                    fake_number = self.fake.postcode()
                elif faker_function == 'credit_card_number':
                    # クレジットカード番号の生成を改善
                    try:
                        # カード種類を指定せずに汎用的なカード番号を生成
                        fake_number = self.fake.credit_card_number(card_type=None)
                        
                        # 何らかの理由で空や無効な値が返された場合の対処
                        if not fake_number or not re.search(r'\d', fake_number):
                            # 正しい長さの数字文字列をフォールバックとして生成
                            total_digits = sum(digit_counts)
                            fake_number = ''.join([str(self.fake.random_digit()) for _ in range(total_digits)])
                    except Exception as card_error:
                        logger.error(f"クレジットカード番号生成エラー: {str(card_error)}")
                        # 代替手段としてランダムな数字を生成
                        fake_number = ''.join([str(self.fake.random_digit()) for _ in range(16)])
                else:
                    # サポートされていない関数の場合、ログに記録して空の文字列を使用
                    logger.warning(f"サポート外のフォーマット関数: {faker_function}")
                    fake_number = ''.join(['0' * count for count in digit_counts])
            except Exception as e:
                logger.error(f"Faker関数実行エラー: {str(e)}, 関数: {faker_function}")
                # エラー時は0埋めの数字を生成
                fake_number = ''.join(['0' * count for count in digit_counts])
    
            # フェイクデータから数字部分のみを抽出
            fake_digits = re.findall(r'\d+', fake_number)
            if not fake_digits:  # 生成されたデータに数字が含まれない場合
                # ランダムな数字を生成
                fake_digits = [''.join([str(self.fake.random_digit()) for _ in range(count)]) for count in digit_counts]
            
            # 元のフォーマットに合わせて結果を組み立て
            result = ''
            idx = 0
            
            # 各数字ブロックと区切り文字を交互に追加
            for i, count in enumerate(digit_counts):
                # 数字ブロックの追加
                if idx < len(fake_digits):
                    # 必要な桁数だけ取得(足りない場合は0埋め)
                    current_digits = fake_digits[idx][:count]
                    if len(current_digits) < count:
                        current_digits = current_digits.ljust(count, '0')
                    result += current_digits
                    idx += 1
                else:
                    # フェイクデータの数字ブロックが不足している場合はランダムな数字を生成
                    result += ''.join([str(self.fake.random_digit()) for _ in range(count)])
                
                # 最後の数字ブロック以外の後に区切り文字を追加
                if i < len(digit_counts) - 1 and i < len(non_digit_chars):
                    result += non_digit_chars[i]
                elif i == len(digit_counts) - 1 and i < len(non_digit_chars):
                    # 最後の数字ブロックの後にも区切り文字がある場合
                    result += non_digit_chars[i]
    
            return result
            
        except Exception as e:
            # 想定外の例外をキャッチして処理を継続
            logger.error(f"generate_formatted_data全体エラー: {str(e)}, 値: {original_value}")
            return str(original_value)  # 最終的なフォールバックとして元の値を返す
            


    def generate_full_name(self, original_value):
        """
        元の文字種と形式を保持しつつフルネームを生成する
        
        Args:
            original_value (any): 元の名前データ
            
        Returns:
            str: 生成された偽名データ(元の文字種と形式を保持)
        """
        # 入力値のバリデーション - None/NaN値のチェック
        if pd.isnull(original_value):
            return original_value
            
        # 文字列への変換とトリミング
        try:
            original_value = str(original_value).strip()
            if not original_value:  # 空文字列チェック
                return original_value
        except Exception as e:
            logger.error(f"名前データの文字列変換エラー: {str(e)}, 値: {original_value}")
            return original_value  # エラー時は元の値を返す
        
        try:
            # 姓と名を分割(スペースで区切られた部分を検出)
            separators = re.findall(r'\s+', original_value)
            names = re.findall(r'\S+', original_value)
            
            # 元の文字種を判定(漢字/ひらがな/カタカナ)
            try:
                script = self.detect_name_script(original_value)
            except Exception as e:
                logger.error(f"文字種判定エラー: {str(e)}, 値: {original_value}")
                script = 'kanji'  # エラー時は漢字として扱う
            
            # フェイクの姓と名を生成
            try:
                fake_last_name = self.fake.last_name() or "山田"  # フォールバック値を設定
                fake_first_name = self.fake.first_name() or "太郎"  # フォールバック値を設定
            except Exception as e:
                logger.error(f"Faker名前生成エラー: {str(e)}")
                fake_last_name, fake_first_name = "山田", "太郎"  # エラー時のデフォルト値
            
            # 必要に応じてカナに変換
            try:
                if script == 'hiragana':
                    # 半角→全角変換、カタカナ→ひらがな変換の順で処理
                    fake_last_name = jaconv.kana2hiragana(jaconv.kata2hira(jaconv.han2zen(fake_last_name)))
                    fake_first_name = jaconv.kana2hiragana(jaconv.kata2hira(jaconv.han2zen(fake_first_name)))
                elif script == 'katakana':
                    # 半角→全角変換、ひらがな→カタカナ変換の順で処理
                    fake_last_name = jaconv.hira2kata(jaconv.kana2katakana(jaconv.han2zen(fake_last_name)))
                    fake_first_name = jaconv.hira2kata(jaconv.kana2katakana(jaconv.han2zen(fake_first_name)))
                # 'kanji'の場合はそのまま
            except Exception as e:
                logger.error(f"文字種変換エラー: {str(e)}, 文字種: {script}")
                # 変換エラー時は変換せずに返す(元の生成値をそのまま使用)
            
            # 元のフォーマットに合わせて組み立て
            try:
                if len(names) >= 2:  # 姓名が分割できる場合
                    result = ''
                    result += fake_last_name
                    if separators:  # 区切り文字があれば追加
                        result += separators[0]
                    result += fake_first_name
                    return result
                else:  # 名前全体として処理
                    # 単一の名前として処理
                    try:
                        fake_name = self.fake.name() or "山田 太郎"  # フォールバック値を設定
                        
                        # 文字種変換
                        if script == 'hiragana':
                            fake_name = jaconv.kana2hiragana(jaconv.kata2hira(jaconv.han2zen(fake_name)))
                        elif script == 'katakana':
                            fake_name = jaconv.hira2kata(jaconv.kana2katakana(jaconv.han2zen(fake_name)))
                            
                        return fake_name
                    except Exception as e:
                        logger.error(f"単一名前生成エラー: {str(e)}")
                        return fake_last_name  # エラー時は姓だけを返す
            except Exception as e:
                logger.error(f"名前フォーマット組立エラー: {str(e)}")
                # フォーマットエラー時は生成した姓と名を半角スペースで結合
                return f"{fake_last_name} {fake_first_name}"
                
        except Exception as e:
            # 想定外の例外をキャッチして処理を継続
            logger.error(f"generate_full_name全体エラー: {str(e)}, 値: {original_value}")
            return str(original_value)  # 最終的なフォールバックとして元の値を返す
    
    def generate_kana_name(self, original_value):
        """
        元の文字種(ひらがな/カタカナ)を保持しつつカナ氏名を生成する
        
        Args:
            original_value (any): 元のカナ氏名データ
            
        Returns:
            str: 生成された偽カナ氏名(元の文字種を保持)
        """
        # 入力値のバリデーション - None/NaN値のチェック
        if pd.isnull(original_value):
            return original_value
        
        # 文字列への変換とトリミング
        try:
            original_value = str(original_value).strip()
            if not original_value:  # 空文字列チェック
                return original_value
        except Exception as e:
            logger.error(f"カナ名データの文字列変換エラー: {str(e)}, 値: {original_value}")
            return original_value  # エラー時は元の値を返す
        
        try:
            # 元の文字種を判定(ひらがな/カタカナ)
            try:
                script = self.detect_name_script(original_value)
            except Exception as e:
                logger.error(f"カナ文字種判定エラー: {str(e)}, 値: {original_value}")
                script = 'katakana'  # エラー時はカタカナとして扱う(一般的にカナ氏名はカタカナが多い)
            
            # フェイクの氏名を生成
            try:
                fake_name = self.fake.name() or "ヤマダ タロウ"  # フォールバック値を設定
            except Exception as e:
                logger.error(f"Fakerカナ名生成エラー: {str(e)}")
                # エラー時のデフォルト値(script値に応じて適切なデフォルトを設定)
                if script == 'hiragana':
                    return "やまだ たろう"
                else:
                    return "ヤマダ タロウ"
            
            # カナに変換
            try:
                if script == 'hiragana':
                    # 半角→全角変換、カタカナ→ひらがな変換の順で処理
                    fake_name = jaconv.kana2hiragana(jaconv.kata2hira(jaconv.han2zen(fake_name)))
                else:  # デフォルトはカタカナ (script == 'katakana' または他の値)
                    # 半角→全角変換、ひらがな→カタカナ変換の順で処理
                    fake_name = jaconv.hira2kata(jaconv.kana2katakana(jaconv.han2zen(fake_name)))
            except Exception as e:
                logger.error(f"カナ変換エラー: {str(e)}, 文字種: {script}, 値: {fake_name}")
                # 変換エラー時は変換せずに返す
                
            # 元の文字列長を考慮(極端に長い名前への対応)
            if len(original_value) > 15 and len(fake_name) < 10:
                # 元の名前が長いのに生成された名前が短い場合、適度に埋める
                fake_name = fake_name + " " + fake_name
                fake_name = fake_name[:len(original_value)]
                
            return fake_name
            
        except Exception as e:
            # 想定外の例外をキャッチして処理を継続
            logger.error(f"generate_kana_name全体エラー: {str(e)}, 値: {original_value}")
            return str(original_value)  # 最終的なフォールバックとして元の値を返す

    def generate_name_with_script(self, original_value, faker_function):
        """
        元の文字種(ひらがな・カタカナ・漢字)を保持しつつ名前を生成する
        
        Args:
            original_value (str/any): 元の名前データ
            faker_function (str): 使用するFaker関数名('first_name'/'last_name'等)
            
        Returns:
            str: 生成された偽名データ(元の文字種を保持)
        """
        # 入力値のバリデーション
        if pd.isnull(original_value):
            return original_value
            
        # 文字列に変換(数値や特殊型が入力される可能性を考慮)
        try:
            original_value = str(original_value).strip()
            # 変換後に空文字になった場合は元の値を返す
            if not original_value:
                return original_value
        except Exception as e:
            logger.error(f"名前データの文字列変換エラー: {str(e)}, 値: {original_value}")
            return original_value  # エラー時は元の値を返す
            
        # 元の文字種を判定
        try:
            script = self.detect_name_script(original_value)
        except Exception as e:
            logger.error(f"文字種判定エラー: {str(e)}, 値: {original_value}")
            script = 'kanji'  # デフォルトは漢字として扱う
            
        # フェイクの名前を生成
        try:
            # Fakerオブジェクトから指定されたメソッドを取得
            faker_method = getattr(self.fake, faker_function, None)
            if faker_method is None:
                logger.error(f"無効なFaker関数: {faker_function}")
                return original_value
                
            # メソッド呼び出し
            fake_name = faker_method()
            
            # 例外的なNoneや空文字が返された場合の対応
            if not fake_name:
                logger.warning(f"Faker関数 {faker_function} が空の値を返しました")
                return original_value
                
        except Exception as e:
            logger.error(f"Faker名前生成エラー: {str(e)}, 関数: {faker_function}")
            return original_value  # エラー時は元の値を返す
        
        # 必要に応じてカナに変換
        try:
            if script == 'hiragana':
                # 半角→全角変換、カタカナ→ひらがな変換の順で処理
                fake_name = jaconv.kana2hiragana(jaconv.kata2hira(jaconv.han2zen(fake_name)))
            elif script == 'katakana':
                # 半角→全角変換、ひらがな→カタカナ変換の順で処理
                fake_name = jaconv.hira2kata(jaconv.kana2katakana(jaconv.han2zen(fake_name)))
            # 'kanji'の場合はそのまま
        except Exception as e:
            logger.error(f"文字種変換エラー: {str(e)}, 文字種: {script}, 値: {fake_name}")
            # 変換エラー時は変換せずに返す
            
        # 元の文字列の長さを考慮(極端に長い/短い名前への対応)
        if len(original_value) > 10 and len(fake_name) < 5:
            # 元の名前が長いのに生成された名前が短い場合、適度に埋める
            fake_name = fake_name * (len(original_value) // len(fake_name) + 1)
            fake_name = fake_name[:len(original_value)]
            
        return fake_name
    
    def detect_name_script(self, text):
        """
        テキストの文字種(ひらがな・カタカナ・漢字)を判定する
        
        Args:
            text (str): 判定する文字列
            
        Returns:
            str: 'hiragana', 'katakana', 'kanji'のいずれか
        """
        if not text or not isinstance(text, str):
            return 'kanji'  # デフォルト値
            
        # 空白文字を除外して判定
        clean_text = ''.join(char for char in text if char.strip())
        
        # すべての文字がひらがな範囲内か確認
        if clean_text and all('\u3040' <= char <= '\u309f' for char in clean_text):
            return 'hiragana'
        # すべての文字がカタカナ範囲内か確認
        elif clean_text and all('\u30a0' <= char <= '\u30ff' for char in clean_text):
            return 'katakana'
        # それ以外は漢字(または混合文字)として扱う
        else:
            return 'kanji'

    def detect_date_format(self, date_str):
        """
        文字列から日付フォーマットを検出する関数
        """
        # 空文字やNoneの場合は早期リターン
        if not date_str or pd.isna(date_str):
            return None
            
        # 文字列に変換(数値などが入力された場合の対策)
        try:
            date_str = str(date_str).strip()
        except Exception:
            return None
            
        # 空文字になった場合も早期リターン
        if not date_str:
            return None
        
        # サポートする日付フォーマットの一覧
        # ISO形式、地域別形式、日本語形式、タイムスタンプ形式など多様なパターンをカバー
        date_formats = [
            # 標準的なISO形式
            "%Y-%m-%d",  # 2023-01-31
            # 地域別の一般的な形式
            "%d/%m/%Y",  # 31/01/2023 (欧州等)
            "%m/%d/%Y",  # 01/31/2023 (米国等)
            "%Y/%m/%d",  # 2023/01/31 (日本等)
            # ハイフン区切り
            "%d-%m-%Y",  # 31-01-2023
            "%m-%d-%Y",  # 01-31-2023
            # ドット区切り
            "%Y.%m.%d",  # 2023.01.31
            "%d.%m.%Y",  # 31.01.2023
            "%m.%d.%Y",  # 01.31.2023
            # 区切りなし
            "%Y%m%d",    # 20230131
            "%d%m%Y",    # 31012023
            "%m%d%Y",    # 01312023
            # 日本語表記
            "%Y年%m月%d日",  # 2023年01月31日
            # 日時形式
            "%Y/%m/%d %H:%M:%S",  # 2023/01/31 12:34:56
            "%Y-%m-%d %H:%M:%S",  # 2023-01-31 12:34:56
            # 英語月名表記
            "%d %b %Y",  # 31 Jan 2023 (短縮月名)
            "%d %B %Y",  # 31 January 2023 (完全月名)
            # 日本語日時形式
            "%Y年%m月%d日 %H時%M分%S秒",  # 2023年01月31日 12時34分56秒
            # ISO 8601拡張形式
            "%Y-%m-%dT%H:%M:%S",  # 2023-01-31T12:34:56
            "%Y/%m/%dT%H:%M:%S",  # 2023/01/31T12:34:56
            "%Y%m%dT%H%M%S",      # 20230131T123456
        ]
        
        # 各フォーマットを試してマッチするものを探す
        for fmt in date_formats:
            try:
                # 文字列をdatetimeオブジェクトに変換を試みる
                datetime_obj = datetime.strptime(date_str, fmt)
                # 変換が成功した場合、そのフォーマットを返す
                return fmt
            except ValueError:
                # このフォーマットではない場合は次を試す
                continue
            except Exception as e:
                # 予期せぬエラーの場合はログに記録して次を試す
                logger.debug(f"日付フォーマット検出中の例外: {str(e)} for format {fmt}, date_str: {date_str}")
                continue
        
        return None

    def get_output_file_path(self, input_file_path):
        base, ext = os.path.splitext(input_file_path)
        now = datetime.now().strftime("%Y%m%d_%H%M%S")  # 現在の日時を取得
        output_file_path = f"{base}_masked_{now}{ext}"  # ファイル名に日時を追加
        return output_file_path

    def show_faker_help(self):
        help_text = "Faker関数の説明:\n\n"
        for func in self.available_faker_functions:
            desc = self.faker_function_descriptions.get(func, func)
            help_text += f"{desc}: {func}\n"

        messagebox.showinfo("Faker関数の説明", help_text)

    # **使用方法を表示するメソッドの追加**
    def show_usage(self):
        usage_text = """CSVデータマスキングツールの使用方法:

1. 「ファイルを選択」ボタンをクリックし、マスキングしたいCSVファイルを選択します。
   - 複数のファイルを選択することも可能です。

2. 必要に応じてデータ増幅倍率を設定します。
   - デフォルトは1(増幅なし)です。整数値を入力してください。

3. 各ファイルごとにタブが作成されます。
   - 各タブでマスキング対象のカラムと使用するFaker関数を確認・編集できます。

4. マスキングしたいカラムにチェックを入れ、適切なFaker関数を選択します。
   - デフォルトでカラム名に基づいて自動的に選択されています。

5. 「マスキングを実行」ボタンをクリックすると、マスキング処理が開始されます。

6. マスキング処理が完了すると、元のファイル名に日時が付加された新しいCSVファイルが生成されます。
   - 例:`test_data_masked_20231005_153045.csv`

7. 生成されたファイルを確認し、データが正しくマスキングされていることを確認してください。

注意事項:
- CSVファイルはUTF-8エンコーディングで保存してください。
- データ増幅倍率を大きく設定すると、処理に時間がかかる場合があります。
- Faker関数の詳細は「Faker関数の説明」ボタンをクリックして確認できます。
"""
        messagebox.showinfo("使用方法", usage_text)

# メイン部分に例外ハンドリングを追加
if __name__ == "__main__":
    try:
        # psutilのインポート(利用可能な場合)
        try:
            import psutil
        except ImportError:
            # psutilが利用できない場合は代替値を使用
            class MockPsutil:
                class VirtualMemory:
                    def __init__(self):
                        self.available = 1024 * 1024 * 1024  # 1GB

                def virtual_memory(self):
                    return self.VirtualMemory()
            
            psutil = MockPsutil()
            logger.warning("psutilライブラリがインストールされていません。メモリ管理が最適化されません。")
        
        root = tk.Tk()
        app = MaskingApp(root)
        root.mainloop()
    except Exception as e:
        error_message = f"アプリケーション実行エラー: {str(e)}\n{traceback.format_exc()}"
        logger.critical(error_message)
        
        # 最後の手段としてのエラー表示
        try:
            tk.messagebox.showerror("クリティカルエラー", f"アプリケーションでエラーが発生しました:\n{str(e)}\n\n詳細はログファイルを確認してください。")
        except:
            print(error_message)
        
        sys.exit(1)
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?