PythonのフレームワークであるFastAPIを触る機会があって、その中で非同期プログラミングの存在を知りました。
非同期プログラミングについて色々調べていくと、DB接続も非同期処理に対応してきていることが分かり、PythonのSQLライブラリで有名なSQLAlchemyもバージョン1.4から非同期処理に対応しているようです。
時代は非同期プログラミングみたいですね。
ただSQLAlchemyの非同期処理の記事がまだ少なく、同期処理から非同期処理に変えるときに、苦労しそうなので、今回はSQLAlchemy同期処理と非同期処理で設定やクエリがどう違うの比較しようと思います。
SQLAlchemyにはcoreとormがありますが、ここではormで書いていきます。
APIにはFastAPI、DBはPostgreSQLを使っていきます。
動作確認用コードは下記リポジトリに置いています。
同期処理:https://github.com/y-p-e/sync_postgresql_fastapi
非同期処理:https://github.com/y-p-e/async_postgresql_fastapi
#事前準備と前提条件
まずは、APIとDBを接続するための設定を比較していきます。
PostgreSQL-ドライバー
まずは、アプリ(FastAPI)とDB(PostgreSQL)を接続するためのドライバーからです。
同期接続
PostgreSQLと同期接続するために使えるドライバーはいくつかあります。
- psycopg2
- pg8000
- py-postgresql
この中ではpsycopg2が一番有名かと思うので、psycopg2を選択します。
非同期接続
PostgreSQLと非同期接続するためのドライバーもいくつかあります。
- Databases
- Tortoise ORM
- ormantic
- GINO
- asyncpg
ほとんどがSQLAlchemy Coreの方に対応しているのですが、FastAPIでコードを書いていくならSQLAlchemy ORMに対応していた方が良いので、ここではasyncpgを選択します。
参考:データベースの非同期処理
ちなみに、asyncpgはpsycopg2よりも3倍早いそうです。これだけでもasyncpgにする価値はありそうですね。
In our testing asyncpg is, on average, 3x faster than psycopg2 (and its asyncio variant -- aiopg).
SQLAlchemy-エンジン & セッション
SQLAlchemyを動かすためのエンジンの設定の違いを見てみます。
同期接続
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base
DB_URL = '{}://{}:{}@{}:{}/{}'.format("postgresql+psycopg2", "admin", "password", "db", "5432", "sync_db")
engine = create_engine(DB_URL, echo=True)
Session = scoped_session(
sessionmaker(
autocommit = False,
autoflush = False,
bind = engine))
非同期接続
from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base
DB_URL = '{}://{}:{}@{}:{}/{}'.format("postgresql+asyncpg", "admin", "password", "db", "5432", "async_db")
engine = create_async_engine(DB_URL, echo=True)
Session = scoped_session(
sessionmaker(
autocommit = False,
autoflush = False,
bind = engine,
class_=AsyncSession))
非同期の場合は非同期用のcreate_async_engine
とAsyncSession
をインポートする必要があります。
そして、sessionmakerの引数にclass_=AsyncSession
を設定する必要があります。
ドライバー&エンジン&セッション
DBと接続するために、ドライバー、エンジン、セッションというものが出てきました。
最初これらの違いが良くわからなかったので、現実世界と照らし合わせて自分の理解を示しておきます。
自分の理解なので厳密には違うと思いますが、大きくは外れていないかと思っています。
まずドライバーは、DBへ続く道なのかなと思っています。そもそも道がなかったら目的地へも辿りつけないので、一番最初に道を作ってあげるイメージです。
その次に、エンジンです。エンジンというと車のエンジンが一番身近にあるかなと思います。なので、目的地に行くための乗り物がSQLAlchemyというイメージです。
最後にセッションです。セッションは、SQLAlchemy専用の道路というイメージです。大きな道にはバス専用の道路とかあるみたいな感じかなと思っています。
マイグレーション
では、実際にCRUD処理の違いを見ていく前に、テーブルを作っておきましょう。
テーブルを作るにはマイグレーションをする必要がありますが、マイグレーションを非同期でする必要はないかなと思うので同期処理で作っておきます。
SQLAlchemyで作られたテーブルのマイグレーションを実行するファイルを用意しました。
from sqlalchemy import create_engine
from api.models.task import Base
DB_URL = '{}://{}:{}@{}:{}/{}'.format("postgresql+psycopg2", "admin", "password", "db", "5432", "async_db")
engine = create_engine(DB_URL, echo=True)
def reset_database():
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
if __name__ == "__main__":
reset_database()
CRUD
準備ができたので、CRUD処理のそれぞれの違いを見ていきます。
Create
同期接続
def create_task(db: Session, task_create: task_schema.TaskCreate):
task = task_model.Task(title=task_create.title)
db.add(task)
db.commit()
return task
非同期接続
async def create_task(db: AsyncSession, task_create: task_schema.TaskCreate):
task = task_model.Task(title=task_create.title)
db.add(task)
await db.commit()
await db.refresh(task)
return
Read 全件取得
同期接続
def get_tasks(db: Session):
tasks = db.query(task_model.Task).all()
return
非同期接続
async def get_tasks(db: AsyncSession):
result = await (db.execute(select(task_model.Task.id,task_model.Task.title,)))
return result.all()
Read 1件取得
同期接続
def get_task(db: Session, task_id):
task = db.query(task_model.Task).filter(task_model.Task.id == task_id).first()
return task
非同期接続
async def get_task(db: AsyncSession, task_id):
result = await (db.execute(select(task_model.Task.id,task_model.Task.title,).filter(task_model.Task.id == task_id)))
return result.first()
Update
同期接続
def update_task(db: Session, task_id, task_create: task_schema.TaskCreate):
task = db.query(task_model.Task).filter(task_model.Task.id == task_id).first()
task.title = task_create.title
db.add(task)
db.commit()
return task
非同期接続
async def update_task(db: AsyncSession, task_id, task_create: task_schema.TaskCreate):
result = await (db.execute(select(task_model.Task).filter(task_model.Task.id == task_id)))
task = result.first()
task[0].title = task_create.title
db.add(task[0])
await db.commit()
await db.refresh(task[0])
return task[0]
Delete
同期接続
def delete_task(db: Session, task_id):
task = db.query(task_model.Task).filter(task_model.Task.id == task_id).delete()
db.commit()
return
非同期接続
async def delete_task(db: AsyncSession, task_id):
result = await db.execute(select(task_model.Task).filter(task_model.Task.id == task_id))
task = result.first()
await db.delete(task[0])
await db.commit()
return task
まとめ
SQLAlchemyのORMも非同期処理に対応したということで、同期処理と非同期処理の違いをまとめてみました。
非同期処理でも同期処理と似たような書き方はできるけど、微妙に違うので気をつけて書いていかないとハマるポイントかなと思います。
あとは、使えるドライバーがそもそも違うのでここも注意するところかなと思います。ただ、非同期処理対応のasyncpgはpsycopg2よりも3倍早いということなので、かなりメリットはありそうな気がします。