This batch job is written in Python and the database is Postgres. Because we need to insert, update, and upsert (update or insert) large amounts of data, we use psycopg2.extras with Python 3 and prepare bulk SQL statements.
The source code is shown below; please see it for the details.
For insert, update, and upsert alike, you pass in the set of records to process; the records are first divided into chunks (the unit of bulk processing), and then the SQL is executed chunk by chunk.
Referenced URL
Bulk processing can be implemented with MySQL as well, but for Postgres in this case I referred to the following and ported it to Python 3.
Splitting the target records into chunks
# for bulk insert / update / upsert: split the records into chunks
def divide_rows(self, org_rows, chunk_size, atri_keys):
    chunk = []
    ret_rows = []
    for org_row in org_rows:
        # pick the values for atri_keys and build one tuple per record
        new_tuple = ()
        for atri_key in atri_keys:
            new_tuple += (org_row[atri_key],)
        chunk.append(new_tuple)
        # when the chunk is full, store it and start a new one
        if len(chunk) >= chunk_size:
            ret_rows.append(chunk)
            chunk = []
    # keep the last (possibly smaller) chunk
    if len(chunk) > 0:
        ret_rows.append(chunk)
    return ret_rows
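As a rough illustration of what divide_rows returns, here is a hypothetical example (column names and values are made up, and util_db is assumed to be an instance of the Db class used in the usage section below):

rows = [
    {"pref": "tokyo", "price": 100, "extra": "ignored"},
    {"pref": "osaka", "price": 200, "extra": "ignored"},
    {"pref": "kyoto", "price": 300, "extra": "ignored"},
]
chunks = util_db.divide_rows(rows, 2, ["pref", "price"])
# -> [ [("tokyo", 100), ("osaka", 200)], [("kyoto", 300)] ]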
bulk insert
import logging
from psycopg2 import extras  # for bulk insert

logger = logging.getLogger(__name__)  # assumed to be configured at module level

def bulk_insert(self, tbl_name, atri_keys, rows):
    bulk_insert_size = self.get_conf()["common"]["bulk_insert_size"]
    row_groups = self.divide_rows(rows, bulk_insert_size, atri_keys)
    # the trailing %s is the placeholder that execute_values() expands into VALUES rows
    sql = "INSERT INTO %s (%s) VALUES %s" % (tbl_name, ",".join(atri_keys), "%s")
    db_conn = self.db_connect()
    with self.db_cursor(db_conn) as db_cur:
        for row_group in row_groups:
            try:
                # bulk insert
                extras.execute_values(db_cur, sql, row_group)
            except Exception as e:
                logger.error(e)
                logger.error(sql)
                logger.error(row_group)
                return False
        db_conn.commit()
    return True
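A minimal usage sketch of bulk_insert. The table name, columns, and data are hypothetical, and bulk_insert_size is assumed to be set in the config:

util_db = Db()
rows = [
    {"pref": "tokyo", "city": "shinjuku", "price": 100},
    {"pref": "osaka", "city": "naniwa",   "price": 200},
]
# builds: INSERT INTO sales_sample (pref,city,price) VALUES %s
ok = util_db.bulk_insert("sales_sample", ["pref", "city", "price"], rows)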
bulk update
# bulk update
def bulk_update(self, tbl_name, pkeys, atri_keys, rows):
    bulk_insert_size = self.get_conf()["common"]["bulk_insert_size"]
    row_groups = self.divide_rows(rows, bulk_insert_size, atri_keys)
    sql = """
    UPDATE {0}
    SET {1}
    FROM ( VALUES {2} ) AS data_tbl({3})
    WHERE {4}
    """
    # SET col=data_tbl.col for every attribute column
    set_key_vals = []
    for atri_key in atri_keys:
        set_key_vals.append("%s=data_tbl.%s" % (atri_key, atri_key))
    # join the target table and the VALUES list on the primary keys
    where_conds = []
    for pkey in pkeys:
        where_conds.append("%s.%s=data_tbl.%s" % (tbl_name, pkey, pkey))
    set_key_vals_str = ",".join(set_key_vals)
    atri_key_str = ",".join(atri_keys)
    where_conds_str = " AND ".join(where_conds)
    # {2} stays as %s: the placeholder that execute_values() expands
    sql = sql.format(tbl_name,
                     set_key_vals_str,
                     "%s",
                     atri_key_str,
                     where_conds_str)
    db_conn = self.db_connect()
    with self.db_cursor(db_conn) as db_cur:
        for row_group in row_groups:
            try:
                # bulk update
                extras.execute_values(db_cur, sql, row_group)
            except Exception as e:
                logger.error(e)
                logger.error(sql)
                logger.error(row_group)
                return False
        db_conn.commit()
    return True
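For reference, with a hypothetical table sales_sample, pkeys = ["pref","city"] and atri_keys = ["pref","city","price"], the sql string built above ends up roughly as follows (the remaining %s is what execute_values() expands into the VALUES rows):

UPDATE sales_sample
SET pref=data_tbl.pref,city=data_tbl.city,price=data_tbl.price
FROM ( VALUES %s ) AS data_tbl(pref,city,price)
WHERE sales_sample.pref=data_tbl.pref AND sales_sample.city=data_tbl.city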
bulk upsert ( update or insert )
The upsert version takes the key columns (pkeys), all columns to insert (atri_keys), and the columns to overwrite when the record already exists (update_keys), and uses PostgreSQL's INSERT ... ON CONFLICT (available from 9.5).

# bulk upsert (update or insert), using INSERT ... ON CONFLICT (PostgreSQL 9.5+)
def bulk_upsert(self, tbl_name, pkeys, atri_keys, update_keys, rows):
    bulk_insert_size = self.get_conf()["common"]["bulk_insert_size"]
    row_groups = self.divide_rows(rows, bulk_insert_size, atri_keys)
    # pkeys must be covered by a primary key / unique constraint on the table
    sql = """
    INSERT INTO {0} ({1}) VALUES {2}
    ON CONFLICT ({3})
    DO UPDATE SET {4}
    """
    # on conflict, overwrite only the update_keys columns with the incoming values
    set_key_vals = []
    for update_key in update_keys:
        set_key_vals.append("%s=excluded.%s" % (update_key, update_key))
    atri_key_str = ",".join(atri_keys)
    pkey_str = ",".join(pkeys)
    set_key_vals_str = ",".join(set_key_vals)
    # {2} stays as %s: the placeholder that execute_values() expands
    sql = sql.format(tbl_name,
                     atri_key_str,
                     "%s",
                     pkey_str,
                     set_key_vals_str)
    db_conn = self.db_connect()
    with self.db_cursor(db_conn) as db_cur:
        for row_group in row_groups:
            try:
                # bulk upsert
                extras.execute_values(db_cur, sql, row_group)
            except Exception as e:
                logger.error(e)
                logger.error(sql)
                logger.error(row_group)
                return False
        db_conn.commit()
    return True
Other - DB (postgres) connection
import psycopg2
import psycopg2.extras

# db_conn and conf are module-level globals (conf holds the DB settings)
def db_connect(self):
    global db_conn
    if db_conn:
        # reuse the existing connection
        return db_conn
    db_conn = psycopg2.connect(
        database = conf["db"]["db_name"],
        user     = conf["db"]["db_user"],
        password = conf["db"]["db_pass"],
        host     = conf["db"]["db_host"],
        port     = conf["db"]["db_port"] )
    return db_conn

def db_cursor(self, db_conn):
    # DictCursor lets result rows be accessed by column name
    return db_conn.cursor(cursor_factory=psycopg2.extras.DictCursor)
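The connection code assumes a conf dict along these lines. The key names are taken from the code above; the values here are placeholders:

conf = {
    "db": {
        "db_name": "sampledb",
        "db_user": "sampleuser",
        "db_pass": "********",
        "db_host": "127.0.0.1",
        "db_port": 5432,
    },
    "common": {
        "bulk_insert_size": 1000,  # chunk size read via get_conf()["common"]["bulk_insert_size"]
    },
}
db_conn = None  # reused by db_connect()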
Usage - for bulk upsert
util_db = Db()
util_db.bulk_upsert(
    self.tbl_name_header() + "_sales_count_by_city_price",
    # pkeys: columns that identify a record
    ["pref", "city", "price", "calc_date"],
    # atri_keys: all columns to insert
    ["pref", "city", "price", "calc_date",
     "discuss_count", "discuss_days",
     "onsale_count", "onsale_days",
     "sold_count"],
    # update_keys: columns to overwrite when the record already exists
    ["discuss_count", "discuss_days",
     "onsale_count", "onsale_days",
     "sold_count"],
    ret_datas)
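Here ret_datas is expected to be a list of dict-like records keyed by the atri_keys, something like the following (the values are made up):

ret_datas = [
    {"pref": "tokyo", "city": "shinjuku", "price": 100, "calc_date": "2019-01-01",
     "discuss_count": 1, "discuss_days": 2,
     "onsale_count": 3, "onsale_days": 4,
     "sold_count": 5},
    # ...
]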