LoginSignup
17
12

More than 3 years have passed since last update.

PostgreSQL で Bulk Upsert

Last updated at Posted at 2018-11-07

PostgreSQL 9.5 から ON CONFLICT が導入されて Upsert (Insert or Update) ができるようになったものの、複数行まとめてやることはできなかった。

[2020.08 追記]
コメントで指摘いただいたので追記。
ON CONFLICT ... DO UPDATE 内で使える EXCLUDED 句を使えば VALUES に複数行を指定して Bulk Upsert ができるらしい。(未検証)
[追記おわり]

ON CONFLICT を使わなくても、CTE を使うことで以下のように Bulk Upsert することができるっぽい。

WITH
-- write the new values
n(ip,visits,clicks) AS (
  VALUES ('192.168.1.1',2,12),
         ('192.168.1.2',6,18),
         ('192.168.1.3',3,4)
),
-- update existing rows
upsert AS (
  UPDATE page_views o
  SET visits=n.visits, clicks=n.clicks
  FROM n WHERE o.ip = n.ip
  RETURNING o.ip
)
-- insert missing rows
INSERT INTO page_views (ip,visits,clicks)
SELECT n.ip, n.visits, n.clicks FROM n
WHERE n.ip NOT IN (
  SELECT ip FROM upsert
)

※ SQL は以下のページからの引用
Faster data updates with CartoDB — CARTO Blog

Python で Bulk Upsert

Python + Pandas + asyncpg で CSV ファイルの内容をそのまま PostgreSQL に Bulk Upsert するやつを書いてみた。

以下注意点。

  • Pandas DataFrame のカラム名と PostgreSQL テーブルのカラム名が完全に一致している必要がある
  • 複合ユニークキーを持つテーブルに対応させるために最後の INSERT 部分が相関サブクエリになっているので、大きなデータに対して実行するとものすごく遅い
import asyncio
import asyncpg
import pandas as pd


async def bulk_upsert(conn, df: pd.DataFrame, table: str, key_columns: list):
    assert len(df) > 0
    assert len(key_columns) > 0
    value_columns = df.iloc[:0].drop(key_columns, axis=1).columns.to_list()
    assert len(value_columns) > 0
    n = len(df.columns)
    statements = ','.join(['(' + ','.join(['$' + str(i * n + j + 1) for j in range(n)]) + ')' for i in range(len(df))])
    query = f"""
        WITH
        -- write the new values
        n({','.join(df.columns)}) AS (VALUES {statements}),
        -- update existing rows
        upsert AS (
          UPDATE {table} AS o
          SET {', '.join([f'{col}=n.{col}' for col in value_columns])}
          FROM n
          WHERE {' AND '.join([f'o.{col} = n.{col}' for col in key_columns])}
          RETURNING {', '.join([f'o.{col}' for col in key_columns])}
        )
        -- insert missing rows
        INSERT INTO {table} ({', '.join(df.columns)})
        SELECT {', '.join([f'n.{col}' for col in df.columns])}
        FROM n
        WHERE (
          SELECT COUNT(1)
          FROM upsert AS u
          WHERE {' AND '.join([f'u.{col} = n.{col}' for col in key_columns])}
        ) = 0
        """
    values = sum([list(row) for row in df.values], [])  # flatten
    await conn.execute(query, *values)


async def main():
    conn = await asyncpg.connect('postgresql://postgres@localhost/')
    await bulk_upsert(conn, pd.read_csv('users.csv'), 'users', ['id'])
    await conn.close()


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(main())
17
12
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
12