16
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

psycopg2 で Pandas DataFrame を Bulk Insert する

Last updated at Posted at 2020-03-28

Pandas DataFrame を PostgreSQL に Bulk Insert したいときは SQLAlchemy を入れて .to_sql() を使うとできるのだけど、PostgreSQL クライアントとして psycopg2 を使っている状況だと「そのためだけに SQLAlchemy 入れたくねぇ〜」という気持ちになってしまう。

そこで「DataFrame を .to_csv() してそれを psycopg2 の .copy_from() で読み込めば Bulk Insert できるのではないか」と考えてやってみたらできた。

from io import StringIO
import pandas as pd
import psycopg2

def df2db(conn: psycopg2.extensions.connection, df: pd.DataFrame, table: str):
    buf = StringIO()
    df.to_csv(buf, sep='\t', na_rep='\\N', index=False, header=False)
    buf.seek(0)
    columns = [f'"{col}"' for col in df.columns]
    with conn.cursor() as cur:
        cur.copy_from(buf, table, columns=columns)

こういう関数を作っておいて、

image.png

例えばこんな DataFrame を PostgreSQL の logs というテーブルに Bulk Insert したいときは、

with psycopg2.connect('postgresql://...') as conn:
    df2db(conn, df.reset_index(), 'logs')
    conn.commit()

これでできる。

index は出力しないようにしているので、index もテーブルに入れたい場合は .reset_index() しておく必要がある。

[2020.08 追記] 特殊なカラム名をもつテーブルに Bulk Insert できない場合の対処方法

  • 大文字の英字を含むカラム名
  • . などの記号を含むカラム名

など、いくつかのケースでは Bulk Insert が失敗することがある。

どうやら psycopg2 側ではカラム名のエスケープなどは特に行わずに単純に , で連結するだけのようなので、特殊なカラム名を持つテーブルの場合はうまくいかない。

この場合は .copy_from() に渡すカラム名を CSV フォーマットに準じた方法でエスケープすればよい。
※ 冒頭のコードはカラム名を " で囲むように修正済み。

[2020.08 追記] 欠損値を含む整数型のデータ列を扱いたい場合

欠損値を含む整数型のデータ列は、そのままだと Pandas が float にキャストしてしまい、copy_from でエラーが出てしまう。
とはいえ int に変換しようとしても Pandas の int 型はそのままだと欠損値を扱えないためエラーになってしまう。

この場合は、Pandas v0.24.0 で追加された Nullable integer data type を利用するとうまくいく。
https://qiita.com/hoto17296/items/b6c90db4b9bcdb7b6d78

[2021.02 追記] 別の方法

COPY FROM 使わなくてもこれでいいのでは?

def bulk_insert(conn: psycopg2.extensions.connection, df: pd.DataFrame, table: str):
    query = f"""
        INSERT INTO {table} ({", ".join(df.columns)})
        VALUES {", ".join(["(" + ", ".join(["%s"] * len(df.columns)) + ")"] * len(df))}
        """
    with conn.cursor() as cur:
        cur.execute(query, df.values.flatten())

この方法だと、psycopg2 が扱えないデータ型が含まれている場合に、何らかの方法で変換してやる必要はある。(numpy.ndarraylist とか)

参考: psycopg2 が扱える型 https://www.psycopg.org/docs/usage.html#adaptation-of-python-values-to-sql-types

16
10
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
10

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?