psycopg のバージョン3がいつの間にか出ていた。1
asyncio に対応しているのと row_factory が Pydantic 型を返せるようになったと聞いて気になったのでちょっと触ってみたメモ。
初期設定
今回は FastAPI の非同期エンドポイントから使うことを想定して、コネクションプールと接続オブジェクトを返す非同期ジェネレータを作成した。
database.py
from os import getenv
from typing import AsyncGenerator
from psycopg import AsyncConnection
from psycopg_pool import AsyncConnectionPool
DATABASE_URL = getenv("DATABASE_URL")
assert DATABASE_URL
pool = AsyncConnectionPool(DATABASE_URL, open=False)
async def get_db() -> AsyncGenerator[AsyncConnection, None]:
async with pool.connection() as conn:
yield conn
app.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
import database
@asynccontextmanager
async def lifespan(app: FastAPI):
await database.pool.open()
yield
await database.pool.close()
app = FastAPI(lifespan=lifespan)
row_factory で Pydantic 型を得る
row_factory を使うことでクエリ結果を Pydantic 型で得られるようになった。
views.py
from fastapi import APIRouter, Depends, HTTPException
from psycopg.rows import class_row
from pydantic import BaseModel
from database import AsyncConnection, get_db
router = APIRouter()
class Item(BaseModel):
id: int
name: str
@router.get("/items/{item_id}")
async def get_item(item_id: int, db: AsyncConnection = Depends(get_db)) -> Item:
async with db.cursor(row_factory=class_row(Item)) as cur:
await cur.execute("SELECT * FROM items WHERE %s", (item_id,))
item = await cur.fetchone()
if item is None:
raise HTTPException(status_code=404)
return item
その他
操作感はだいたい psycopg2 と同じまま asyncio 対応されたという感触だった。
昔まとめたこの内容がほぼそのまま使えそう。