モチベーション
@Maki-Daisukeさんの記事(結局、Go言語をやめる理由はなかった件)に
1万行のcsvを読んだり書いたりするのに8秒もかかってしまうというのは直感に反しますね
csvの読み込みを1000行ぐらいでミニバッチ化してbulk importすれば2秒以下になりそうな気がしますが
というコメントをした結果、実際どうなるか知りたい人がいるらしいので試してみました。
似たような問題はRailsなどではやったことがありますが、Pythonでのナイスなソリューションを持ち合わせていなかったので、それを確保するのもモチベーションの一つです。
残念ながら私はGo言語が書けないので、Pythonのみチャレンジします。(Goでも似たようなことは書けると思います。)
ベンチマークを分析する
元となった@okdyy75さんの記事(だから僕はGo言語を辞めた)のpythonの結果を見る限り、「CSV読み込み+DBインサート」の実行時間が全体7.3秒に対して6.8秒と支配的です。
だがちょっと待っていただきたい、10000行程度で7秒も使っていたら1億行の書き込みにほぼ1日かかってしまいます。RDBがいくら書き込みが遅いと言われていてもインデックスもついていないテーブルへのINSERTがそこまで遅いわけがないという感覚を身に着けることが大事です。
ミニバッチにデータを分割する
今回の入力データは10000行ありますが、元のベンチマークコードは言語のDBクライアントを通じて、この行データを1行ずつMySQLに送信しています。1行csvを読み込んで、1行送信して終わったらまた1行読み込んでを繰り返しています。これでは明らかにDBとのNetworkIO(+csvファイルとのIO)に時間が食われます。
幸いなことにRDBは複数行を一括でINSERTする(bulk insert)機能がついています。これを使うことでDBとのNetworkIOの回数を減らすことができます。
今回の例だと10000行程度なので、まとめて読み込んでbulk insert1発で登録するのがおそらく一番速いです。
しかしこの方法はスケールしません。例えば10億行のデータを一括で突っ込もうとするとクライアントでOutOfMemoryErrorを出してしまうでしょう。そこで今回は10000行のデータを1000行のデータの塊10個に分割します。1000行のデータの塊のことをここではミニバッチと呼んでいますが、一般的にはchunkとも呼ばれています。
行のジェネレータをミニバッチのジェネレータに変換する
ミニバッチを作るときに、データを全部メモリに乗せてからミニバッチを作るのでは意味がありません。bulk insertに必要な分だけをメモリに読み込んでDBにINSERT文を発行するということが重要です。forやbreakを使って泥臭く実装してもいいですが、おそらくそれでは抽象度が低いわかりにくいコードになると思います。ここでpythonのジェネレータを活用すると実装が非常にシンプルになるので利用します。
from itertools import islice, chain
def chunked(iterable, size: int):
iterator = iter(iterable)
for first_element_of_batch in iterator:
yield chain([first_element_of_batch], islice(iterator, size - 1))
このコードはstackoverflowより引っ張ってきました。
Split a generator into chunks without pre-walking it
例えばchunked(range(0,10),3)
は[[0,1,2], [3,4,5], [6,7,8], [9]]
といった感じの数列を返します。(ここでは意図を明示するためにリストのリストっぽく書いていますが、実際にはジェネレータのジェネレータが返ります。)
この非常に小さな関数をうまく活用することでベンチマーク結果を大きく改善することができます。
ベンチマークスクリプトの変更
ベンチマークに以下のような変更を加えて、比較してみました。
(作ったchunked関数をうまく挟み1000行読み込んだら、bulk insertするという処理にする。)
import MySQLdb.cursors
+ from itertools import islice, chain
+
+ def chunked(iterable, size: int): #chunkを作る関数を追加して
+ iterator = iter(iterable)
+ for first_element_of_batch in iterator:
+ yield chain([first_element_of_batch], islice(iterator, size - 1))
# ...(略)
def work(db):
print_time('import CSV start')
with open('../import_users.csv') as f:
reader = csv.reader(f)
next(reader)
- for row in reader:
+ for chunk in chunked(reader,1000): # 1000行のミニバッチを作って
cur = db.cursor()
- cur.execute('''
+ cur.executemany(''' # bulkinsertする
# ...(略)
INSERT INTO users (
- ''', {
+ ''', [{
'name': row[1], # row[0]はidのため1から
'email': row[2],
'email_verified_at': row[3],
'password': row[4],
'remember_token': row[5],
'created_at': row[6],
'updated_at': row[7]
- })
+ } for row in chunk]) // 入れるデータをdictからdictのリストに変え
# ...(略)
結果
条件 | 結果 | スピード |
---|---|---|
元々のベンチマーク | 19.11 秒 | 1.00 x |
変更後のベンチマーク | 1.75 秒 | 10.92 x |
(元々のベンチマークが元記事に比べると2倍以上遅いですが、1.4GHzのmacなので仕方がない。11倍ぐらい改善されているので、元が8秒なら0.8秒程度になると期待できます。) |
結論
- ミドルウェアを叩くときは、言語よりも実行戦略の方が重要です。(N+1は避ける。)
- 汎用性の高い関数chunkedが発見できました。
さらなる冴えたやり方を考える
言語に差はないとわかっていても、唯一例外かもしれない言語があります。
そうshell script(CLI)ですね。mysqlだとmysqlimportとかいかにもbulk insertに向いてそうなクライアントがあるのですが、使ってみた感じ柔軟性があまり高くなく制約が強いので試すのは諦めました。
postgresとかだと例えば下のような感じのノリで加工しながらスピーディに標準入出力を受け取りできるので楽です。標準入力を受け付けるclickhouseなどの列志向なDWHにもシームレスにデータを投げ込むことができます。
psql -c "COPY (SELECT * FROM users limit 10) TO STDOUT" |
psql -c "COPY mini_users FROM STDIN"
mysqlに異なるデータソースからのデータを入れたり、色んなDB・DWH間でのデータ転送をしたい時には今回のようなミニバッチ戦略が有効な局面は多いと思います。
もちろん洗練されたデータレプリケーション基盤があればこんなことする必要ないですが、いつでもそういうものがあるわけではないので、道具箱にしまっておきましょう。
他にも非同期化するなども改善戦略としては考えられます。レイテンシー改善には適していると思いますが、バッチサイズを大きくした方がスループットは上がりそうだというのが第一感ですね