Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
6
Help us understand the problem. What are the problem?
@hoto17296

psycopg2 で Pandas DataFrame を Bulk Insert する

Pandas DataFrame を PostgreSQL に Bulk Insert したいときは SQLAlchemy を入れて .to_sql() を使うとできるのだけど、PostgreSQL クライアントとして psycopg2 を使っている状況だと「そのためだけに SQLAlchemy 入れたくねぇ〜」という気持ちになってしまう。

そこで「DataFrame を .to_csv() してそれを psycopg2 の .copy_from() で読み込めば Bulk Insert できるのではないか」と考えてやってみたらできた。

from io import StringIO
import pandas as pd
import psycopg2

def df2db(conn: psycopg2.extensions.connection, df: pd.DataFrame, table: str):
    buf = StringIO()
    df.to_csv(buf, sep='\t', na_rep='\\N', index=False, header=False)
    buf.seek(0)
    columns = [f'"{col}"' for col in df.columns]
    with conn.cursor() as cur:
        cur.copy_from(buf, table, columns=columns)
    conn.commit()

こういう関数を作っておいて、

image.png

例えばこんな DataFrame を PostgreSQL の logs というテーブルに Bulk Insert したいときは、

with psycopg2.connect('postgresql://...') as conn:
    df2db(conn, df.reset_index(), 'logs')

これでできる。

index は出力しないようにしているので、index もテーブルに入れたい場合は .reset_index() しておく必要がある。

[2020.08 追記] 特殊なカラム名をもつテーブルに Bulk Insert できない場合の対処方法

  • 大文字の英字を含むカラム名
  • . などの記号を含むカラム名

など、いくつかのケースでは Bulk Insert が失敗することがある。

どうやら psycopg2 側ではカラム名のエスケープなどは特に行わずに単純に , で連結するだけのようなので、特殊なカラム名を持つテーブルの場合はうまくいかない。

この場合は .copy_from() に渡すカラム名を CSV フォーマットに準じた方法でエスケープすればよい。
※ 冒頭のコードはカラム名を " で囲むように修正済み。

[2020.08 追記] 欠損値を含む整数型のデータ列を扱いたい場合

欠損値を含む整数型のデータ列は、そのままだと Pandas が float にキャストしてしまい、copy_from でエラーが出てしまう。
とはいえ int に変換しようとしても Pandas の int 型はそのままだと欠損値を扱えないためエラーになってしまう。

この場合は、Pandas v0.24.0 で追加された Nullable integer data type を利用するとうまくいく。
https://qiita.com/hoto17296/items/b6c90db4b9bcdb7b6d78

[2021.02 追記] 別の方法

COPY FROM 使わなくてもこれでいいのでは?

def bulk_insert(conn: psycopg2.extensions.connection, df: pd.DataFrame, table: str):
    query = f"""
        INSERT INTO {table} ({", ".join(df.columns)})
        VALUES {", ".join(["(" + ", ".join(["%s"] * len(df.columns)) + ")"] * len(df))}
        """
    with conn.cursor() as cur:
        cur.execute(query, df.values.flatten())
    conn.commit()

この方法だと、psycopg2 が扱えないデータ型が含まれている場合に、何らかの方法で変換してやる必要はある。(numpy.ndarraylist とか)

参考: psycopg2 が扱える型 https://www.psycopg.org/docs/usage.html#adaptation-of-python-values-to-sql-types

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
6
Help us understand the problem. What are the problem?