はじめに
実務でFastAPIを使用しており、データベース操作にSQLAlchemyを使用しています。
知識の整理をする意味でも、初期の設定とDB操作についてまとめようと思いました。
SQLAlchemyについてよくわからないという初学者の方に対して参考になれば幸いです
SQLAlchemyとは
SQLAlchemyとは、Python向けのSQLツールキットで、ORM(Object-Relational Mapper)ライブラリの一つです。SQLAlchemyを使用するとPythonのプログラムからデータベースとのやりとりを容易に行うことができます。
以下、SQLAlchemyの公式ドキュメントです。
ORM(Object-Relational Mapping)とは
ORMはオブジェクト指向プログラミング言語(今回の場合はPython)とリレーショナルデータベース(MySQLなど)との間のデータをマッピングします。
ORMを使用することにより、データベース操作を行う際SQLクエリを直接記述する必要がなく、プログラム内で操作を行うことができます。利点としては、以下の点が例として挙げられます。
- SQLクエリを書くことなく、プログラミング言語(python)だけでデータベース操作が可能
- 簡潔な命令で操作することができ、コードの可読性が向上する。
- SQLインジェクション攻撃への対策がなされており、セキュリティ面においてもメリットがある。 など
ソースコードとディレクトリ構造
今回、SQLAlchemyの公式ドキュメントを参照しながら以下のリポジトリで作業しながら挙動を確認しました。もしよければご参照ください。
また、ディレクトリ構造は以下のとおりです。
(ちょっと見づらいので、詳細は上記リポジトリをご参照ください)
/
├ src
│ ├ databases
│ | ├ migration
│ | | └ versions
│ | | └ xxxxxx_create_items_table.py
│ | └ alembic.ini
│ ├ models
│ | ├ __init__.py
│ | └ item.py
│ ├ routers
│ | ├ __init__.py
│ | └ item.py
│ ├ schemas
│ | ├ __init__.py
│ | └ item.py
│ ├ __init__.py
│ ├ db.py
│ └ main.py
├ Dockerfile
├ docker-compose.yml
└ README.md
セットアップ方法
インストール
パッケージ管理ツールとしてpip
を使用している場合は①、poetry
を使用している場合は②のコマンドによりライブラリをインストールします。
①pipを使用する場合
pip install sqlalchemy
②poetryを使用する場合
poetry add sqlalchemy
Session
の作成
FastAPIにおいてSQLAlchemyを使用する場合、データベースセッションを作成する必要があります。データベースとの通信を管理し、トランザクションを制御するための関数を作成するといったイメージです。
ファイル名はdb.py
で管理するとして以下一般的な実装例を記述します。(プロジェクトによってはsetting.py
など色々あるような印象です。)
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
# データベースの接続情報を設定
DATABASE_URL = "postgresql://username:password@localhost/database_name"
# データベースエンジンを作成
engine = create_engine(DATABASE_URL)
# セッションファクトリを作成
Session = sessionmaker(autocommit=False, autoflush=False, bind=engine)
# モデルを定義するための基本となるBaseクラスを作成
Base = declarative_base()
# セッションを依存性として定義
def get_db():
db = Session()
try:
yield db
finally:
db.close()
上記の例では、データベースはpostgres
を使用する想定で記述しています。
実装例のURLはダミーで、username
やpassword
は、それぞれ設定しているものを組み合わせて代入する必要があります。このget_db()
を各エンドポイントに設定してデータを取得・更新する処理を記述することができます。
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy import select
from sqlalchemy.orm import Session
from src.models.item import Item
from src.schemas.item import Item as ItemCreate, ItemOrm
from src.db import get_db
router = APIRouter()
@router.get("/items", response_model=List[ItemOrm])
async def get_items(db: Session = Depends(get_db)):
"""
Itemを全件取得するためのエンドポイント
"""
items = db.scalars(select(Item)).all()
return items
データを作成する方法
SQLAlchemyを使用してデータを作成する方法を説明します。
全部網羅はできませんが、基本的な使い方としてご参照いただければと思います
今回は例として、Pydantic
というPythonの型定義を提供してくれるライブラリを使用してItem
モデルの型を以下のように定義しています。
from datetime import datetime
from pydantic import BaseModel, Field
class ItemBase(BaseModel):
name: str = Field(None, example='ふしぎなアメ')
description: str = Field(None, example='食べるとレベルが1アップする')
class Item(ItemBase):
pass
class ItemOrm(ItemBase):
id: int
created_at: datetime
updated_at: datetime
class Config:
from_attributes=True
データ1件を作成する場合
以下のとおり実装例を記載します。
パスオペレーション関数の引数としてdb: Session = Depends(get_db)
と先ほど作成したget_db()
を読み込むことで関数内で変数db
を使用し、DB操作をすることができます。
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from src.models.item import Item
from src.schemas.item import Item as ItemCreate, ItemOrm
from src.db import get_db
router = APIRouter()
@router.post("/items", response_model=ItemOrm)
async def create_item(item_data: ItemCreate, db: Session = Depends(get_db)):
"""
Itemを一件Insertするためのエンドポイント
"""
item = Item(**item_data.dict())
# データの追加
db.add(item)
# データの登録
db.commit()
# 一時データのリフレッシュ
db.refresh(item)
return item
上記では、パスオペレーション関数にItemCreate
という型のitem_dataを設定していますが、これによりパラメータとしてname
とdescription
を受け取ることができます。
モデルとして定義したItem
のインスタンスを作成し、引数として受け取ったitem_data
を代入します。
その後、db.add(item)
としてデータを追加、commit()
をすることで登録・保存することができます。
以下SwaggerUIの画面になり、Request body
部分のname
とdescription
のvalueをいじってExecuteを押せばデータが作成されます。
トランザクションを明示的に設定する
一件の保存だけであれば、上記の実装例で足りるのですが、複数の保存処理が一つのパスオペレーション関数で実行される場合、トランザクションとしての整合性を担保する必要があります。
その場合に、以下のように実装すればwith db as session:
にネストされたブロック内を一連のトランザクションとしてまとめることができます。
(書き方は一例ですので、詳細は公式ドキュメントをご覧ください)
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from src.models.item import Item
from src.schemas.item import Item as ItemCreate, ItemOrm
from src.db import get_db
router = APIRouter()
@router.post("/items", response_model=ItemOrm)
async def create_item(item_data: ItemCreate, db: Session = Depends(get_db)):
"""
Itemを一件Insertするためのエンドポイント
"""
with db as session:
item = Item(**item_data.dict())
session.add(item)
session.flush()
return item
flush()
は、トランザクション内での変更をデータベースに反映させる前に、変更を一時的にデータベースに送信するメソッドであり、commit()
のような確定処理の1歩手前の状態を維持してくれます。
flush()
を使うことにより、トランザクション内でエラーが発生した場合、データが中途半端に作られることなくロールバックしてくれます。
そして、トランザクションが無事完了した場合、セッションは自動的にクローズされて変更がデータベースにコミットされます。
複数のデータを作成する場合
一件ずつadd
するのではなく、リストとしてデータを受け取り、一気に保存したい時は以下のようにadd_all()
を使うことで実現できます。
from typing import List
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from src.models.item import Item
from src.schemas.item import Item as ItemCreate, ItemOrm
from src.db import get_db
router = APIRouter()
@router.post("/item_list", response_model=List[ItemOrm])
async def create_item_list(item_data_list: List[ItemCreate], db: Session = Depends(get_db)):
"""
複数まとめてItemをInsertするためのエンドポイント
"""
item_list = [Item(
name=item.name,
description=item.description,
) for item in item_data_list]
db.add_all(item_list)
db.commit()
return item_list
複数のデータ保存は処理速度などが重要になってくると思いますが、以下の記事が参考になりましたので添付させていただきます。
データを取得する方法
get_db()
を使用してデータを取得する方法を説明します。
query
を使用した取得方法もありますが、今回はSQLAlchemy2.0で推奨されているscalars()
を使用した取得方法をメインに紹介していきます。
基本的な記述方法としてご参照いただけると嬉しいです。
データ一覧を取得したい場合
作成したItem
データを全て取得する実装例を以下のとおり記載します。
db.scalars(select(Item)).all()
でItem
を全件取得し、それをitem
という変数に格納して返しています。
書き方は、このほかにもdb.execute(select(Item)).scalars().all()
があり、どちらも同じ結果が返ってきます。
from typing import List
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from src.models.item import Item
from src.schemas.item import Item as ItemCreate, ItemOrm
from src.db import get_db
router = APIRouter()
@router.get("/items", response_model=List[ItemOrm])
async def get_items(db: Session = Depends(get_db)):
"""
Itemを全件取得するためのエンドポイント
"""
# 複数取得の場合はscalars()を使用する。
items = db.scalars(select(Item)).all()
"""
もしくは以下のように記載してもOk。
items = db.execute(select(Item)).scalars().all()
"""
return items
ID指定した一件を取得したい場合
今度はID指定をして特定のItem
データを一件取得する場合の実装例を記載します。
複数件取得のscalars()
に対して1件のみの取得の場合はscalar()
を使用して取得することができます。
db.execute(select(Item).where(Item.id == item_id)).scalar()
や、db.scalars(select(Item).where(Item.id == item_id)).first()
としても取得することが可能です。
from typing import List
from fastapi import APIRouter, Depends, HTTPException
from sqlalchemy.orm import Session
from src.models.item import Item
from src.schemas.item import Item as ItemCreate, ItemOrm
from src.db import get_db
router = APIRouter()
@router.get("/items/{item_id}", response_model=ItemOrm)
async def get_item(item_id: int, db: Session = Depends(get_db)):
"""
ID指定したItemを取得するエンドポイント
"""
"""
一件取得の場合は、scalar()で取得可能
以下と同じ結果となる。
item = db.execute(select(Item).where(Item.id == item_id)).scalar()
item = db.scalars(select(Item).where(Item.id == item_id)).first()
"""
item = db.scalar(select(Item).where(Item.id == item_id))
if item is None:
raise HTTPException(status_code=404, detail="Item not found")
return item
最後に
以上、簡単ではありますがSQLAlchemyでのDB操作をまとめました!
最近になってFastAPIを触った新参者ですが、記事をまとめる中で色々調べて新たな気づきが多かったです。
ここまで読んでいただいてありがとうございました!何か誤りありましたらご指摘いただけますと嬉しいです