はじめに
システム構築を行う際、旧システムからのデータ移行や他システム連携により大量データを投入する場合が多々あると思います。
得てして、このような場合はCSVファイルにてやり取りが行われるパターンとなります。
本記事では、CSVファイルによる大量データ投入における2パターンを解説します。
紹介パターン
-
バルクインサート
通常インサートでは1件ずつ投入しますが、バルクインサートでは複数件同時に投入することが可能です。
INSERT文の場合、 INSERT INTO ... VALUES (...), (...), ... のよう記載することが可能ですが、以下のデメリットがあります。- クエリサイズが1GBを超えるとエラーとなる
- クエリ転送のオーバーヘッドがある
INSERTとは別にPostgreSQLはCOPYコマンドがあり、こちらの方が高速に処理が可能です。
psqlではファイル指定で処理できますが、psql以外でCOPYコマンドを使用する場合はSTDINを使用する必要があります。
Pythonの場合、psycopg2モジュールのcopy_from関数で簡単にCOPYコマンドを使用することが可能です。 -
FDWを用いたインサート
データベースサーバーの操作が可能な場合、FDW(Foreign Data Wrapper)を用いることで高速に処理が可能となります。
CSVファイルをPostgreSQLで扱う場合はfile_fdwを使用します。
(本記事ではfile_fdwの扱い方は解説しません。)
クエリはINSERT INTO ... SELECT * FROM ...
で済むため、アプリ側の処理はほぼありません。
バルクインサートと違いCSVファイルのパースはPostgreSQL側で処理するため、すべての処理がPostgreSQL内で完結します。
検証
検証条件
テストデータとして100,000レコードあるCSVファイルを使用して1レコード、10レコード、100レコード、1,000レコード、10,000レコード、100,000レコード、1,000,000レコード登録時の時間計測を5回実施しました。
column1,column2,column3,column4,column5,column6,column7,column8,column9,column10
test1_1,test2_1,test3_1,test4_1,test5_1,test6_1,test7_1,test8_1,test9_1,test10_1
test1_2,test2_2,test3_2,test4_2,test5_2,test6_2,test7_2,test8_2,test9_2,test10_2
test1_3,test2_3,test3_3,test4_3,test5_3,test6_3,test7_3,test8_3,test9_3,test10_3
~省略~
test1_100000,test2_100000,test3_100000,test4_100000,test5_100000,test6_100000,test7_100000,test8_100000,test9_100000,test10_100000
テストコード
import re
import io
import csv
import psycopg2
from psycopg2.extras import DictCursor, DictRow
from typing import Union, Optional
from time import perf_counter, sleep
class DBM:
def __init__(self):
self.conn = psycopg2.connect(
f"dbname=<DB名>"
+ f" port=<ポート番号>"
+ f" host=<DBホスト名>"
+ f" user=<ユーザー名>"
+ f" password=<パスワード>"
)
def begin(self) -> None:
"""トランザクション開始"""
self.conn.set_session(autocommit=False)
def commit(self) -> None:
"""コミット"""
if not self.conn.autocommit:
self.conn.commit()
self.conn.set_session(autocommit=True)
def rollback(self) -> None:
"""ロールバック"""
if not self.conn.autocommit:
self.conn.rollback()
self.conn.set_session(autocommit=True)
def get_query_result(
self,
sql: str,
param: Union[Optional[dict], Optional[list], Optional[tuple]] = None,
) -> list:
"""
検索結果取得
引数
----
[str] sql: SQL文
[dict/list/tuple] param: パラメーター
戻り値
----
[list] 検索結果
"""
res = []
try:
with self.conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql, param)
tmp = cur.fetchall()
if type(tmp) is list:
for row in tmp:
res.append(dict(row))
except psycopg2.Error as e:
raise Exception(code=self.DB_ERROR_CODE, param=param, error=e)
return res
def execute(
self,
sql: str,
param: Union[Optional[dict], Optional[list], Optional[tuple]] = None,
) -> int:
"""
更新SQL実行
引数
----
[str] sql: SQL文
[dict/list/tuple] param: パラメーター
戻り値
----
[int] 更新件数
"""
res = 0
with self.conn.cursor(cursor_factory=DictCursor) as cur:
cur.execute(sql, param)
res = cur.rowcount
return res
def bulk_insert(
self,
table_name: str,
column_list: list,
data_list: list,
) -> None:
"""
データの一括登録
引数
----
[str] table_name: テーブル名
[list] column_list: カラム一覧
[list] data_list: 登録データ(dict内包)
"""
with io.StringIO() as f:
for data in data_list:
tmp_data = []
for column in column_list:
tmp = data[column]
if tmp is None:
# Nullの場合
tmp = "\\N"
else:
# データ内のカンマをエスケープ
tmp = str(tmp)
tmp = tmp.replace(",", "\\,")
tmp_data.append(tmp)
tmp = ",".join(tmp_data)
f.write(f"{tmp}\n")
with self.conn.cursor() as cur:
# StringIOのカーソル位置を先頭に戻す
f.seek(0)
cur.copy_from(
f, # type: ignore
table_name,
sep=",",
null="\\N",
columns=column_list,
)
test = DBM()
test.begin()
drop_str = "drop table if exists test_table cascade"
create_str = (
"create table test_table("
+ "column1 text"
+ ", column2 text"
+ ", column3 text"
+ ", column4 text"
+ ", column5 text"
+ ", column6 text"
+ ", column7 text"
+ ", column8 text"
+ ", column9 text"
+ ", column10 text"
+ ")"
)
# テストデータロード
with open("test.csv") as f:
reader = csv.DictReader(f)
file_datas = [row for row in reader]
# INSERTでのデータ挿入
test.execute(drop_str)
test.execute(create_str)
# テスト実行
start = perf_counter()
for file_data in file_datas:
test.execute(
"insert into test_table values ("
+ "%(p1)s, %(p2)s, %(p3)s, %(p4)s, %(p5)s"
+ ", %(p6)s, %(p7)s, %(p8)s, %(p9)s, %(p10)s)",
{
"p1": file_data["column1"],
"p2": file_data["column2"],
"p3": file_data["column3"],
"p4": file_data["column4"],
"p5": file_data["column5"],
"p6": file_data["column6"],
"p7": file_data["column7"],
"p8": file_data["column8"],
"p9": file_data["column9"],
"p10": file_data["column10"],
},
)
end = perf_counter()
print(f"INSERTでのデータ挿入時間: {(end - start):.5f}秒")
test.commit()
sleep(10)
# バルクインサートでのデータ挿入
test.begin()
test.execute(drop_str)
test.execute(create_str)
start = perf_counter()
test.bulk_insert(
"test_table",
[
"column1",
"column2",
"column3",
"column4",
"column5",
"column6",
"column7",
"column8",
"column9",
"column10",
],
file_datas,
)
end = perf_counter()
print(f"バルクインサートでのデータ挿入時間: {(end - start):.5f}秒")
test.commit()
sleep(10)
# FWDでのデータ挿入
# 事前にfile_fdwによりCSVファイルを読み込む外部テーブルの作成を実施する
# create server csv_file foreign data wrapper file_fdw;
# create foreign table test_csv (column1 text,column2 text,column3 text,column4 text,column5 text,column6 text,column7 text,column8 text,column9 text,column10 text) server csv_file options( filename '/tmp/test.csv', format 'csv', header 'true', encoding 'utf8');
# alter table test_csv owner to <ユーザー名>;
test.begin()
test.execute(drop_str)
test.execute(create_str)
start = perf_counter()
test.execute("insert into test_table select * from test_csv")
end = perf_counter()
print(f"FWDでのデータ挿入時間: {(end - start):.5f}秒")
test.commit()
test.execute(drop_str)
検証結果
計測結果
-
INSERT
件数 1回目 2回目 3回目 4回目 5回目 平均 1レコード 0.00147秒 0.00133秒 0.00155秒 0.00154秒 0.00117秒 0.00141秒 10レコード 0.01085秒 0.00997秒 0.01074秒 0.01116秒 0.01076秒 0.01070秒 100レコード 0.09909秒 0.10325秒 0.10495秒 0.11332秒 0.09889秒 0.10390秒 1,000レコード 1.02309秒 0.75127秒 0.86947秒 1.06752秒 1.03721秒 0.94971秒 10,000レコード 8.92535秒 10.18288秒 10.23660秒 9.51159秒 9.60280秒 9.69184秒 100,000レコード 96.71324秒 105.79340秒 90.64923秒 93.89740秒 96.16214秒 96.64308秒 1,000,000レコード 930.22842秒 828.11898秒 996.06900秒 923.54200秒 927.10096秒 921.01187秒 -
バルクインサート
件数 1回目 2回目 3回目 4回目 5回目 平均 1レコード 0.00205秒 0.00194秒 0.00213秒 0.00209秒 0.00179秒 0.00200秒 10レコード 0.00218秒 0.00226秒 0.00166秒 0.00202秒 0.00213秒 0.00205秒 100レコード 0.00305秒 0.00345秒 0.00310秒 0.00316秒 0.00326秒 0.00320秒 1,000レコード 0.00884秒 0.00886秒 0.00844秒 0.01314秒 0.01372秒 0.01060秒 10,000レコード 0.09553秒 0.09440秒 0.09428秒 0.08882秒 0.08849秒 0.09230秒 100,000レコード 0.74876秒 0.84313秒 0.90050秒 0.78405秒 0.97011秒 0.84931秒 1,000,000レコード 7.47400秒 8.08532秒 7.70984秒 11.09723秒 7.74149秒 8.42158秒 -
FDWを用いたインサート
件数 1回目 2回目 3回目 4回目 5回目 平均 1レコード 0.00215秒 0.00215秒 0.00209秒 0.00235秒 0.00210秒 0.00217秒 10レコード 0.00204秒 0.00269秒 0.00183秒 0.00244秒 0.00218秒 0.00224秒 100レコード 0.00323秒 0.00328秒 0.00266秒 0.00247秒 0.00255秒 0.00284秒 1,000レコード 0.00476秒 0.00475秒 0.00478秒 0.00501秒 0.00500秒 0.00486秒 10,000レコード 0.03052秒 0.03038秒 0.03020秒 0.03069秒 0.02992秒 0.03034秒 100,000レコード 0.28117秒 0.36588秒 0.28077秒 0.28097秒 0.29127秒 0.30001秒 1,000,000レコード 5.22589秒 5.12838秒 5.10691秒 5.43964秒 5.36728秒 5.25362秒
- INSERTは10,000レコードから約10秒かかり、100,000レコードから実用に耐えない。
- バルクインサートとFDWを用いたインサートは1,000,000件でも10秒を切り、非常に高速に処理されている。
- 今回の検証ではCSV読み込みの時間計測は行っていないが、FDWを用いたインサートではCSV読み込みオーバーヘッドが含まれる。
そのため、登録にかかる時間は計測時間より短くなると想定される。
まとめ
1レコードごとのINSERTで大量データ処理をせず、バルクインサートかFDWを用いたインサートを検討するとよいでしょう。
データベースサーバーへの操作権があるのであればFDWを、なければバルクインサートを用いることをお勧めします。