磯野〜!数百万件のレコードが入っているcsvファイルがあるから、分割して並列で処理しようぜ!
備考
- 分割するのと並列するのを一緒にやってるのが見当たらなかったので備忘録として書きました、なんか素敵な記事があったら教えてください
- python3.7です
- pandas使いません
- pool使います
- future使いません
早速やる
読み込むファイル
sample.csv
1,あ
2,い
3,う
4,え
5,お
gen_chunks()
は作っている人がいたので拝借しました https://stackoverflow.com/a/4957046
だいたいのことは誰かが既にやってくれています。ありがとうインターネット、ありがとう先人たち
できあがったコードはこちら
pool.py
import csv
import time
from multiprocessing import Pool
def read():
f = open("sample.csv", "r")
reader = csv.reader(f)
pool = Pool()
results = []
for data_list in gen_chunks(reader):
results.append(pool.apply_async(do_something, [data_list]))
pool.close()
pool.join()
_ = [r.get() for r in results]
f.close()
def do_something(data_list):
print(f"start {data_list}")
time.sleep(len(data_list))
# hoge
print(f"finish {data_list}")
def gen_chunks(reader, chunksize=2):
"""
Chunk generator. Take a CSV `reader` and yield
`chunksize` sized slices.
"""
chunk = []
for i, line in enumerate(reader):
if i % chunksize == 0 and i > 0:
yield chunk
chunk = []
chunk.append(line)
yield chunk
read()
結果
start [['1', 'あ'], ['2', 'い']]
start [['3', 'う'], ['4', 'え']]
start [['5', 'お']]
finish [['5', 'お']]
finish [['1', 'あ'], ['2', 'い']]
finish [['3', 'う'], ['4', 'え']]
chunksize=2
なので2レコードずつdo_something()
に渡されています。でかいcsvだったらここをいい感じにしましょう。
_ = [r.get() for r in results]
でエラーを拾っています。エラー処理をすべきですが面倒なので省いています
もっと良い書き方がありそうなので知ってる人は教えてください
また、stackoverflowでも指摘されていますが、gen_chunks()
の配列リセット部分をdel chunk[:]
にすると出力が以下になります
結果
start [['5', 'お']]
start [['5', 'お']]
start [['5', 'お']]
finish [['5', 'お']]
finish [['5', 'お']]
finish [['5', 'お']]
私はコメントをちゃんと読んでなかったので残念な結果を目の当たりにしました
悲しいね