33
23

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

SQLAlchemy同期/非同期の比較

Posted at

PythonのフレームワークであるFastAPIを触る機会があって、その中で非同期プログラミングの存在を知りました。
非同期プログラミングについて色々調べていくと、DB接続も非同期処理に対応してきていることが分かり、PythonのSQLライブラリで有名なSQLAlchemyもバージョン1.4から非同期処理に対応しているようです。
時代は非同期プログラミングみたいですね。

ただSQLAlchemyの非同期処理の記事がまだ少なく、同期処理から非同期処理に変えるときに、苦労しそうなので、今回はSQLAlchemy同期処理と非同期処理で設定やクエリがどう違うの比較しようと思います。
SQLAlchemyにはcoreとormがありますが、ここではormで書いていきます。
APIにはFastAPI、DBはPostgreSQLを使っていきます。

動作確認用コードは下記リポジトリに置いています。
同期処理:https://github.com/y-p-e/sync_postgresql_fastapi
非同期処理:https://github.com/y-p-e/async_postgresql_fastapi

#事前準備と前提条件
まずは、APIとDBを接続するための設定を比較していきます。

PostgreSQL-ドライバー

まずは、アプリ(FastAPI)とDB(PostgreSQL)を接続するためのドライバーからです。

同期接続

PostgreSQLと同期接続するために使えるドライバーはいくつかあります。

  • psycopg2
  • pg8000
  • py-postgresql

この中ではpsycopg2が一番有名かと思うので、psycopg2を選択します。

参考:PythonからPostgreSQLに接続する方法


非同期接続

PostgreSQLと非同期接続するためのドライバーもいくつかあります。

  • Databases
  • Tortoise ORM
  • ormantic
  • GINO
  • asyncpg

ほとんどがSQLAlchemy Coreの方に対応しているのですが、FastAPIでコードを書いていくならSQLAlchemy ORMに対応していた方が良いので、ここではasyncpgを選択します。

参考:データベースの非同期処理

ちなみに、asyncpgはpsycopg2よりも3倍早いそうです。これだけでもasyncpgにする価値はありそうですね。

In our testing asyncpg is, on average, 3x faster than psycopg2 (and its asyncio variant -- aiopg).

参考:MagicStack/asyncpg

SQLAlchemy-エンジン & セッション

SQLAlchemyを動かすためのエンジンの設定の違いを見てみます。

同期接続

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base

DB_URL = '{}://{}:{}@{}:{}/{}'.format("postgresql+psycopg2", "admin", "password", "db", "5432", "sync_db")
engine = create_engine(DB_URL, echo=True)
Session = scoped_session(
            sessionmaker(
                autocommit = False,
                autoflush = False,
                bind = engine))  

非同期接続

from sqlalchemy import create_engine
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
from sqlalchemy.orm import sessionmaker, scoped_session, declarative_base


DB_URL = '{}://{}:{}@{}:{}/{}'.format("postgresql+asyncpg", "admin", "password", "db", "5432", "async_db")
engine = create_async_engine(DB_URL, echo=True)
Session = scoped_session(
            sessionmaker(
                autocommit = False,
                autoflush = False,
                bind = engine,
                class_=AsyncSession))   

非同期の場合は非同期用のcreate_async_engineAsyncSessionをインポートする必要があります。
そして、sessionmakerの引数にclass_=AsyncSessionを設定する必要があります。

ドライバー&エンジン&セッション

DBと接続するために、ドライバー、エンジン、セッションというものが出てきました。
最初これらの違いが良くわからなかったので、現実世界と照らし合わせて自分の理解を示しておきます。
自分の理解なので厳密には違うと思いますが、大きくは外れていないかと思っています。

まずドライバーは、DBへ続く道なのかなと思っています。そもそも道がなかったら目的地へも辿りつけないので、一番最初に道を作ってあげるイメージです。

その次に、エンジンです。エンジンというと車のエンジンが一番身近にあるかなと思います。なので、目的地に行くための乗り物がSQLAlchemyというイメージです。

最後にセッションです。セッションは、SQLAlchemy専用の道路というイメージです。大きな道にはバス専用の道路とかあるみたいな感じかなと思っています。

マイグレーション

では、実際にCRUD処理の違いを見ていく前に、テーブルを作っておきましょう。
テーブルを作るにはマイグレーションをする必要がありますが、マイグレーションを非同期でする必要はないかなと思うので同期処理で作っておきます。
SQLAlchemyで作られたテーブルのマイグレーションを実行するファイルを用意しました。

from sqlalchemy import create_engine
from api.models.task import Base

DB_URL = '{}://{}:{}@{}:{}/{}'.format("postgresql+psycopg2", "admin", "password", "db", "5432", "async_db")
engine = create_engine(DB_URL, echo=True)


def reset_database():
    Base.metadata.drop_all(bind=engine)
    Base.metadata.create_all(bind=engine)

if __name__ == "__main__":
    reset_database()

CRUD

準備ができたので、CRUD処理のそれぞれの違いを見ていきます。

Create

同期接続

def create_task(db: Session, task_create: task_schema.TaskCreate):
    task = task_model.Task(title=task_create.title)
    db.add(task)
    db.commit()
    return task

非同期接続

async def create_task(db: AsyncSession, task_create: task_schema.TaskCreate):
    task = task_model.Task(title=task_create.title)
    db.add(task)
    await db.commit()
    await db.refresh(task)
    return 

Read 全件取得

同期接続

def get_tasks(db: Session):
    tasks = db.query(task_model.Task).all()
    return 

非同期接続

async def get_tasks(db: AsyncSession):
    result = await (db.execute(select(task_model.Task.id,task_model.Task.title,)))
    return result.all()

Read 1件取得

同期接続

def get_task(db: Session, task_id):
    task = db.query(task_model.Task).filter(task_model.Task.id == task_id).first()
    return task

非同期接続

async def get_task(db: AsyncSession, task_id):
    result = await (db.execute(select(task_model.Task.id,task_model.Task.title,).filter(task_model.Task.id == task_id)))
    return result.first()

Update

同期接続

def update_task(db: Session, task_id, task_create: task_schema.TaskCreate):
    task = db.query(task_model.Task).filter(task_model.Task.id == task_id).first()
    task.title = task_create.title
    db.add(task)
    db.commit()
    return task

非同期接続

async def update_task(db: AsyncSession, task_id, task_create: task_schema.TaskCreate):
    result = await (db.execute(select(task_model.Task).filter(task_model.Task.id == task_id)))
    task = result.first()
    task[0].title = task_create.title
    db.add(task[0])
    await db.commit()
    await db.refresh(task[0])
    return task[0]

Delete

同期接続

def delete_task(db: Session, task_id):
    task = db.query(task_model.Task).filter(task_model.Task.id == task_id).delete()
    db.commit()
    return 

非同期接続

async def delete_task(db: AsyncSession, task_id):
    result = await db.execute(select(task_model.Task).filter(task_model.Task.id == task_id))
    task = result.first()
    await db.delete(task[0])
    await db.commit()
    return task

まとめ

SQLAlchemyのORMも非同期処理に対応したということで、同期処理と非同期処理の違いをまとめてみました。
非同期処理でも同期処理と似たような書き方はできるけど、微妙に違うので気をつけて書いていかないとハマるポイントかなと思います。
あとは、使えるドライバーがそもそも違うのでここも注意するところかなと思います。ただ、非同期処理対応のasyncpgはpsycopg2よりも3倍早いということなので、かなりメリットはありそうな気がします。

33
23
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
33
23

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?