psycopg2 でよくやる操作まとめ - Qiita がじわじわ LGTM が増える人気記事になっているが、最近は aiohttp Server との組み合わせで asyncpg の方をよく使うようになってきたので、こちらについてもまとめる。
しかし公式ドキュメントが普通にわかりやすいし分量も多くないので全部読んでしまうのが早いかもしれない。
asyncpg — asyncpg Documentation
asyncpg 概要
- Python から PostgreSQL にアクセスするためのモジュール
- asyncio を活用した非同期処理を行うため Python 3.5 以上でないと動かない
- DB-API の仕様 には準拠していない (そもそも DB-API に規定されている仕様は同期的な API のため)
基本的な使い方
インストールする
$ pip install asyncpg
接続する
import asyncpg
dsn = "postgresql://username:password@hostname:5432/database"
conn = await asyncpg.connect(dsn)
# ...
await conn.close()
conn
は asyncpg.connection.Connection
オブジェクトとして得られる。
コネクションプールを作成する
import asyncpg
dsn = "postgresql://username:password@hostname:5432/database"
async with asyncpg.create_pool(dsn) as pool:
await pool.execute("...")
pool
は asyncpg.pool.Pool
オブジェクトとして得られる。
プールからコネクションを取り出して使うこともできるが、プールに対して直接 execute
などのメソッドを実行することもできる。
クエリを実行する
asyncpg では cursor を作成せずとも直接 conn.execute
でクエリを実行できる。
await conn.execute("INSERT INTO users(name) VALUES($1)", "foo")
クエリ内に $数字
と書いておいてクエリ実行時の第2引数以降に値を指定すると、クエリに値を埋め込むことができる。 datetime
型など、一部の型のオブジェクトはそのまま指定してもよしなに内部で変換してクエリに埋め込んでくれる。どの型をどう変換するかは公式ドキュメントに載っており、自分でカスタマイズすることもできる。
データを取得する
execute
の代わりに fetch
などのメソッドでクエリを実行すると結果を取得できる。
await conn.fetch("SELECT * FROM users") #=> [<Record id='1' name='foo'>, <Record id='2' name='bar'>]
結果は asyncpg.Record
オブジェクトのリストとして得られる。
最初の1行目だけを取得する fetchrow
や、1行1列目の値だけを取得する fetchval
も便利。
await conn.fetchrow("SELECT * FROM users WHERE id = $1", "1") #=> <Record id='1' name='foo'>
await conn.fetchval("SELECT COUNT(1) FROM users") #=> 2
Record オブジェクトについて
Record オブジェクトは tuple のようにも dict のようにも振る舞うので、わざわざ他のデータ型に変換する必要はあまりなさそう。
record = await conn.fetchrow("SELECT * FROM users WHERE id = $1", "1")
record[1] #=> インデックスでアクセス
record["name"] #=> キーでアクセス
トランザクションを使う
トランザクションを使う場合は非同期コンテキストマネージャ (async with) ブロック内に処理を書く。
async with conn.transaction():
await conn.execute("...")
async with を使ってトランザクションを開始した場合は、ブロックが終了した時点で自動で commit されるため、明示的に commit する必要はない。
また、コネクションプーリングをしている場合は明示的にコネクションを取得してトランザクションを開始する。
async with pool.acquire() as conn:
async with conn.transaction():
await conn.execute("...")
タイムアウトを設定する
コネクション作成時に command_timeout
に秒数を指定すると、クエリのデフォルトタイムアウトを設定できる。
conn = await asyncpg.connect(dsn, command_timeout=60)
クエリ実行のたびに個別にタイムアウトを設定する場合は execute
や fetch
の引数で timeout
を指定する。
await conn.execute("...", timeout=60)
便利 Tips
接続が失敗した際にリトライする
アプリの開発環境を Docker で動かしていて、DB とアプリを同時に起動すると DB が起動しきる前にアプリが接続しにいってエラーになることがある。これを回避するために DB が起動するまで何度か再試行すればよい。
from asyncio import sleep
import asyncpg
async def connect(*args, **kwargs):
e = None
for i in range(10):
try:
await sleep(i)
return await asyncpg.create_pool(*args, **kwargs)
except ConnectionError as _e:
e = _e
raise e
取得したデータを Pandas DataFrame にする
fetch の結果は Record オブジェクトのリストなので、それをそのまま pd.DataFrame()
に入れてしまってもなんとかなる。
import pandas as pd
records = await conn.fetch("SELECT * FROM users")
if len(records) > 0:
df = pd.DataFrame(records, columns=list(records[0].keys()))
else:
# 結果が0件だった場合の処理
Pandas DataFrame の内容を INSERT する
asyncpg は COPY 系のメソッドが豊富で、タプルのリストを COPY コマンドで Bulk Insert してくれる copy_records_to_table
を使えば Pandas DataFrame も簡単に INSERT できる。
await conn.copy_records_to_table("users", records=df.itertuples(), columns=df.columns)
CSV ファイルを INSERT する
同様に copy_to_table
を使えば CSV フォーマットなどのファイルから簡単に INSERT できる。
await conn.copy_to_table("users", source="users.csv", format="csv")
指定したオプションは多くが COPY クエリにそのまま流れるので、COPY クエリの仕様を理解すれば使いこなせる。(format, null, header など)
PostgreSQL: Documentation: COPY
JSON 型を自動で変換する
json
jsonb
型のデータは Python 側では文字列として扱われるため都度変換する必要があるが、 Connection.set_type_codec
を使うとこれを自動で行うことができる。
import json
conn = await asyncpg.connect(dsn)
await conn.set_type_codec("jsonb", schema="pg_catalog", encoder=json.dumps, decoder=json.loads)
コネクションプールを使う場合は作成時に init
オプションで指定する。
import json
async def init_connection(conn):
await conn.set_type_codec("jsonb", schema="pg_catalog", encoder=json.dumps, decoder=json.loads)
pool = await asyncpg.create_pool(dsn, init=init_connection)