Pandas DataFrame を PostgreSQL に Bulk Insert したいときは SQLAlchemy を入れて .to_sql()
を使うとできるのだけど、PostgreSQL クライアントとして psycopg2 を使っている状況だと「そのためだけに SQLAlchemy 入れたくねぇ〜」という気持ちになってしまう。
そこで「DataFrame を .to_csv()
してそれを psycopg2 の .copy_from()
で読み込めば Bulk Insert できるのではないか」と考えてやってみたらできた。
from io import StringIO
import pandas as pd
import psycopg2
def df2db(conn: psycopg2.extensions.connection, df: pd.DataFrame, table: str):
buf = StringIO()
df.to_csv(buf, sep='\t', na_rep='\\N', index=False, header=False)
buf.seek(0)
columns = [f'"{col}"' for col in df.columns]
with conn.cursor() as cur:
cur.copy_from(buf, table, columns=columns)
こういう関数を作っておいて、
例えばこんな DataFrame を PostgreSQL の logs
というテーブルに Bulk Insert したいときは、
with psycopg2.connect('postgresql://...') as conn:
df2db(conn, df.reset_index(), 'logs')
conn.commit()
これでできる。
index は出力しないようにしているので、index もテーブルに入れたい場合は .reset_index()
しておく必要がある。
[2020.08 追記] 特殊なカラム名をもつテーブルに Bulk Insert できない場合の対処方法
- 大文字の英字を含むカラム名
-
.
などの記号を含むカラム名
など、いくつかのケースでは Bulk Insert が失敗することがある。
どうやら psycopg2 側ではカラム名のエスケープなどは特に行わずに単純に ,
で連結するだけのようなので、特殊なカラム名を持つテーブルの場合はうまくいかない。
この場合は .copy_from()
に渡すカラム名を CSV フォーマットに準じた方法でエスケープすればよい。
※ 冒頭のコードはカラム名を "
で囲むように修正済み。
[2020.08 追記] 欠損値を含む整数型のデータ列を扱いたい場合
欠損値を含む整数型のデータ列は、そのままだと Pandas が float にキャストしてしまい、copy_from でエラーが出てしまう。
とはいえ int に変換しようとしても Pandas の int 型はそのままだと欠損値を扱えないためエラーになってしまう。
この場合は、Pandas v0.24.0 で追加された Nullable integer data type を利用するとうまくいく。
https://qiita.com/hoto17296/items/b6c90db4b9bcdb7b6d78
[2021.02 追記] 別の方法
COPY FROM 使わなくてもこれでいいのでは?
def bulk_insert(conn: psycopg2.extensions.connection, df: pd.DataFrame, table: str):
query = f"""
INSERT INTO {table} ({", ".join(df.columns)})
VALUES {", ".join(["(" + ", ".join(["%s"] * len(df.columns)) + ")"] * len(df))}
"""
with conn.cursor() as cur:
cur.execute(query, df.values.flatten())
この方法だと、psycopg2 が扱えないデータ型が含まれている場合に、何らかの方法で変換してやる必要はある。(numpy.ndarray
→ list
とか)
参考: psycopg2 が扱える型 https://www.psycopg.org/docs/usage.html#adaptation-of-python-values-to-sql-types