お久しぶりです、EITAです。
既視感溢れるタイトルの元ネタはこちらからどうぞ。
ここ最近PythonからCSVファイルを読み込み、DB(MySQL)に大量のレコードを追加しました。その際に得た知見を書き綴っていきます。
思い浮かべるINSERT文
多くの方々がテーブルにレコードを挿入する際の記述はこのように想像するのではないでしょうか?
INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')
正しい構文ですが、コミットするタイミング次第で処理時間が大幅に変化します。
コミットするタイミング
数万行のレコードを追加する前提で、こちらを見てください。
INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')
# commit
1行ずつコミットを行うと行数分コミットしてしまうため、数百行の追加ならまだしも、数万行以上をこの方法で実行するとかなりの時間を費やしてします。
SQLに馴染みがなかったのでわかりませんでしたが、複数行まとめてコミットすることも可能だそうです。
INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')
# ...
INSERT INTO test_table(id, name, age, address) VALUES(1000, 'taro', 20, ' Tokyo')
# commit
このようにコミットを行うことで999回分のコミットの時間を削減することができます。
しかし、それだけではありません、実はこれ以上に速く爆速で実行する方法があるのです...!
さらなる速度へ
それはbulk insertと呼ばれています。
わかりやすい説明が見つかったのでこちらをどうぞ。
どこかの誰かが、1回の命令で、たくさんのデータを入れられるinsert文を作りました。
データベースに入れたいデータを、ごそっと用意して「これ、入れておいて」と、まとめて指定できるinsert文です。この「大量のデータを一気に投入できるinsert文(っぽいやつ)」が「バルクインサート(bulk insert)」です。
「bulk insertぉぉぉおおお!ここにある大量のデータを!この箱に!入れてくれぇぇえ!」みたいな命令をすると、データベースさんは指定された大量のデータを指定の箱に入れて保管してくれます。
引用:バルクインサート (bulk insert)とは|「分かりそう」で「分からない」でも「分かった」気になれるIT用語辞典
すごいわかりやすいですね!DBによって記述方法は異なったりしますが、今回はMySQLを使った方法を紹介します。
テスト環境
PC | MacBook Pro (13-inch, 2016, Four Thunderbolt 3 Ports) |
---|---|
CPU | 3.1 GHz Intel Core i5 |
メモリ | 16 GB |
OS | macOS Mojave バージョン10.14.3 |
Python | 3.6.5 :: Anaconda |
DB | MySQL 10.1.28-MariaDB |
※Jupiter notebookで検証
ライブラリのインポート
import MySQLdb
import pandas as pd
from time import time
from tqdm import tqdm_notebook as tqdm
Mac用になりますが、MySQLコネクタであるmysqlclientのインストール時にエラーが出る方はこちらに対処法が掲載されています。
今回は少々時間がかかる処理(1分程度)を行うため、Jupyter notebook上にプログレスバーを表示するtqdmを利用します。なくても問題ありませんが、どの程度処理が残っているのか判断できるため便利なライブラリになります。こちらを参考にしてください。
データの用意
データベースに挿入するデータを用意します。
insert_dic = {"name" : ["taro", "jiro", "saburo", "sakura", "hanako"],
"sex" : [1, 1, 1, 2, 2],
"zipcode" : ["0010001", "0010002", "0010003", "0010004", "0010005"],
"prefecture_code" : [1, 2, 3, 4, 5],
"address" : ["北海道", "青森県", "岩手県", "宮城県", "秋田県"],
"age" : [20, 25, 30, 15, 20],
"phone_number_1" : ["00100010001", "00200020002", "00300030003", "00400040004", "00500050005"],
"phone_number_2" : ["0010001", "0020002", "0030003", "0040004", "0050005"]}
# DataFrame作成
insert_df = pd.DataFrame(insert_dic, columns = list(insert_dic.keys()))
insert_df
50000行に増やす
速度を体感できるデータを揃えることが難しいため、先ほど用意したデータを50000行に増やします。
1分ほどかかるので気長に待ちましょう。
insert_50000_df = insert_df
# 50000行になるまでループ
for i in tqdm(range(9999)):
insert_50000_df = pd.concat([insert_50000_df, insert_df])
i + 1
# 行数の確認
len(insert_50000_df)
データベースの接続情報
データベースに接続するための情報をdict型で定義します。
# データベース接続情報
db_config = {
'host' : '127.0.0.1',
'port' : 0,
'user' : 'hogehoge',
'password' : 'fugafuga',
'database' : 'test_db',
'use_unicode': True,
'charset': 'utf8'
}
比較する関数を用意
速度がどの程度違うか比較するため、それぞれ関数を用意します。
-
1行ずつコミットする関数
-
1000行ずつコミットする関数
-
bulk insert(1行に1000個の値をセットしてコミットする関数)
都度コミットするのは面倒なので関数で定義しちゃいましょう。
def commit(conn, cur):
cur.close()
conn.commit()
return conn.cursor()
1行ずつコミットする関数
初めに紹介した手法です。ループの中にコミットが仕込まれていることがわかります。
def insert_db_slow(sql, values):
start_time = time()
conn = MySQLdb.connect(**db_config)
cur = conn.cursor()
# レコード分ループ
for i in range(len(values[0])):
exec_values = []
# カラム分ループ
for j in range(len(values)):
exec_values.append(values[j][i])
# タプルに変換
exec_values = tuple(exec_values)
cur.execute(sql, exec_values)
# コミット
cur = commit(conn, cur)
print("time:{0:8.5f}sec".format(float(time() - start_time)))
1000行ずつコミットする関数
先ほどのコードと大きな違いはありませんが、コミットする頻度が大幅に削減できるため速度は向上します。
def insert_db_fast(sql, values):
start_time = time()
conn = MySQLdb.connect(**db_config)
cur = conn.cursor()
# レコード分ループ
for i in range(len(values[0])):
exec_values = []
# カラム分ループ
for j in range(len(values)):
exec_values.append(values[j][i])
# タプルに変換
exec_values = tuple(exec_values)
cur.execute(sql, exec_values)
# 1000行ごとにコミット
if (i + 1) % 1000 == 0:
cur = commit(conn, cur)
elif i + 1 == len(values[0]):
_ = commit(conn, cur)
print("time:{0:8.5f}sec".format(float(time() - start_time)))
bulk insert
お待ちかねのbulk insert関数です、コードを紹介した後に補足します。
def bulk_insert_db(sql, values):
start_time = time()
conn = MySQLdb.connect(**db_config)
cur = conn.cursor()
bulk_values = []
# レコード分ループ
for i in range(len(values[0])):
exec_values = []
# カラム分ループ
for j in range(len(values)):
exec_values.append(values[j][i])
# タプルに変換して配列に追加
bulk_values.append(tuple(exec_values))
# 挿入する値が1000個に達したらコミット
if (i + 1) % 1000 == 0:
cur.executemany(sql, bulk_values)
cur = commit(conn, cur)
bulk_values = []
elif i + 1 == len(values[0]):
cur.executemany(sql, bulk_values)
_ = commit(conn, cur)
del exec_values
print("time:{0:8.5f}sec".format(float(time() - start_time)))
1行ずつ実行する際は
cur.execute(sql, exec_values)
で実行していましたが、bulk insertではこのように変化していることがわかります。
cur.executemany(sql, bulk_values)
もう一つの大きな違いは、実行する際に渡す引数です。一行ずつデータをセットするSQL文では
INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')
このように1つずつ値をセットしていました。MySQLのbulk insertでは1行で複数の値をセットできます。
INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')(2, 'tanaka', 23, 'kagoshima')(3, 'satou', 25, 'kanagawa')...
実際にこのように記述するとエラーになりますが、イメージとして持っておくといいでしょう!
速度を比較してみる
先ほど作成した50000行のデータセットを追加してテストします。毎回テーブルを削除・作成しているので条件は同じです。
1行ずつコミット-> 1000行ずつコミット->bulk insertの10回ずつ行って確認してみましょう。
def execute_db(sql):
conn = MySQLdb.connect(**db_config)
cur = conn.cursor()
cur.execute(sql)
cur.close()
conn.commit()
# testテーブル作成
def create_table_test():
sql = """CREATE TABLE `test_table`(`id` INT NOT NULL AUTO_INCREMENT,
`name` VARCHAR(50) NOT NULL,
`sex` INT NOT NULL, `zipcode` INT NOT NULL,
`prefecture_code` INT NOT NULL,
`address` VARCHAR(100) NOT NULL,
`age` INT NOT NULL, `phone_number_1` INT NOT NULL,
`phone_number_2` INT NOT NULL,
PRIMARY KEY (`id`)
)ENGINE = InnoDB;"""
execute_db(sql)
# testテーブル削除
def drop_table_test():
sql = "DROP TABLE `test_table`"
execute_db(sql)
# DBに挿入するデータセット作成
def insert_db_values(df):
values = []
for i in range(len(df.columns)):
values.append(list(df[df.columns[i]]))
return values
# INSERT用のSQL文作成
def insert_sql(table_name, columns):
records = ""
values = ""
for i, column in enumerate(columns):
records += column
values += "%s"
if i != len(columns) - 1:
records += ", "
values += ", "
return "INSERT INTO {0:s}({1:s}) VALUES({2:s})".format(table_name, records, values)
# SQLの作成
sql = insert_sql('test_table', insert_50000_df.columns)
# データをセット
values = insert_db_values(insert_50000_df)
# テーブルに挿入(各手法10回ずつ実行)
insert_method = ["1_row_and_1_commit", "1000_rows_and_1_commit", "bulk_insert(1_row(1000_values)_and_1_commit)"]
for i, method_name in enumerate(insert_method):
print("{0:s}:".format(method_name))
for j in range(10):
create_table_test()
print(" {0:d}:".format(j + 1), end = "")
if i == 0:
insert_db_slow(sql, values)
elif i == 1:
insert_db_fast(sql, values)
else:
bulk_insert_db(sql, values)
drop_table_test()
print()
出力結果
1_row_and_1_commit:
01:time:15.93790sec
02:time:17.13466sec
03:time:16.28792sec
04:time:16.72755sec
05:time:16.05999sec
06:time:16.35922sec
07:time:16.04794sec
08:time:17.50887sec
09:time:17.33433sec
10:time:16.46473sec
1000_rows_and_1_commit:
01:time: 4.61864sec
02:time: 4.63770sec
03:time: 4.69085sec
04:time: 4.62592sec
05:time: 4.63636sec
06:time: 4.65857sec
07:time: 4.65147sec
08:time: 4.65053sec
09:time: 4.64463sec
10:time: 4.63797sec
bulk_insert(1_row(1000_values)_and_1_commit):
01:time: 1.10884sec
02:time: 1.11510sec
03:time: 1.10370sec
04:time: 1.09741sec
05:time: 1.11254sec
06:time: 1.20106sec
07:time: 1.61702sec
08:time: 1.08873sec
09:time: 1.60758sec
10:time: 1.65760sec
まとめ
- 1行ずつコミット->約16秒
- 1000行ずつコミット->約4秒
- bulk insert->約1秒
50000行を追加するだけで顕著な差が現れました。
さらに多くのレコードを追加する場合はより大きな差が開いてしまうため、大量のデータをDBに格納する際は工夫してみてくださいね!
IOT機器のログを管理する際に役立つかも?
時は金なり...身に沁みて感じました。