8
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Pythonで大量のCSVデータを高速に読み書きする

Last updated at Posted at 2023-08-19

背景

以下の処理を行うプログラムで時間がかかりすぎていることが課題となりました。

  1. 数GB程度のCSVファイルから、ある条件に該当するデータの行番号を一覧として取得します
    ⇛ 例えば[0,1,5,7,10...]行目を利用します

  2. 入力ファイルを1行ずつ読み込み、そのうち利用したい行を抽出して出力ファイルに書き込みます
    ⇛ 出力ファイルに[0,1,5,7,10...]行目のみが書き込まれます

上記2.のIO(読み書き)処理で数時間かかるレベルになっているので、改善が必要です。

目的

出力ファイルの内容に影響を与えずに、大量データの読み込みと書き込み処理を高速化する。

検証

色々検証しましたので、その内容と結果を以下に書いていきます。
最終的にはpandasのメソッドを使うのが最も高速でした。

入力ファイル

26個の列で、0~1000000行の数値が含まれています。

"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"
"0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0"
"1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1","1"
...
"999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999","999999"
"1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000"

検証①:ASISの処理

現行の処理では、入力ファイルを一行ずつ読み込んで、抽出対象の行を1行ずつ出力ファイルに書き込んでいます。

import time
import csv

# 入力ファイルと出力ファイルの指定
input_file = 'input_large.csv'
output_file = 'output_selected_rows.csv'

# 抽出したい行番号のリストを指定(0~)
selected_row_indices = [i for i in range(0, 1000000+1, 25)] # 適当に25行ごとに1行を抽出

# 開始時間の取得
start_time = time.time()

# 入力ファイルと出力ファイルを開く
with open(input_file, 'r', newline='') as infile, \
     open(output_file, 'w', newline='') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile, quoting=csv.QUOTE_ALL)

    # 先にヘッダー行を書き込む
    header = next(reader)
    writer.writerow(header)
    
    # 残りのデータを順番に見て行き、該当するものを出力ファイルに書き込む
    for row_index, row in enumerate(reader):
        if row_index in selected_row_indices:
            writer.writerow(row)

# 終了時間の取得
end_time = time.time()

# 経過時間の計算と印字
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

ファイル読み込みとファイル書込みが完了するまで…
⇛ 131.00 秒

出力ファイル

"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"
"0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0"
"25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25"
...
"999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975"
"1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000"

⇛ 問題なし。

検証②:バッファに数行を溜めてからファイルに書き込み

まず、1行ずつより複数行を一発で書き込んだ方が高速だと聞いたのでバッファーを用意しました。
バッファーに1000行が溜まったら書き込むイメージです。

import csv
import time
from itertools import islice

# 入力ファイルと出力ファイルの指定
input_file = 'input_large.csv'
output_file = 'output_selected_rows_2.csv'


# 抽出したい行番号のリストを指定(0~)
selected_row_indices = [i for i in range(0, 1000000+1, 25)]

# チャンクごとの行数を指定
chunk_size = 1000 

# 開始時間の取得
start_time = time.time()

# 入力ファイルと出力ファイルを開く
with open(input_file, 'r', newline='') as infile, \
     open(output_file, 'w', newline='') as outfile:
    reader = csv.reader(infile)
    writer = csv.writer(outfile, quoting=csv.QUOTE_ALL)

    # 先にヘッダー行を書き込む
    header = next(reader)
    writer.writerow(header)
    
    # 書き込む前に保管するバッファーを用意
    buffer = []

    # 残りのデータを順番に見て行き、該当するものを出力ファイルに書き込む
    for chunk in iter(lambda: list(islice(reader, chunk_size)), []):
        for row_index, row in enumerate(chunk):
            if row_index in selected_row_indices:
                buffer.append(row)

        # バッファーの内容をファイルに書き込む
        if len(buffer) >= chunk_size:
            writer.writerows(buffer)
            buffer = []

    # バッファーに残っている行を書き込む
    if buffer:
        writer.writerows(buffer)

# 終了時間を取得
end_time = time.time()

# 経過時間の計算と印字
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

ファイル読み込みとファイル書込みが完了するまで…
⇛ 126.57 秒 (驚くほど速くなってないよなぁ…)

出力ファイル

"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"
"0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0"
"25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25"
...
"999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975"
"1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000"

⇛ 問題なし。

検証③:pandasメソッドを利用

本当はPandasで色々試してみたけれど、以下が一番効果的でした。
chunkで入力ファイルを1000行ずつ読み込み、書込みはPandasのto_csvメソッドに任せるとうまくいきました。

注意

  • read_csvのデフォルト動作ではデータ型の変換が行われてしまうので、希望しない場合はdtype=str等のオプションを追加する必要があります
  • to_csvメソッドのlineterminator='\r\n'オプションを追加しないと出力ファイルの改行コードがCRLFからLFになります
import csv
import time
import pandas as pd

# 入力ファイルと出力ファイルの指定
input_file = 'input_large.csv'
output_file = 'output_selected_rows_3.csv'

# 抽出したい行番号のリストを指定(0~)
selected_row_indices = [i for i in range(0, 1000000+1, 25)]

# チャンクごとの行数を指定
chunk_size = 1000

# 開始時間の取得
start_time = time.time()

# チャンクを読み込む処理を指定
chunks = pd.read_csv(input_file, chunksize=chunk_size)

selected_rows = []
current_row = 0
for chunk in chunks:
    # 現チャンクの範囲を計算し、そのうちに利用したいデータの行数に絞る
    end_row = current_row + len(chunk)
    chunk_indices = [idx for idx in selected_row_indices if current_row <= idx < end_row]
    
    # 現チャンクのインデックスを加工(チャンクのインデックスじゃなくて全体のインデックスを使わせるため)
    adjusted_indices = [idx - current_row for idx in chunk_indices]
    
    # 該当する行をリストに追加
    selected_rows.append(chunk.iloc[adjusted_indices])
    current_row = end_row

# 選択した行をまとめる
selected_rows_df = pd.concat(selected_rows)

# 選択した行をすべて出力ファイルに書き込む
selected_rows_df.to_csv(output_file, index=False, quoting=csv.QUOTE_ALL, lineterminator='\r\n')

# 終了時間の取得
end_time = time.time()

# 経過時間の計算と印字
elapsed_time = end_time - start_time
print(f"Time taken: {elapsed_time:.2f} seconds")

ファイル読み込みとファイル書込みが完了するまで…
⇛ 3.54 秒 !(^◇^)

出力ファイル

"a","b","c","d","e","f","g","h","i","j","k","l","m","n","o","p","q","r","s","t","u","v","w","x","y","z"
"0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0","0"
"25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25","25"
...
"999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975","999975"
"1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000","1000000"

⇛ 問題なし。

おわりに

大量データのCSVを扱う際にはやはりPandasが有利ですね。
chunkを使って部分的に処理するとメモリーの使いすぎ防止ができるし、IOも速くなるといったメリットがありますので、皆さんもぜひ使ってみてください。

結果まとめは以下のとおりです。

検証 概要 経過時間 出力ファイルエンコーディング 出力ファイル改行コード
csvで1行ずつ読み書き 131.00 秒 UTF-8 CRLF
csvでバッファーを利用して書き込み 126.57 秒 UTF-8 CRLF
pandasでchunkを利用して書込み 3.54 秒 UTF-8 CRLF

ちなみにデータが複数のファイルに分かれていた場合、並行処理が有利だと思います。
よくjoblibを使っていますので、必要だったら調べてみてください。

ということで、今回はそこまで。

8
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?