LoginSignup
45

More than 5 years have passed since last update.

うわっ...私のレコード追加、遅すぎ...?(爆速でINSERTする極意)

Last updated at Posted at 2019-02-22

お久しぶりです、EITAです。
既視感溢れるタイトルの元ネタはこちらからどうぞ。

ここ最近PythonからCSVファイルを読み込み、DB(MySQL)に大量のレコードを追加しました。その際に得た知見を書き綴っていきます。

思い浮かべるINSERT文

多くの方々がテーブルにレコードを挿入する際の記述はこのように想像するのではないでしょうか?

INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')

正しい構文ですが、コミットするタイミング次第で処理時間が大幅に変化します。

コミットするタイミング

数万行のレコードを追加する前提で、こちらを見てください。

INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')
# commit

1行ずつコミットを行うと行数分コミットしてしまうため、数百行の追加ならまだしも、数万行以上をこの方法で実行するとかなりの時間を費やしてします。
SQLに馴染みがなかったのでわかりませんでしたが、複数行まとめてコミットすることも可能だそうです。

INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')
# ...
INSERT INTO test_table(id, name, age, address) VALUES(1000, 'taro', 20, ' Tokyo')
# commit

このようにコミットを行うことで999回分のコミットの時間を削減することができます。

しかし、それだけではありません、実はこれ以上に速く爆速で実行する方法があるのです...!

さらなる速度へ

それはbulk insertと呼ばれています。
わかりやすい説明が見つかったのでこちらをどうぞ。

どこかの誰かが、1回の命令で、たくさんのデータを入れられるinsert文を作りました。
データベースに入れたいデータを、ごそっと用意して「これ、入れておいて」と、まとめて指定できるinsert文です。

この「大量のデータを一気に投入できるinsert文(っぽいやつ)」が「バルクインサート(bulk insert)」です。
bulk insertぉぉぉおおお!ここにある大量のデータを!この箱に!入れてくれぇぇえ!」みたいな命令をすると、データベースさんは指定された大量のデータを指定の箱に入れて保管してくれます。
d005497-4.png

引用:バルクインサート (bulk insert)とは|「分かりそう」で「分からない」でも「分かった」気になれるIT用語辞典

すごいわかりやすいですね!DBによって記述方法は異なったりしますが、今回はMySQLを使った方法を紹介します。

テスト環境

PC MacBook Pro (13-inch, 2016, Four Thunderbolt 3 Ports)
CPU 3.1 GHz Intel Core i5
メモリ 16 GB
OS macOS Mojave バージョン10.14.3
Python 3.6.5 :: Anaconda
DB MySQL 10.1.28-MariaDB

※Jupiter notebookで検証

ライブラリのインポート

import MySQLdb
import pandas as pd
from time import time
from tqdm import tqdm_notebook as tqdm

Mac用になりますが、MySQLコネクタであるmysqlclientのインストール時にエラーが出る方はこちらに対処法が掲載されています。

今回は少々時間がかかる処理(1分程度)を行うため、Jupyter notebook上にプログレスバーを表示するtqdmを利用します。なくても問題ありませんが、どの程度処理が残っているのか判断できるため便利なライブラリになります。こちらを参考にしてください。

データの用意

データベースに挿入するデータを用意します。
insert_dic = {"name" : ["taro", "jiro", "saburo", "sakura", "hanako"],
    "sex" : [1, 1, 1, 2, 2],
    "zipcode" : ["0010001", "0010002", "0010003", "0010004", "0010005"],
    "prefecture_code" : [1, 2, 3, 4, 5],
    "address" : ["北海道", "青森県", "岩手県", "宮城県", "秋田県"],
    "age" : [20, 25, 30, 15, 20],
    "phone_number_1" : ["00100010001", "00200020002", "00300030003", "00400040004", "00500050005"],
    "phone_number_2" : ["0010001", "0020002", "0030003", "0040004", "0050005"]} 

# DataFrame作成
insert_df = pd.DataFrame(insert_dic, columns = list(insert_dic.keys()))
insert_df

スクリーンショット 2019-02-20 10.04.34.png

50000行に増やす

速度を体感できるデータを揃えることが難しいため、先ほど用意したデータを50000行に増やします。

1分ほどかかるので気長に待ちましょう。

insert_50000_df = insert_df

# 50000行になるまでループ
for i in tqdm(range(9999)):
    insert_50000_df = pd.concat([insert_50000_df, insert_df])
    i + 1

# 行数の確認
len(insert_50000_df)

スクリーンショット 2019-02-20 10.04.34.png

データベースの接続情報

データベースに接続するための情報をdict型で定義します。

# データベース接続情報
db_config = {
        'host'     : '127.0.0.1',
        'port'     : 0,
        'user'     : 'hogehoge',
        'password' : 'fugafuga',
        'database' : 'test_db',
        'use_unicode': True,
        'charset': 'utf8'
}

比較する関数を用意

速度がどの程度違うか比較するため、それぞれ関数を用意します。

  • 1行ずつコミットする関数

  • 1000行ずつコミットする関数

  • bulk insert(1行に1000個の値をセットしてコミットする関数)

都度コミットするのは面倒なので関数で定義しちゃいましょう。

def commit(conn, cur):
    cur.close()
    conn.commit()
    return conn.cursor()

1行ずつコミットする関数

初めに紹介した手法です。ループの中にコミットが仕込まれていることがわかります。
def insert_db_slow(sql, values):
    start_time = time()
    conn = MySQLdb.connect(**db_config)
    cur = conn.cursor()

    # レコード分ループ
    for i in range(len(values[0])):
        exec_values = []
        # カラム分ループ
        for j in range(len(values)):
            exec_values.append(values[j][i])
        # タプルに変換
        exec_values = tuple(exec_values)
        cur.execute(sql, exec_values)

        # コミット
        cur = commit(conn, cur)

    print("time:{0:8.5f}sec".format(float(time() - start_time)))

1000行ずつコミットする関数

先ほどのコードと大きな違いはありませんが、コミットする頻度が大幅に削減できるため速度は向上します。
def insert_db_fast(sql, values):
    start_time = time()
    conn = MySQLdb.connect(**db_config)
    cur = conn.cursor()

    # レコード分ループ
    for i in range(len(values[0])):
        exec_values = []
        # カラム分ループ
        for j in range(len(values)):
            exec_values.append(values[j][i])
        # タプルに変換
        exec_values = tuple(exec_values)
        cur.execute(sql, exec_values)

        # 1000行ごとにコミット
        if (i + 1) % 1000 == 0:
            cur = commit(conn, cur)
        elif i + 1 == len(values[0]):
            _ = commit(conn, cur)

    print("time:{0:8.5f}sec".format(float(time() - start_time)))

bulk insert

お待ちかねのbulk insert関数です、コードを紹介した後に補足します。

def bulk_insert_db(sql, values):
    start_time = time()
    conn = MySQLdb.connect(**db_config)
    cur = conn.cursor()
    bulk_values = []

    # レコード分ループ
    for i in range(len(values[0])):
        exec_values = []
        # カラム分ループ
        for j in range(len(values)):
            exec_values.append(values[j][i])
        # タプルに変換して配列に追加
        bulk_values.append(tuple(exec_values))

        # 挿入する値が1000個に達したらコミット
        if (i + 1) % 1000 == 0:
            cur.executemany(sql, bulk_values)
            cur = commit(conn, cur)
            bulk_values = []
        elif i + 1 == len(values[0]):
            cur.executemany(sql, bulk_values)
            _ = commit(conn, cur)
        del exec_values

    print("time:{0:8.5f}sec".format(float(time() - start_time)))

1行ずつ実行する際は
cur.execute(sql, exec_values)

で実行していましたが、bulk insertではこのように変化していることがわかります。
cur.executemany(sql, bulk_values)

もう一つの大きな違いは、実行する際に渡す引数です。一行ずつデータをセットするSQL文では

INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')

このように1つずつ値をセットしていました。MySQLのbulk insertでは1行で複数の値をセットできます。

INSERT INTO test_table(id, name, age, address) VALUES(1, 'eita', 21, ' okinawa')(2, 'tanaka', 23, 'kagoshima')(3, 'satou', 25, 'kanagawa')...

実際にこのように記述するとエラーになりますが、イメージとして持っておくといいでしょう!

速度を比較してみる

先ほど作成した50000行のデータセットを追加してテストします。毎回テーブルを削除・作成しているので条件は同じです。

1行ずつコミット-> 1000行ずつコミット->bulk insertの10回ずつ行って確認してみましょう。

def execute_db(sql):
    conn = MySQLdb.connect(**db_config)
    cur = conn.cursor()
    cur.execute(sql)
    cur.close()
    conn.commit()

# testテーブル作成
def create_table_test():
    sql = """CREATE TABLE `test_table`(`id` INT NOT NULL AUTO_INCREMENT,
                                        `name` VARCHAR(50) NOT NULL,
                                        `sex` INT NOT NULL, `zipcode` INT NOT NULL,
                                        `prefecture_code` INT NOT NULL,
                                        `address` VARCHAR(100) NOT NULL,
                                        `age` INT NOT NULL, `phone_number_1` INT NOT NULL,
                                        `phone_number_2` INT NOT NULL,
                                        PRIMARY KEY (`id`)
          )ENGINE = InnoDB;"""
    execute_db(sql)

# testテーブル削除
def drop_table_test():
    sql = "DROP TABLE `test_table`"
    execute_db(sql)

# DBに挿入するデータセット作成
def insert_db_values(df):
    values = []
    for i in range(len(df.columns)):
        values.append(list(df[df.columns[i]]))
    return values

# INSERT用のSQL文作成
def insert_sql(table_name, columns):
    records = ""
    values = ""
    for i, column in enumerate(columns):
        records += column
        values += "%s"
        if i != len(columns) - 1:
            records += ", "
            values += ", "

    return "INSERT INTO {0:s}({1:s}) VALUES({2:s})".format(table_name, records, values)

# SQLの作成
sql = insert_sql('test_table', insert_50000_df.columns)
# データをセット
values = insert_db_values(insert_50000_df)

# テーブルに挿入(各手法10回ずつ実行)
insert_method = ["1_row_and_1_commit", "1000_rows_and_1_commit", "bulk_insert(1_row(1000_values)_and_1_commit)"]
for i, method_name in enumerate(insert_method):
    print("{0:s}:".format(method_name))
    for j in range(10):
        create_table_test()
        print(" {0:d}:".format(j + 1), end = "")
        if i == 0:
            insert_db_slow(sql, values)
        elif i == 1:
            insert_db_fast(sql, values)
        else:
            bulk_insert_db(sql, values)
        drop_table_test()
    print()

出力結果
output
1_row_and_1_commit:
    01:time:15.93790sec
    02:time:17.13466sec
    03:time:16.28792sec
    04:time:16.72755sec
    05:time:16.05999sec
    06:time:16.35922sec
    07:time:16.04794sec
    08:time:17.50887sec
    09:time:17.33433sec
    10:time:16.46473sec

1000_rows_and_1_commit:
    01:time: 4.61864sec
    02:time: 4.63770sec
    03:time: 4.69085sec
    04:time: 4.62592sec
    05:time: 4.63636sec
    06:time: 4.65857sec
    07:time: 4.65147sec
    08:time: 4.65053sec
    09:time: 4.64463sec
    10:time: 4.63797sec

bulk_insert(1_row(1000_values)_and_1_commit):
    01:time: 1.10884sec
    02:time: 1.11510sec
    03:time: 1.10370sec
    04:time: 1.09741sec
    05:time: 1.11254sec
    06:time: 1.20106sec
    07:time: 1.61702sec
    08:time: 1.08873sec
    09:time: 1.60758sec
    10:time: 1.65760sec

まとめ

  • 1行ずつコミット->約16秒
  • 1000行ずつコミット->約4秒
  • bulk insert->約1秒

50000行を追加するだけで顕著な差が現れました。

さらに多くのレコードを追加する場合はより大きな差が開いてしまうため、大量のデータをDBに格納する際は工夫してみてくださいね!
IOT機器のログを管理する際に役立つかも?

時は金なり...身に沁みて感じました。

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
45