0
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonで並列処理を用いて、大量のCSVファイルをキーワード検索でソートする

Last updated at Posted at 2025-09-04

概要

  • 指定ディレクトリ配下の CSV ファイル群を検索し、指定列に指定キーワードを含む行を抽出
  • 抽出結果は一時ファイルに書き出した後マージし、sort コマンドでソート・重複排除を行って最終結果 CSV を出力

特徴

  • cp932 でエンコードされた CSV を読み取り (Windows 向け)
  • "Summary" 列を対象にキーワード検索
  • 並列処理 (ProcessPoolExecutor) による検索高速化 (実行環境のコア数による)
  • 一時ファイルのマージと Linux sort -u コマンドでの重複排除
  • 上限行数を超える場合はソート・重複排除をスキップ(ディスクとメモリ圧迫回避)
  • 出力 CSV は UTF-8 BOM 付き (utf-8-sig) で保存 (Windows Excel 対応)
  • 出力ファイルは実行時刻付きのファイル名で保存

出力

  • ファイル名: search_results_YYYYMMDD_HHMMSS.csv
  • ヒットした行を出力(ヘッダー付き)
  • 重複排除あり(行全体一致の場合のみ)
  • 実行スクリプトと同階層に出力

注意事項

  • "Summary" 列が存在しない場合は処理を中断
  • 大量データでは出力が数百万行になる可能性あり
  • 一時ファイルや中間ファイルをディスクに書き出すため、ディスク I/O がボトルネックになる可能性あり
  • ソート処理は Linux 環境の sort コマンドに依存

main() 解説

検索キーワードとCSVディレクトリパスの入力
keyword = input("検索キーワードの入力: ").strip()
csv_dir_str = input("CSVディレクトリパスの入力: ").strip()

正しい値が入力されるまでループさせる
入力途中Ctrl+Cで処理を終了
CSVディレクトリパスはディレクトリの存在有無も確認

シグナルを設定
signal.signal(signal.SIGINT,  make_handler(search_csv_temp_files))
signal.signal(signal.SIGTERM, make_handler(search_csv_temp_files))

Ctrl+CまたはKillを補足し、一時ファイル群を削除する

渡した一時ファイルを削除する関数
make_handler
def make_handler(temp_files):
    def handler(signum, frame):
        for f in temp_files:
            if f and os.path.exists(f):
                try:
                    os.remove(f)
                except Exception as e:
                    print(f"[WARN] cleanup失敗 {f}: {e}")
        print("[INFO] 処理を終了しています....")
    return handler

シグナルで引数も一緒に渡したかったためクロージャで関数を定義

ディレクトリ内のCSVファイルをまとめた配列
csv_files = list(csv_dir.glob("*.csv"))
CSVファイルの最初のファイルでSummary列の位置を決定
with open(csv_files[0], "r", encoding="cp932", errors="ignore") as f:
    reader = csv.reader(f)
    header = next(reader)
    try:
        summary_index = header.index("Summary")
    except ValueError:
        print("[ERROR] Summary列が見つかりませんでした")
        return

検索対象の列を最初に決定しておく(検索範囲を絞ることによる処理時間短縮)
CSVファイルのヘッダーもここで取得しておく

CSVファイルを検索しキーワードを含む行を抽出する処理を並列処理でループ
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = [executor.submit(search_in_csv, file, keyword, summary_index) for file in csv_files]
        for future in concurrent.futures.as_completed(futures):
            # 検索したCSVデータの一時ファイルを配列にまとめる
            search_csv_temp_files.append(future.result())

CSVファイル群をループさせCSVファイルをキーワード検索の関数へ渡す
戻り値の一時ファイルをそれぞれ配列へ格納しておく(一時ファイルはキーワードで絞ったCSVデータ)
メモリ圧迫を回避するために一時ファイルを経由する(ディスク圧迫の問題が出てきてしまうが)

CSVキーワード検索
search_in_csv
def search_in_csv(file_path: Path, keyword: str, summary_index: int):
    """
        1つのCSVファイルを検索し、一時ファイルに結果を書き出す(メモリ節約)
        csvヘッダー「Summary」列のみを検索対象とする
    """
    # 一時ファイル作成
    temp_fd, temp_path = tempfile.mkstemp(prefix="search_tmp_", suffix=".csv")
    os.close(temp_fd)  # 使わないので閉じる

    try:
        with open(file_path, "r", encoding="cp932", errors="ignore") as f_in, \
             open(temp_path, "w", encoding="utf-8", newline="") as f_out:

            reader = csv.reader(f_in)  # 読み込んだCSVファイル
            writer = csv.writer(f_out) # 書き出す用の一時ファイル

            # Summary列にキーワードが含まれる行だけ書き出し
            for row in reader:
                if len(row) > summary_index and keyword in row[summary_index]:
                    writer.writerow(row)

        print(f"[INFO] 読み込み成功: {file_path}")
    except Exception as e:
        print(f"[ERROR] 読み込み失敗: {file_path} ({e})")
    return temp_path

書き出すための一時ファイルを作成しencoding="utf-8"で開く
受けとったCSVファイルをencoding="cp932"で開く
Summary列にキーワードが含まれる行だけ書き出し一時ファイルを返却

一時ファイルをすべてマージしてからソート・重複排除
merged_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".csv").name
# 一時ファイルをすべてマージしてからソート・重複排除
with open(merged_temp_file, "w", encoding="utf-8") as merged_temp:
    for search_csv_temp_file in search_csv_temp_files:
        with open(search_csv_temp_file, "r", encoding="utf-8") as f_in:
            merged_temp.write(f_in.read())

各一時ファイルを一つにまとめるためのマージ一時ファイル(merged_temp_file)を作成
CSVキーワード検索で得た一時ファイル配列をマージ一時ファイルへ集約

ソート・重複排除
deduplicate_with_limit
def deduplicate_with_limit(input_file: Path, output_file: Path, header: list):
    """
    行数が上限を超える場合は、ソート・重複排除をスキップ
    """

    # 行数を確認
    line_count = sum(1 for _ in open(input_file, "r", encoding="utf-8"))
    print(f"[INFO] ソート・重複排除(前): {line_count:,}")

    # ソートしない
    if line_count > CSV_ROW_MAX:
        print(f"[INFO] 検索上限 {CSV_ROW_MAX:,} 行を超えたのでソートせずに終了します")
        return False

    # sort -u を実行して標準出力を受け取り
    sort_result = subprocess.run(
        ["sort", "-u", str(input_file)],
        capture_output = True,
        env            = {"LC_ALL": "C"},
        text           = True, # 出力を文字列で受け取る
        encoding       = "utf-8"
    )
    # BOMとヘッダー付きで最終ファイルに保存
    with open(output_file, "w", encoding="utf-8-sig") as f_out:
        f_out.write(",".join(header) + "\n")
        f_out.write(sort_result.stdout)
    return True

検索上限を超えた場合は処理を終了
subprocessを使用してLinuxコマンドを実行する
Linuxのsortが優秀なので今回はpythonでのソート実装はしない
処理時間短縮のためLC_ALL Cを指定

with open(output_file, "w", encoding="utf-8-sig") as f_out:
    f_out.write(",".join(header) + "\n")
    f_out.write(sort_result.stdout)
return True

CSVの日本語表記で出力したいのでencoding="utf-8-sig"を指定
ヘッダーを先頭行に追加しソート結果を続けて書き込む

処理の最後に必ず実行
try:
    #処理
finally:
    if os.path.exists(merged_temp_file): os.remove(merged_temp_file)
    cleanup_and_exit(search_csv_temp_files)

最後にマージ後とマージ前の一時ファイルを全て削除する
これが実行されないと/tmpが大量の一時ファイルで埋め尽くされる

スクリプト全体

csv_search_sort.sh
#!/usr/bin/python3
"""
CSV 検索・重複排除スクリプト
"""
import os
import sys
import csv
import time
import signal
import tempfile
import subprocess
import concurrent.futures
from   pathlib  import Path
from   datetime import datetime

# 検索結果CSV出力先ファイル
timestamp   = datetime.now().strftime("%Y%m%d_%H%M%S")
OUTPUT_FILE = f"search_results_{timestamp}.csv"
# 重複排除処理の上限行数
CSV_ROW_MAX = 5_000_000


def search_in_csv(file_path: Path, keyword: str, summary_index: int):
    """
        1つのCSVファイルを検索し、一時ファイルに結果を書き出す(メモリ節約)
        csvヘッダー「Summary」列のみを検索対象とする
    """
    # 一時ファイル作成
    temp_fd, temp_path = tempfile.mkstemp(prefix="search_tmp_", suffix=".csv")
    os.close(temp_fd)  # 使わないので閉じる

    try:
        with open(file_path, "r", encoding="cp932", errors="ignore") as f_in, \
             open(temp_path, "w", encoding="utf-8", newline="") as f_out:

            reader = csv.reader(f_in)  # 読み込んだCSVファイル
            writer = csv.writer(f_out) # 書き出す用の一時ファイル

            # Summary列にキーワードが含まれる行だけ書き出し
            for row in reader:
                if len(row) > summary_index and keyword in row[summary_index]:
                    writer.writerow(row)

        print(f"[INFO] 読み込み成功: {file_path}")
    except Exception as e:
        print(f"[ERROR] 読み込み失敗: {file_path} ({e})")
    return temp_path



def deduplicate_with_limit(input_file: Path, output_file: Path, header: list):
    """
    行数が上限を超える場合は、ソート・重複排除をスキップ
    """

    # 行数を確認
    line_count = sum(1 for _ in open(input_file, "r", encoding="utf-8"))
    print(f"[INFO] ソート・重複排除(前): {line_count:,}")

    # ソートしない
    if line_count > CSV_ROW_MAX:
        print(f"[INFO] 検索上限 {CSV_ROW_MAX:,} 行を超えたのでソートせずに終了します")
        return False

    # sort -u を実行して標準出力を受け取り
    sort_result = subprocess.run(
        ["sort", "-u", str(input_file)],
        capture_output = True,
        env            = {"LC_ALL": "C"},
        text           = True, # 出力を文字列で受け取る
        encoding       = "utf-8"
    )
    # BOMとヘッダー付きで最終ファイルに保存
    with open(output_file, "w", encoding="utf-8-sig") as f_out:
        f_out.write(",".join(header) + "\n")
        f_out.write(sort_result.stdout)
    return True


def cleanup_and_exit(temp_files):
    """ グローバルで管理している一時ファイルを全て削除し処理を終了
    """

    for f in temp_files:
        if f and os.path.exists(f):
            try:
                os.remove(f)
            except Exception as e:
                print(f"[WARN] cleanup失敗 {f}: {e}")


def make_handler(temp_files):
    def handler(signum, frame):
        for f in temp_files:
            if f and os.path.exists(f):
                try:
                    os.remove(f)
                except Exception as e:
                    print(f"[WARN] cleanup失敗 {f}: {e}")
        print("[INFO] 処理を終了しています....")
    return handler


def main():

    while True:
        try:
            keyword = input("検索キーワードの入力: ").strip()
            if not keyword:
                print("[INFO] キーワードが空です。再度入力してください。(例: TESTmsg)")
                continue
            else:
                break
        except KeyboardInterrupt:
            sys.exit(1)
        except ValueError:
            print('[ERROR] 無効な入力値です')

    while True:
        try:
            csv_dir_str = input("CSVディレクトリパスの入力: ").strip()
            if not csv_dir_str:
                print("[INFO] ディレクトリパスが空です。再度入力してください。(例: /home/test/csv)")
                continue
            csv_dir = Path(csv_dir_str)
            if csv_dir.exists() and csv_dir.is_dir():
                break
            else:
                print("[INFO] ディレクトリが存在しません。再度入力してください。(例: /home/test/csv)")
        except KeyboardInterrupt:
            sys.exit(1)
        except ValueError:
            print('[ERROR] 無効な入力値です')

    search_csv_temp_files = []
    start = time.time()
    # Ctrl+C / kill を捕捉し、一時ファイルを全て削除
    signal.signal(signal.SIGINT,  make_handler(search_csv_temp_files))
    signal.signal(signal.SIGTERM, make_handler(search_csv_temp_files))

    csv_files = list(csv_dir.glob("*.csv"))
    if not csv_files:
        print("[INFO] CSVファイルが見つかりませんでした")
        return

    print(f"\n[INFO] {len(csv_files)} ファイル で検索開始 (キーワード: '{keyword}')")

    header = None
    # 最初のファイルでSummary列の位置を決定
    with open(csv_files[0], "r", encoding="cp932", errors="ignore") as f:
        reader = csv.reader(f)
        header = next(reader)
        try:
            summary_index = header.index("Summary")
        except ValueError:
            print("[ERROR] Summary列が見つかりませんでした")
            return

    try:
        # 並列処理
        with concurrent.futures.ProcessPoolExecutor() as executor:
            futures = [executor.submit(search_in_csv, file, keyword, summary_index) for file in csv_files]
            for future in concurrent.futures.as_completed(futures):
                # 検索したCSVデータの一時ファイルを配列にまとめる
                search_csv_temp_files.append(future.result())

        print(f'\n==============================')
        if not search_csv_temp_files:
            print("[INFO] 該当データは見つかりませんでした")
            return

        try:
            # マージ用の一時ファイル (マージ時の衝突回避のため → 単発実行想定のため今回はリスクなし)
            merged_temp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".csv").name
            # 一時ファイルをすべてマージしてからソート・重複排除
            with open(merged_temp_file, "w", encoding="utf-8") as merged_temp:
                for search_csv_temp_file in search_csv_temp_files:
                    with open(search_csv_temp_file, "r", encoding="utf-8") as f_in:
                        merged_temp.write(f_in.read())
        except FileNotFoundError:
            # print("[INFO] ファイルが存在しません")
            return

        if deduplicate_with_limit(merged_temp_file, OUTPUT_FILE, header):
            line_count_output = sum(1 for _ in open(OUTPUT_FILE, "r", encoding="utf-8"))
            print(f"[INFO] ソート・重複排除(後): {line_count_output:,} 行 ({OUTPUT_FILE}")

    finally:

        if os.path.exists(merged_temp_file): os.remove(merged_temp_file)
        cleanup_and_exit(search_csv_temp_files)
        print(f"[INFO] 処理経過時間:  {time.time() - start:.2f}")
        print(f'==============================')


if __name__ == "__main__":
    main()

まとめ

このスクリプトを作成した環境では、ディスク容量は多いがメモリは少ないため今回の実装に落ち着いた。
CPUのコア数がたくさんある環境だと並列処理の恩恵を受けやすい。
もともとは一時ファイルではなくメモリ上に検索データを一時保存するスタイルで作成したが、大量に引っかかる検索キーワードだとメモリがパンクして動作しなくなってしまった。
ディスク、メモリどちらも潤沢にある環境ならもっとコンパクトなスクリプトで実装できるはず。

追記・修正 (25/09/05)

一時ファイル削除関数
def cleanup_temp():
    """tmp以下の search_tmp_*.csv を強制削除"""
    tmpdir = "/tmp"
    for f in glob.glob(os.path.join(tmpdir, "search_tmp_*.csv")):
        try:
            os.remove(f)
            # print(f"[INFO] 削除しました: {f}")
        except Exception as e:
            print(f"[WARN] 削除失敗 {f}: {e}")

一時ファイルの削除方法を「/tmp以下にあるsearch_tmp_*.csv全てを対象にする」ように修正したほうが良さそうです。

OS ERROR時に、search_csv_temp_filesに一時ファイルが格納されない事象が発生し、処理の最後でクリーンアップを実行しても/tmpに一部の一時ファイルが残ったままになっていた:tired_face:

0
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?