はじめに
Pandasを使っていて、pd.DataFrameをDBにINSERTする際にto_sql()メソッドでやりたいことがうまく出来なかったのが解決できたのでメモとして残す。
※この方法で数万レコードをINSERTしようとすると時間がかなりかかったので要注意
やりたいこと
pd.DataFrameをDBにINSERTする際にNotNull制約違反や主キー制約違反が発生した場合にその行をスキップしてINSERTしたい。
具体的には、以下のようなデータフレームを主キー制約がcompanyにNotNull制約がestablished_inにかけられたテーブルにto_sql()メソッドでINSERTしようとするとエラーになり全くINSERTされないが、INSERTできるレコードはINSERTして失敗したレコードはINSERTしないようにしたい。
company | established_in | founder | |
---|---|---|---|
0 | A | 1900.0 | Alice |
1 | B | 1925.0 | Bob |
2 | C | NaN | Charlie |
3 | A | 1995.0 | Ahn |
4 | D | 2010.0 | David |
ちなみに、普通に、
with engine.begin() as conn:
df_fail.to_sql('company', if_exists='append', index=False, con=conn)
してあげると、
(psycopg2.errors.NotNullViolation) null value in column "established_in" violates not-null constraint
DETAIL: Failing row contains (C, null, Charlie).
[SQL: INSERT INTO company (company, established_in, founder) VALUES (%(company)s, %(established_in)s, %(founder)s)]
[parameters: ({'company': 'A', 'established_in': 1900.0, 'founder': 'Alice'}, {'company': 'B', 'established_in': 1925.0, 'founder': 'Bob'}, {'company': 'C', 'established_in': None, 'founder': 'Charlie'}, {'company': 'A', 'established_in': 1995.0, 'founder': 'Ahn'}, {'company': 'D', 'established_in': 2010.0, 'founder': 'David'})]
(Background on this error at: http://sqlalche.me/e/14/gkpj)
となり、全くINSERTされない。
ソースコード
JupyterLabで以下を実行した。なお、DB側でcompanyには主キー制約がestablished_inにはNotNull制約がかけられている。
import numpy as np
import pandas as pd
import psycopg2
import sqlalchemy
from IPython.display import display
engine = sqlalchemy.create_engine(url='postgresql://postgres:postgres@192.168.56.101/mydb1')
df_fail = pd.DataFrame({'company': ['A', 'B', 'C', 'A', 'D'],
'established_in': [1900, 1925, np.nan, 1995, 2010],
'founder': ['Alice', 'Bob', 'Charlie', 'Ahn', 'David' ]})
df_fail
# 主キー制約違反・NotNull制約違反
company | established_in | founder | |
---|---|---|---|
0 | A | 1900.0 | Alice |
1 | B | 1925.0 | Bob |
2 | C | NaN | Charlie |
3 | A | 1995.0 | Ahn |
4 | D | 2010.0 | David |
for row in range(len(df_fail)):
record = df_fail.iloc[[row]]
try:
with engine.begin() as conn:
record.to_sql('company', if_exists='append', index=False, con=conn)
print('Pass')
except sqlalchemy.exc.IntegrityError as e:
print('IntegrityError')
print(e)
except Exception as e:
print('Error')
print(e)
finally:
display(record)
print('\n\n')
Pass
company | established_in | founder | |
---|---|---|---|
0 | A | 1900.0 | Alice |
Pass
company | established_in | founder | |
---|---|---|---|
1 | B | 1925.0 | Bob |
IntegrityError
(psycopg2.errors.NotNullViolation) null value in column "established_in" violates not-null constraint
DETAIL: Failing row contains (C, null, Charlie).
[SQL: INSERT INTO company (company, established_in, founder) VALUES (%(company)s, %(established_in)s, %(founder)s)]
[parameters: {'company': 'C', 'established_in': None, 'founder': 'Charlie'}]
(Background on this error at: http://sqlalche.me/e/14/gkpj)
company | established_in | founder | |
---|---|---|---|
2 | C | NaN | Charlie |
IntegrityError
(psycopg2.errors.UniqueViolation) duplicate key value violates unique constraint "constraint_pkey"
DETAIL: Key (company)=(A) already exists.
[SQL: INSERT INTO company (company, established_in, founder) VALUES (%(company)s, %(established_in)s, %(founder)s)]
[parameters: {'company': 'A', 'established_in': 1995.0, 'founder': 'Ahn'}]
(Background on this error at: http://sqlalche.me/e/14/gkpj)
company | established_in | founder | |
---|---|---|---|
3 | A | 1995.0 | Ahn |
Pass
company | established_in | founder | |
---|---|---|---|
4 | D | 2010.0 | David |
with engine.begin() as conn:
data_on_db = pd.read_sql('SELECT * FROM company;', con=conn)
data_on_db
company | established_in | founder | |
---|---|---|---|
0 | A | 1900 | Alice |
1 | B | 1925 | Bob |
2 | D | 2010 | David |
確かに例外処理を使ってINSERTできるものだけINSERTできた(ただし、主キー制約違反の場合上の行からINSERTされてしまう点に注意)。
おわりに
失敗した行だけ集めたpd.DataFrameを作っておけば、どこがINSERTされなかったのかわかるので便利なはず。