PostgreSQL 9.5 から ON CONFLICT
が導入されて Upsert (Insert or Update) ができるようになったものの、複数行まとめてやることはできなかった。
[2020.08 追記]
コメントで指摘いただいたので追記。
ON CONFLICT ... DO UPDATE
内で使える EXCLUDED
句を使えば VALUES
に複数行を指定して Bulk Upsert ができるらしい。(未検証)
[追記おわり]
ON CONFLICT
を使わなくても、CTE を使うことで以下のように Bulk Upsert することができるっぽい。
WITH
-- write the new values
n(ip,visits,clicks) AS (
VALUES ('192.168.1.1',2,12),
('192.168.1.2',6,18),
('192.168.1.3',3,4)
),
-- update existing rows
upsert AS (
UPDATE page_views o
SET visits=n.visits, clicks=n.clicks
FROM n WHERE o.ip = n.ip
RETURNING o.ip
)
-- insert missing rows
INSERT INTO page_views (ip,visits,clicks)
SELECT n.ip, n.visits, n.clicks FROM n
WHERE n.ip NOT IN (
SELECT ip FROM upsert
)
※ SQL は以下のページからの引用
Faster data updates with CartoDB — CARTO Blog
Python で Bulk Upsert
Python + Pandas + asyncpg で CSV ファイルの内容をそのまま PostgreSQL に Bulk Upsert するやつを書いてみた。
以下注意点。
- Pandas DataFrame のカラム名と PostgreSQL テーブルのカラム名が完全に一致している必要がある
- 複合ユニークキーを持つテーブルに対応させるために最後の INSERT 部分が相関サブクエリになっているので、大きなデータに対して実行するとものすごく遅い
import asyncio
import asyncpg
import pandas as pd
async def bulk_upsert(conn, df: pd.DataFrame, table: str, key_columns: list):
assert len(df) > 0
assert len(key_columns) > 0
value_columns = df.iloc[:0].drop(key_columns, axis=1).columns.to_list()
assert len(value_columns) > 0
n = len(df.columns)
statements = ','.join(['(' + ','.join(['$' + str(i * n + j + 1) for j in range(n)]) + ')' for i in range(len(df))])
query = f"""
WITH
-- write the new values
n({','.join(df.columns)}) AS (VALUES {statements}),
-- update existing rows
upsert AS (
UPDATE {table} AS o
SET {', '.join([f'{col}=n.{col}' for col in value_columns])}
FROM n
WHERE {' AND '.join([f'o.{col} = n.{col}' for col in key_columns])}
RETURNING {', '.join([f'o.{col}' for col in key_columns])}
)
-- insert missing rows
INSERT INTO {table} ({', '.join(df.columns)})
SELECT {', '.join([f'n.{col}' for col in df.columns])}
FROM n
WHERE (
SELECT COUNT(1)
FROM upsert AS u
WHERE {' AND '.join([f'u.{col} = n.{col}' for col in key_columns])}
) = 0
"""
values = sum([list(row) for row in df.values], []) # flatten
await conn.execute(query, *values)
async def main():
conn = await asyncpg.connect('postgresql://postgres@localhost/')
await bulk_upsert(conn, pd.read_csv('users.csv'), 'users', ['id'])
await conn.close()
if __name__ == '__main__':
asyncio.get_event_loop().run_until_complete(main())