21
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

asyncpg でよくやる操作まとめ

Last updated at Posted at 2020-08-30

psycopg2 でよくやる操作まとめ - Qiita がじわじわ LGTM が増える人気記事になっているが、最近は aiohttp Server との組み合わせで asyncpg の方をよく使うようになってきたので、こちらについてもまとめる。

しかし公式ドキュメントが普通にわかりやすいし分量も多くないので全部読んでしまうのが早いかもしれない。
asyncpg — asyncpg Documentation

asyncpg 概要

  • Python から PostgreSQL にアクセスするためのモジュール
  • asyncio を活用した非同期処理を行うため Python 3.5 以上でないと動かない
  • DB-API の仕様 には準拠していない (そもそも DB-API に規定されている仕様は同期的な API のため)

基本的な使い方

インストールする

$ pip install asyncpg

接続する

import asyncpg

dsn = "postgresql://username:password@hostname:5432/database"
conn = await asyncpg.connect(dsn)

# ...

await conn.close()

connasyncpg.connection.Connection オブジェクトとして得られる。

コネクションプールを作成する

import asyncpg

dsn = "postgresql://username:password@hostname:5432/database"
async with asyncpg.create_pool(dsn) as pool:
    await pool.execute("...")

poolasyncpg.pool.Pool オブジェクトとして得られる。

プールからコネクションを取り出して使うこともできるが、プールに対して直接 execute などのメソッドを実行することもできる。

クエリを実行する

asyncpg では cursor を作成せずとも直接 conn.execute でクエリを実行できる。

await conn.execute("INSERT INTO users(name) VALUES($1)", "foo")

クエリ内に $数字 と書いておいてクエリ実行時の第2引数以降に値を指定すると、クエリに値を埋め込むことができる。 datetime 型など、一部の型のオブジェクトはそのまま指定してもよしなに内部で変換してクエリに埋め込んでくれる。どの型をどう変換するかは公式ドキュメントに載っており、自分でカスタマイズすることもできる。

データを取得する

execute の代わりに fetch などのメソッドでクエリを実行すると結果を取得できる。

await conn.fetch("SELECT * FROM users")  #=> [<Record id='1' name='foo'>, <Record id='2' name='bar'>]

結果は asyncpg.Record オブジェクトのリストとして得られる。

最初の1行目だけを取得する fetchrow や、1行1列目の値だけを取得する fetchval も便利。

await conn.fetchrow("SELECT * FROM users WHERE id = $1", "1")  #=> <Record id='1' name='foo'>
await conn.fetchval("SELECT COUNT(1) FROM users")  #=> 2

Record オブジェクトについて

Record オブジェクトは tuple のようにも dict のようにも振る舞うので、わざわざ他のデータ型に変換する必要はあまりなさそう。

record = await conn.fetchrow("SELECT * FROM users WHERE id = $1", "1")

record[1]  #=> インデックスでアクセス
record["name"]  #=> キーでアクセス

トランザクションを使う

トランザクションを使う場合は非同期コンテキストマネージャ (async with) ブロック内に処理を書く。

async with conn.transaction():
    await conn.execute("...")

async with を使ってトランザクションを開始した場合は、ブロックが終了した時点で自動で commit されるため、明示的に commit する必要はない。

また、コネクションプーリングをしている場合は明示的にコネクションを取得してトランザクションを開始する。

async with pool.acquire() as conn:
    async with conn.transaction():
        await conn.execute("...")

タイムアウトを設定する

コネクション作成時に command_timeout に秒数を指定すると、クエリのデフォルトタイムアウトを設定できる。

conn = await asyncpg.connect(dsn, command_timeout=60)

クエリ実行のたびに個別にタイムアウトを設定する場合は executefetch の引数で timeout を指定する。

await conn.execute("...", timeout=60)

便利 Tips

接続が失敗した際にリトライする

アプリの開発環境を Docker で動かしていて、DB とアプリを同時に起動すると DB が起動しきる前にアプリが接続しにいってエラーになることがある。これを回避するために DB が起動するまで何度か再試行すればよい。

from asyncio import sleep
import asyncpg

async def connect(*args, **kwargs):
    e = None
    for i in range(10):
        try:
            await sleep(i)
            return await asyncpg.create_pool(*args, **kwargs)
        except ConnectionError as _e:
            e = _e
    raise e

取得したデータを Pandas DataFrame にする

fetch の結果は Record オブジェクトのリストなので、それをそのまま pd.DataFrame() に入れてしまってもなんとかなる。

import pandas as pd

records = await conn.fetch("SELECT * FROM users")
if len(records) > 0:
    df = pd.DataFrame(records, columns=list(records[0].keys()))
else:
    # 結果が0件だった場合の処理

Pandas DataFrame の内容を INSERT する

asyncpg は COPY 系のメソッドが豊富で、タプルのリストを COPY コマンドで Bulk Insert してくれる copy_records_to_table を使えば Pandas DataFrame も簡単に INSERT できる。

await conn.copy_records_to_table("users", records=df.itertuples(), columns=df.columns)

CSV ファイルを INSERT する

同様に copy_to_table を使えば CSV フォーマットなどのファイルから簡単に INSERT できる。

await conn.copy_to_table("users", source="users.csv", format="csv")

指定したオプションは多くが COPY クエリにそのまま流れるので、COPY クエリの仕様を理解すれば使いこなせる。(format, null, header など)
PostgreSQL: Documentation: COPY

JSON 型を自動で変換する

json jsonb 型のデータは Python 側では文字列として扱われるため都度変換する必要があるが、 Connection.set_type_codec を使うとこれを自動で行うことができる。

import json

conn = await asyncpg.connect(dsn)
await conn.set_type_codec("jsonb", schema="pg_catalog", encoder=json.dumps, decoder=json.loads)

コネクションプールを使う場合は作成時に init オプションで指定する。

import json

async def init_connection(conn):
    await conn.set_type_codec("jsonb", schema="pg_catalog", encoder=json.dumps, decoder=json.loads)

pool = await asyncpg.create_pool(dsn, init=init_connection)
21
22
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
22

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?