to_sql()
はDataFrame
をデータベースに登録するために使われますが、トランザクションを実装するためには少し工夫が必要です。
本稿では、ORMモジュールであるSQLAlchemyを使ってto_sql()
でトランザクションを実装する方法を紹介します。
前提
- Python : 3.6
- sqlalchemy : 1.3.3
- psycopg2-binary : 2.8.2
- postgreSQL : 10.4
DB接続
DB接続情報を記載したdb_config.ini
ファイルを作成し、実行スクリプトディレクトリに格納します。
db_config.ini
[db]
database = postgresql
host = localhost
port = 5432
dbname = ********
schema = ********
user = ********
password = ********
engineオブジェクトを生成します。
from sqlalchemy import create_engine
database, user, password, host, dbname = \
self.db_conf.get('db', 'database'), \
self.db_conf.get('db', 'user'), \
self.db_conf.get('db', 'password'), \
self.db_conf.get('db', 'host'), \
self.db_conf.get('db', 'dbname')
url = f'{database}://{user}:{password}@{host}/{dbname}'
engine = create_engine(url, use_batch_mode=True)
engine.connect().connection
トランザクション実装
engine.begin()
によりトランザクションを実装できます。
with
内の処理に成功した場合はautocommit
されますが、途中でException
を吐いた場合はrollback
されます。
from sqlalchemy.exc import IntegrityError
# DBへの登録処理
# 処理に失敗した場合はロールバックされる
try:
with engine.begin() as conn:
# DBへの登録処理①
dataframe_1.to_sql(
'tbl_1', conn, if_exists='append', index=False
)
logging.info(f'inserted {len(dataframe_1)} rows to tbl_1.')
# DBへの登録処理②
dataframe_2.to_sql(
'tbl_2', conn, if_exists='append', index=False
)
logging.info(f'inserted {len(dataframe_2)} rows to tbl_2.')
except IntegrityError as e:
logging.error(f'{e}')
logging.error('failed to insert.')
logging.error('executed rollback.')
sys.exit(1)