概要
- 指定ディレクトリ配下の CSV ファイル群を検索し、指定列に指定キーワードを含む行を抽出
- 抽出結果は一時ファイルに書き出した後マージし、sort コマンドでソート・重複排除を行って最終結果 CSV を出力
特徴
- cp932 でエンコードされた CSV を読み取り (Windows 向け)
- "Summary" 列を対象にキーワード検索
- 並列処理 (
ProcessPoolExecutor
) による検索高速化 (実行環境のコア数による) - 一時ファイルのマージと Linux sort -u コマンドでの重複排除
- 上限行数を超える場合はソート・重複排除をスキップ(ディスクとメモリ圧迫回避)
- 出力 CSV は UTF-8 BOM 付き (utf-8-sig) で保存 (Windows Excel 対応)
- 出力ファイルは実行時刻付きのファイル名で保存
出力
- ファイル名:
search_results_YYYYMMDD_HHMMSS.csv
- ヒットした行を出力(ヘッダー付き)
- 重複排除あり(行全体一致の場合のみ)
- 実行スクリプトと同階層に出力
注意事項
- "Summary" 列が存在しない場合は処理を中断
- 大量データでは出力が数百万行になる可能性あり
- 一時ファイルや中間ファイルをディスクに書き出すため、ディスク I/O がボトルネックになる可能性あり
- ソート処理は Linux 環境の sort コマンドに依存
main() 解説
検索キーワードとCSVディレクトリパスの入力
keyword = input("検索キーワードの入力: ").strip()
csv_dir_str = input("CSVディレクトリパスの入力: ").strip()
正しい値が入力されるまでループさせる
入力途中Ctrl+C
で処理を終了
CSVディレクトリパスはディレクトリの存在有無も確認
シグナルを設定
signal.signal(signal.SIGINT, make_handler(search_csv_temp_files))
signal.signal(signal.SIGTERM, make_handler(search_csv_temp_files))
Ctrl+C
またはKill
を補足し、一時ファイル群を削除する
渡した一時ファイルを削除する関数
def make_handler(temp_files):
def handler(signum, frame):
for f in temp_files:
if f and os.path.exists(f):
try:
os.remove(f)
except Exception as e:
print(f"[WARN] cleanup失敗 {f}: {e}")
print("[INFO] 処理を終了しています....")
return handler
シグナルで引数も一緒に渡したかったためクロージャで関数を定義
ディレクトリ内のCSVファイルをまとめた配列
csv_files = list(csv_dir.glob("*.csv"))
CSVファイルの最初のファイルでSummary列の位置を決定
with open(csv_files[0], "r", encoding="cp932", errors="ignore") as f:
reader = csv.reader(f)
header = next(reader)
try:
summary_index = header.index("Summary")
except ValueError:
print("[ERROR] Summary列が見つかりませんでした")
return
検索対象の列を最初に決定しておく(検索範囲を絞ることによる処理時間短縮)
CSVファイルのヘッダーもここで取得しておく
CSVファイルを検索しキーワードを含む行を抽出する処理を並列処理でループ
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(search_in_csv, file, keyword, summary_index) for file in csv_files]
for future in concurrent.futures.as_completed(futures):
# 検索したCSVデータの一時ファイルを配列にまとめる
search_csv_temp_files.append(future.result())
CSVファイル群をループさせCSVファイルをキーワード検索の関数へ渡す
戻り値の一時ファイルをそれぞれ配列へ格納しておく(一時ファイルはキーワードで絞ったCSVデータ)
メモリ圧迫を回避するために一時ファイルを経由する(ディスク圧迫の問題が出てきてしまうが)
CSVキーワード検索
def search_in_csv(file_path: Path, keyword: str, summary_index: int):
"""
1つのCSVファイルを検索し、一時ファイルに結果を書き出す(メモリ節約)
csvヘッダー「Summary」列のみを検索対象とする
"""
# 一時ファイル作成
temp_fd, temp_path = tempfile.mkstemp(prefix="search_tmp_", suffix=".csv")
os.close(temp_fd) # 使わないので閉じる
try:
with open(file_path, "r", encoding="cp932", errors="ignore") as f_in, \
open(temp_path, "w", encoding="utf-8", newline="") as f_out:
reader = csv.reader(f_in) # 読み込んだCSVファイル
writer = csv.writer(f_out) # 書き出す用の一時ファイル
# Summary列にキーワードが含まれる行だけ書き出し
for row in reader:
if len(row) > summary_index and keyword in row[summary_index]:
writer.writerow(row)
print(f"[INFO] 読み込み成功: {file_path}")
except Exception as e:
print(f"[ERROR] 読み込み失敗: {file_path} ({e})")
return temp_path
書き出すための一時ファイルを作成しencoding="utf-8"
で開く
受けとったCSVファイルをencoding="cp932"
で開く
Summary列にキーワードが含まれる行だけ書き出し一時ファイルを返却
一時ファイルをすべてマージしてからソート・重複排除
merged_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".csv").name
# 一時ファイルをすべてマージしてからソート・重複排除
with open(merged_temp_file, "w", encoding="utf-8") as merged_temp:
for search_csv_temp_file in search_csv_temp_files:
with open(search_csv_temp_file, "r", encoding="utf-8") as f_in:
merged_temp.write(f_in.read())
各一時ファイルを一つにまとめるためのマージ一時ファイル(merged_temp_file)
を作成
CSVキーワード検索で得た一時ファイル配列をマージ一時ファイルへ集約
ソート・重複排除
def deduplicate_with_limit(input_file: Path, output_file: Path, header: list):
"""
行数が上限を超える場合は、ソート・重複排除をスキップ
"""
# 行数を確認
line_count = sum(1 for _ in open(input_file, "r", encoding="utf-8"))
print(f"[INFO] ソート・重複排除(前): {line_count:,} 行")
# ソートしない
if line_count > CSV_ROW_MAX:
print(f"[INFO] 検索上限 {CSV_ROW_MAX:,} 行を超えたのでソートせずに終了します")
return False
# sort -u を実行して標準出力を受け取り
sort_result = subprocess.run(
["sort", "-u", str(input_file)],
capture_output = True,
env = {"LC_ALL": "C"},
text = True, # 出力を文字列で受け取る
encoding = "utf-8"
)
# BOMとヘッダー付きで最終ファイルに保存
with open(output_file, "w", encoding="utf-8-sig") as f_out:
f_out.write(",".join(header) + "\n")
f_out.write(sort_result.stdout)
return True
検索上限を超えた場合は処理を終了
subprocess
を使用してLinuxコマンドを実行する
Linuxのsortが優秀なので今回はpythonでのソート実装はしない
処理時間短縮のためLC_ALL C
を指定
with open(output_file, "w", encoding="utf-8-sig") as f_out:
f_out.write(",".join(header) + "\n")
f_out.write(sort_result.stdout)
return True
CSVの日本語表記で出力したいのでencoding="utf-8-sig"
を指定
ヘッダーを先頭行に追加しソート結果を続けて書き込む
処理の最後に必ず実行
try:
#処理
finally:
if os.path.exists(merged_temp_file): os.remove(merged_temp_file)
cleanup_and_exit(search_csv_temp_files)
最後にマージ後とマージ前の一時ファイルを全て削除する
これが実行されないと/tmpが大量の一時ファイルで埋め尽くされる
スクリプト全体
#!/usr/bin/python3
"""
CSV 検索・重複排除スクリプト
"""
import os
import sys
import csv
import time
import signal
import tempfile
import subprocess
import concurrent.futures
from pathlib import Path
from datetime import datetime
# 検索結果CSV出力先ファイル
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
OUTPUT_FILE = f"search_results_{timestamp}.csv"
# 重複排除処理の上限行数
CSV_ROW_MAX = 5_000_000
def search_in_csv(file_path: Path, keyword: str, summary_index: int):
"""
1つのCSVファイルを検索し、一時ファイルに結果を書き出す(メモリ節約)
csvヘッダー「Summary」列のみを検索対象とする
"""
# 一時ファイル作成
temp_fd, temp_path = tempfile.mkstemp(prefix="search_tmp_", suffix=".csv")
os.close(temp_fd) # 使わないので閉じる
try:
with open(file_path, "r", encoding="cp932", errors="ignore") as f_in, \
open(temp_path, "w", encoding="utf-8", newline="") as f_out:
reader = csv.reader(f_in) # 読み込んだCSVファイル
writer = csv.writer(f_out) # 書き出す用の一時ファイル
# Summary列にキーワードが含まれる行だけ書き出し
for row in reader:
if len(row) > summary_index and keyword in row[summary_index]:
writer.writerow(row)
print(f"[INFO] 読み込み成功: {file_path}")
except Exception as e:
print(f"[ERROR] 読み込み失敗: {file_path} ({e})")
return temp_path
def deduplicate_with_limit(input_file: Path, output_file: Path, header: list):
"""
行数が上限を超える場合は、ソート・重複排除をスキップ
"""
# 行数を確認
line_count = sum(1 for _ in open(input_file, "r", encoding="utf-8"))
print(f"[INFO] ソート・重複排除(前): {line_count:,} 行")
# ソートしない
if line_count > CSV_ROW_MAX:
print(f"[INFO] 検索上限 {CSV_ROW_MAX:,} 行を超えたのでソートせずに終了します")
return False
# sort -u を実行して標準出力を受け取り
sort_result = subprocess.run(
["sort", "-u", str(input_file)],
capture_output = True,
env = {"LC_ALL": "C"},
text = True, # 出力を文字列で受け取る
encoding = "utf-8"
)
# BOMとヘッダー付きで最終ファイルに保存
with open(output_file, "w", encoding="utf-8-sig") as f_out:
f_out.write(",".join(header) + "\n")
f_out.write(sort_result.stdout)
return True
def cleanup_and_exit(temp_files):
""" グローバルで管理している一時ファイルを全て削除し処理を終了
"""
for f in temp_files:
if f and os.path.exists(f):
try:
os.remove(f)
except Exception as e:
print(f"[WARN] cleanup失敗 {f}: {e}")
def make_handler(temp_files):
def handler(signum, frame):
for f in temp_files:
if f and os.path.exists(f):
try:
os.remove(f)
except Exception as e:
print(f"[WARN] cleanup失敗 {f}: {e}")
print("[INFO] 処理を終了しています....")
return handler
def main():
while True:
try:
keyword = input("検索キーワードの入力: ").strip()
if not keyword:
print("[INFO] キーワードが空です。再度入力してください。(例: TESTmsg)")
continue
else:
break
except KeyboardInterrupt:
sys.exit(1)
except ValueError:
print('[ERROR] 無効な入力値です')
while True:
try:
csv_dir_str = input("CSVディレクトリパスの入力: ").strip()
if not csv_dir_str:
print("[INFO] ディレクトリパスが空です。再度入力してください。(例: /home/test/csv)")
continue
csv_dir = Path(csv_dir_str)
if csv_dir.exists() and csv_dir.is_dir():
break
else:
print("[INFO] ディレクトリが存在しません。再度入力してください。(例: /home/test/csv)")
except KeyboardInterrupt:
sys.exit(1)
except ValueError:
print('[ERROR] 無効な入力値です')
search_csv_temp_files = []
start = time.time()
# Ctrl+C / kill を捕捉し、一時ファイルを全て削除
signal.signal(signal.SIGINT, make_handler(search_csv_temp_files))
signal.signal(signal.SIGTERM, make_handler(search_csv_temp_files))
csv_files = list(csv_dir.glob("*.csv"))
if not csv_files:
print("[INFO] CSVファイルが見つかりませんでした")
return
print(f"\n[INFO] {len(csv_files)} ファイル で検索開始 (キーワード: '{keyword}')")
header = None
# 最初のファイルでSummary列の位置を決定
with open(csv_files[0], "r", encoding="cp932", errors="ignore") as f:
reader = csv.reader(f)
header = next(reader)
try:
summary_index = header.index("Summary")
except ValueError:
print("[ERROR] Summary列が見つかりませんでした")
return
try:
# 並列処理
with concurrent.futures.ProcessPoolExecutor() as executor:
futures = [executor.submit(search_in_csv, file, keyword, summary_index) for file in csv_files]
for future in concurrent.futures.as_completed(futures):
# 検索したCSVデータの一時ファイルを配列にまとめる
search_csv_temp_files.append(future.result())
print(f'\n==============================')
if not search_csv_temp_files:
print("[INFO] 該当データは見つかりませんでした")
return
try:
# マージ用の一時ファイル (マージ時の衝突回避のため → 単発実行想定のため今回はリスクなし)
merged_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".csv").name
# 一時ファイルをすべてマージしてからソート・重複排除
with open(merged_temp_file, "w", encoding="utf-8") as merged_temp:
for search_csv_temp_file in search_csv_temp_files:
with open(search_csv_temp_file, "r", encoding="utf-8") as f_in:
merged_temp.write(f_in.read())
except FileNotFoundError:
# print("[INFO] ファイルが存在しません")
return
if deduplicate_with_limit(merged_temp_file, OUTPUT_FILE, header):
line_count_output = sum(1 for _ in open(OUTPUT_FILE, "r", encoding="utf-8"))
print(f"[INFO] ソート・重複排除(後): {line_count_output:,} 行 ({OUTPUT_FILE})")
finally:
if os.path.exists(merged_temp_file): os.remove(merged_temp_file)
cleanup_and_exit(search_csv_temp_files)
print(f"[INFO] 処理経過時間: {time.time() - start:.2f} 秒")
print(f'==============================')
if __name__ == "__main__":
main()
まとめ
このスクリプトを作成した環境では、ディスク容量は多いがメモリは少ないため今回の実装に落ち着いた。
CPUのコア数がたくさんある環境だと並列処理の恩恵を受けやすい。
もともとは一時ファイルではなくメモリ上に検索データを一時保存するスタイルで作成したが、大量に引っかかる検索キーワードだとメモリがパンクして動作しなくなってしまった。
ディスク、メモリどちらも潤沢にある環境ならもっとコンパクトなスクリプトで実装できるはず。
追記・修正 (25/09/05)
def cleanup_temp():
"""tmp以下の search_tmp_*.csv を強制削除"""
tmpdir = "/tmp"
for f in glob.glob(os.path.join(tmpdir, "search_tmp_*.csv")):
try:
os.remove(f)
# print(f"[INFO] 削除しました: {f}")
except Exception as e:
print(f"[WARN] 削除失敗 {f}: {e}")
一時ファイルの削除方法を「/tmp以下にあるsearch_tmp_*.csv
全てを対象にする」ように修正したほうが良さそうです。
OS ERROR時に、search_csv_temp_files
に一時ファイルが格納されない事象が発生し、処理の最後でクリーンアップを実行しても/tmpに一部の一時ファイルが残ったままになっていた