LoginSignup
3
3
記事投稿キャンペーン 「2024年!初アウトプットをしよう」

ホロジュール収集バックエンド API の FastAPI 化

Last updated at Posted at 2024-01-09

はじめに

こちらも以前に、収集したホロライブの配信予定や動画情報を Android アプリや React アプリ から利用するためのバックエンド API を作成しましたが、ホロコレクタをアップデートしたのと、Pydantic や DI も含めた FastAPI による開発が強力だったので、FastAPI 版としてあらためて作成しました。

ホロコレクタ で収集した配信情報や配信者情報の参照・追加・更新・削除と認証を行う WebAPI となります。

全体的な処理の流れは、FastAPI や MongoDB のチュートリアルを参考にしています。

JWT 認証

MongoDB の users コレクションからユーザー名をもとに user ドキュメントを取得し、ハッシュ化したパスワードとの一致を確認します。

jwt_settings = get_jwt_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def verify_password(plain_password, hashed_password) -> bool:
    # パスワードが正しいかどうかを確認する
    return pwd_context.verify(plain_password, hashed_password)

async def authenticate_user(username: str, password: str, db = Depends(get_db)) -> UserModel | None:
    # ユーザーを認証する
    user: UserModel = await UserRepository.get_by_username(db, username)
    if not user:
        return None
    if not verify_password(password, user.password):
        return None
    return user

ユーザーを確認した後に、有効期限や秘密キーおよび暗号化アルゴリズムを指定して JWT のトークンを生成します。

def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    # アクセストークンを作成する
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=jwt_settings.exp_delta_minutes)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, jwt_settings.secret_key, algorithm=jwt_settings.algorithm)
    return encoded_jwt

ルーター側の get_token 非同期関数(post オペレーション)は、フォームデータと DB インターフェースを依存注入しています。
フォームデータのユーザー名とパスワードおよび DB インターフェースを利用して認証を行い、Bearerタイプのトークンを生成して返却します。

@router.post('/token', response_model=Token)
async def get_token(request: OAuth2PasswordRequestForm = Depends(), 
                    db: AgnosticDatabase = Depends(get_db)) -> dict:
    user: UserModel = await authenticate_user(request.username, request.password, db)
    if not user:
        raise CredentialsException(message='Incorrect username or password')
    access_token: str = create_access_token(data={'sub': user.username})
    return {
        'access_token': access_token,
        'token_type': 'bearer'
    }

各オペレーションは、get_current_active_user 関数を依存注入しており、受け取ったトークンを復号して検証し、現在のユーザーを返します。

@router.get("/schedules", 
            response_model=ScheduleCollection, 
            response_model_by_alias=False)
async def list_schedules(date: date = None, 
                         code: str = None, 
                         db: AgnosticDatabase = Depends(get_db), 
                         current_user: str = Depends(get_current_active_user)) -> ScheduleCollection:
    return await ScheduleRepository.list(db, date, code)

get_current_active_user 関数はこのようにしています。
トークンから取得した情報からユーザーを特定できない場合に認証エラーとしています。

async def get_current_user(token: str = Depends(oauth2_scheme), db = Depends(get_db)) -> UserModel:
    # トークンから現在のユーザーを非同期で取得する
    try:
        payload = jwt.decode(token, jwt_settings.secret_key, algorithms=jwt_settings.algorithm)
        username: str = payload.get("sub")
        if username is None:
            raise CredentialsException()
        token_data = TokenData(username=username)
    except JWTError:
        raise CredentialsException()
    return await UserRepository.get_by_username(db, username=token_data.username)

async def get_current_active_user(current_user: UserModel = Depends(get_current_user)) -> UserModel:
    # 現在のユーザーが有効かどうかを確認する
    if current_user.disabled:
        raise InactiveUserException(identifier=current_user.id)
    return current_user

MongoDB の利用

MongoDB の接続には非同期ドライバである Motor を利用し、try~yield~finally を用いることで後処理も行っています。

import motor.motor_asyncio
from motor.core import AgnosticClient, AgnosticDatabase
from api.settings import get_mongo_settings

mongo_settings = get_mongo_settings()

def get_db() -> AgnosticDatabase:
    try:
        client: AgnosticClient = motor.motor_asyncio.AsyncIOMotorClient(mongo_settings.uri)
        db: AgnosticDatabase = client.get_database(mongo_settings.database)
        yield db
    finally:
        client.close()

開発環境

  • Windows 11
  • PowerShell 7.4.0
  • Visual Studio Code 1.85
  • Python 3.11 + Poetry + pyenv
  • MongoDB 6.0

開発準備

Poetry と pyenv の確認

参考:Windows で Python の開発環境を構築する(Poetry と pyenv を利用)

> poetry --version
Poetry version 1.3.2

> pyenv --version
pyenv 3.1.1

MongoDB の確認

> mongosh --version
1.6.0

> mongosh localhost:27017/admin -u admin -p

データベースの作成とロール(今回は dbOwner )の設定

MongoDB > use holoduledb
MongoDB > db.createUser( { user:"owner", pwd:"password", roles:[{ "role" : "dbOwner", "db" : "holoduledb" }] } );

プロジェクト作成

プロジェクトを作成してディレクトリに入る

> poetry new holoservice --name api
Created package api in holoservice
> cd holoservice

プロジェクトで利用する Python を pyenv でインストール

> pyenv install 3.11.1

プロジェクトで利用するローカルの Python のバージョンを変更

> pyenv local 3.11.1
> python -V
Python 3.11.1

バージョンを指定して Python 仮想環境を作成(pyenv で管理している Python のパスを指定)

> python -c "import sys; print(sys.executable)"
> poetry env use C:\Users\[UserName]\.pyenv\pyenv-win\versions\3.11.1\python.exe

パッケージの追加

> poetry add pylint
> poetry add fastapi
> poetry add uvicorn[standard]
> poetry add motor
> poetry add pydantic-settings
> poetry add pytz
> poetry add python-multipart
> poetry add python-jose
> poetry add passlib[bcrypt]

プログラム実装

設定(api/settings.py)

.env から MongoDB と JWT の設定を読み込みます。

from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict

class MongoSettings(BaseSettings):
    # MongoDB関連の設定
    uri: str
    database: str
    model_config = SettingsConfigDict(env_file=".env", env_prefix='mongo_')

class JwtSettings(BaseSettings):
    # JWT関連の設定
    secret_key: str
    algorithm: str = "HS256"
    exp_delta_minutes: int = 15
    model_config = SettingsConfigDict(env_file=".env", env_prefix='jwt_')

@lru_cache
def get_mongo_settings() -> MongoSettings:
    # キャッシュしたMongoDB設定を取得する
    return MongoSettings()

@lru_cache
def get_jwt_settings() -> JwtSettings:
    # キャッシュしたJWT設定を取得する
    return JwtSettings()

認証(api/oauth2.py)

JWT 認証やユーザーの有効性の確認を行います。

from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer
from datetime import datetime, timedelta
from jose import jwt
from jose.exceptions import JWTError
from passlib.context import CryptContext
from api.repository.user import UserRepository
from api.schemas.token import TokenData
from api.settings import get_jwt_settings
from api.schemas.user import UserModel
from api.db import get_db
from api.exceptions import CredentialsException, InactiveUserException

jwt_settings = get_jwt_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

def get_hashed_password(password) -> str:
    # パスワードをハッシュ化する
    return pwd_context.hash(password)

def verify_password(plain_password, hashed_password) -> bool:
    # パスワードが正しいかどうかを確認する
    return pwd_context.verify(plain_password, hashed_password)

async def authenticate_user(username: str, password: str, db = Depends(get_db)) -> UserModel | None:
    # ユーザーを認証する
    user: UserModel = await UserRepository.get_by_username(db, username)
    if not user:
        return None
    if not verify_password(password, user.password):
        return None
    return user

def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
    # アクセストークンを作成する
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=jwt_settings.exp_delta_minutes)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, jwt_settings.secret_key, algorithm=jwt_settings.algorithm)
    return encoded_jwt

async def get_current_user(token: str = Depends(oauth2_scheme), db = Depends(get_db)) -> UserModel:
    # アクセストークンから現在のユーザーを非同期で取得する
    try:
        payload = jwt.decode(token, jwt_settings.secret_key, algorithms=jwt_settings.algorithm)
        username: str = payload.get("sub")
        if username is None:
            raise CredentialsException()
        token_data = TokenData(username=username)
    except JWTError:
        raise CredentialsException()
    return await UserRepository.get_by_username(db, username=token_data.username)

async def get_current_active_user(current_user: UserModel = Depends(get_current_user)) -> UserModel:
    # 現在のユーザーが有効かどうかを確認する
    if current_user.disabled:
        raise InactiveUserException(identifier=current_user.id)
    return current_user

ミドルウェア(api/middlewares.py)

ログ記録やエラー処理などを後で組み込む予定で準備だけしています。

from typing import Any
from fastapi import Request
from api.exceptions import BaseAPIException

async def request_handler(request: Request, call_next) -> Any:
    try:
        # すべてのリクエストがパスオペレーションで処理される前に機能する
        return await call_next(request)
    except Exception as ex:
        if isinstance(ex, BaseAPIException):
            return ex.response()
        raise ex

データベース(api/db.py)

非同期ドライバである Motor を利用して MongoDB の接続を管理しています。

import motor.motor_asyncio
from motor.core import AgnosticClient, AgnosticDatabase
from api.settings import get_mongo_settings

mongo_settings = get_mongo_settings()

def get_db() -> AgnosticDatabase:
    # MongoDB へ接続し、データベースを返却する
    try:
        client: AgnosticClient = motor.motor_asyncio.AsyncIOMotorClient(mongo_settings.uri)
        db: AgnosticDatabase = client.get_database(mongo_settings.database)
        yield db
    finally:
        client.close()

例外(api/exceptions.py)

アプリケーションで利用する例外やエラーのクラスを定義しています。

from typing import Type
from fastapi.responses import JSONResponse
from fastapi import status
from pydantic import BaseModel, Field

class BaseError(BaseModel):
    message: str = Field(..., description="Error message or description")

class BaseIdentifiedError(BaseError):
    identifier: str = Field(..., description="Unique identifier which this error references to")

class NotFoundError(BaseIdentifiedError):
    pass

class AlreadyExistsError(BaseIdentifiedError):
    pass

class InactiveUserError(BaseIdentifiedError):
    pass

class CredentialsError(BaseError):
    pass

class BaseAPIException(Exception):
    message = "Generic error"
    code = status.HTTP_500_INTERNAL_SERVER_ERROR
    model = BaseError
    headers = None

    def __init__(self, **kwargs):
        kwargs.setdefault("message", self.message)
        self.message = kwargs["message"]
        self.data = self.model(**kwargs)

    def __str__(self):
        return self.message

    def response(self):
        return JSONResponse(
            content=self.data.model_dump(),
            status_code=self.code,
            headers=self.headers
        )

    @classmethod
    def response_model(cls):
        return {cls.code: {"model": cls.model}}

class BaseIdentifiedException(BaseAPIException):
    message = "Entity error"
    code = status.HTTP_500_INTERNAL_SERVER_ERROR
    model = BaseIdentifiedError

    def __init__(self, identifier, **kwargs):
        super().__init__(identifier=identifier, **kwargs)

class NotFoundException(BaseIdentifiedException):
    message = "The entity does not exist"
    code = status.HTTP_404_NOT_FOUND
    model = NotFoundError

class AlreadyExistsException(BaseIdentifiedException):
    message = "The entity already exists"
    code = status.HTTP_409_CONFLICT
    model = AlreadyExistsError

class InactiveUserException(BaseIdentifiedException):
    message = "Inactive user"
    code = status.HTTP_400_BAD_REQUEST
    model = InactiveUserError

class CredentialsException(BaseAPIException):
    message = "Colud not validate credentials"
    code = status.HTTP_401_UNAUTHORIZED,
    model = CredentialsError
    headers = {'WWW-Authenticate': "Bearer"}

def get_exception_responses(*args: Type[BaseAPIException]) -> dict:
    responses = dict()
    for cls in args:
        responses.update(cls.response_model())
    return responses

メイン(api/main.py)

このアプリケーションのエントリポイントです。
FastAPIのインスタンスを作成して初期化しています。

from fastapi import FastAPI, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from api.routers import schedule
from api.routers import streamer
from api.routers import user
from api.routers import authentication
from api.middlewares import request_handler

# FastAPIインスタンスを作成
app = FastAPI(
    title="Holodule API Service",
    summary="ホロジュールから取得した配信スケジュールを API として提供します。",
)

# HTTPリクエストの前後で処理を実行するためのミドルウェアを登録
app.middleware("http")(request_handler)

# CORS設定
origins: list = ["*"] # とりあえず全てのオリジンを許可
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# FastAPIルーターを登録
app.include_router(schedule.router)
app.include_router(streamer.router)
app.include_router(user.router)
app.include_router(authentication.router)

# FastAPIのエラーハンドラーを登録
@app.exception_handler(RequestValidationError)
async def handler(request:Request, exc:RequestValidationError) -> JSONResponse:
    print(exc)
    return JSONResponse(content={}, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)

配信情報ルーター(api/routers/schedule.py)

配信情報に関わる FastAPI のパスオペレーション関数群です。

from datetime import date
from fastapi import APIRouter, Body, Depends, Response, status
from motor.core import AgnosticDatabase
from api.oauth2 import get_current_active_user
from api.db import get_db
from api.repository.schedule import ScheduleRepository
from api.schemas.schedule import ScheduleModel, ScheduleCollection

router = APIRouter(
    tags=['schedule']
)

@router.get("/schedules/{id}", 
            response_model=ScheduleModel, 
            response_model_by_alias=False)
async def get_schedule(id: str, 
                       db: AgnosticDatabase = Depends(get_db), 
                       current_user: str = Depends(get_current_active_user)) -> ScheduleModel:
    # id を指定してスケジュールを取得
    return await ScheduleRepository.get_by_id(db, id)

@router.get("/schedules", 
            response_model=ScheduleCollection, 
            response_model_by_alias=False)
async def list_schedules(date: date = None, 
                         code: str = None, 
                         db: AgnosticDatabase = Depends(get_db), 
                         current_user: str = Depends(get_current_active_user)) -> ScheduleCollection:
    # date や code を指定してスケジュールを取得
    return await ScheduleRepository.list(db, date, code)

@router.post("/schedules", 
             response_model=ScheduleModel, 
             status_code=status.HTTP_201_CREATED, 
             response_model_by_alias=False)
async def create_schedule(schedule: ScheduleModel = Body(...), 
                          db: AgnosticDatabase = Depends(get_db), 
                          current_user: str = Depends(get_current_active_user)) -> ScheduleModel:
    # スケジュールを登録
    return await ScheduleRepository.create(db, schedule)

@router.put("/schedules/{id}", 
            response_model=ScheduleModel, 
            response_model_by_alias=False)
async def update_schedule(id: str, 
                          schedule: ScheduleModel = Body(...), 
                          db: AgnosticDatabase = Depends(get_db), 
                          current_user: str = Depends(get_current_active_user)) -> ScheduleModel:
    # スケジュールを更新
    return await ScheduleRepository.update(db, id, schedule)

@router.delete("/schedules/{id}")
async def delete_schedule(id: str, 
                          db: AgnosticDatabase = Depends(get_db), 
                          current_user: str = Depends(get_current_active_user)) -> Response:
    # スケジュールをさくじょ
    await ScheduleRepository.delete(db, id);
    return Response(status_code=status.HTTP_204_NO_CONTENT)

配信者情報ルーター(api/routers/streamer.py)

配信者情報に関わる FastAPI のパスオペレーション関数群です。

from fastapi import APIRouter, Body, Depends, Response, status
from motor.core import AgnosticDatabase
from api.oauth2 import get_current_active_user
from api.db import get_db
from api.repository.streamer import StreamerRepository
from api.schemas.streamer import StreamerModel, StreamerCollection

router = APIRouter(
    tags=['streamer']
)

@router.get("/streamers/{id}", 
            response_model=StreamerModel, 
            response_model_by_alias=False)
async def get_streamer(id: str, 
                   db: AgnosticDatabase = Depends(get_db), 
                   current_user: str = Depends(get_current_active_user)) -> StreamerModel:
    # id を指定して配信者情報を取得
    return await StreamerRepository.get_by_id(db, id);

@router.get("/streamers", 
            response_model=StreamerCollection, 
            response_model_by_alias=False)
async def list_streamers(db: AgnosticDatabase = Depends(get_db), 
                     current_user: str = Depends(get_current_active_user)) -> StreamerCollection:
    # 配信者情報を取得
    return await StreamerRepository.list(db);

@router.post("/streamers", 
             response_model=StreamerModel, 
             status_code=status.HTTP_201_CREATED, 
             response_model_by_alias=False)
async def create_streamer(streamer: StreamerModel = Body(...), 
                      db: AgnosticDatabase = Depends(get_db), 
                      current_user: str = Depends(get_current_active_user)) -> StreamerModel:
    # 配信者情報を登録
    return await StreamerRepository.create(db, streamer);

@router.put("/streamers/{id}", 
            response_model=StreamerModel, 
            response_model_by_alias=False)
async def update_streamer(id: str, 
                      streamer: StreamerModel = Body(...), 
                      db: AgnosticDatabase = Depends(get_db), 
                      current_user: str = Depends(get_current_active_user)) -> StreamerModel:
    # 配信者情報を更新
    return await StreamerRepository.update(db, id, streamer);

@router.delete("/streamers/{id}")
async def delete_streamer(id: str, 
                      db: AgnosticDatabase = Depends(get_db), 
                      current_user: str = Depends(get_current_active_user)) -> Response:
    # 配信者情報を削除
    await StreamerRepository.delete(db, id);
    return Response(status_code=status.HTTP_204_NO_CONTENT)

ユーザー情報ルーター(api/routers/user.py)

ユーザー情報に関わる FastAPI のパスオペレーション関数群です。

from fastapi import APIRouter, Body, Depends, Response, status
from motor.core import AgnosticDatabase
from api.oauth2 import get_current_active_user, get_hashed_password
from api.db import get_db
from api.repository.user import UserRepository
from api.schemas.user import UserModel, UserCollection

router = APIRouter(
    tags=['user']
)

@router.get("/users/{id}", 
            response_model=UserModel, 
            response_model_by_alias=False)
async def get_user(id: str, 
                   db: AgnosticDatabase = Depends(get_db), 
                   current_user: str = Depends(get_current_active_user)) -> UserModel:
    # id を指定してユーザーを取得
    return await UserRepository.get_by_id(db, id);

@router.get("/users", 
            response_model=UserCollection, 
            response_model_by_alias=False)
async def list_users(db: AgnosticDatabase = Depends(get_db), 
                     current_user: str = Depends(get_current_active_user)) -> UserCollection:
    # ユーザーを取得
    return await UserRepository.list(db);

@router.post("/users", 
             response_model=UserModel, 
             status_code=status.HTTP_201_CREATED, 
             response_model_by_alias=False)
# 認証無効化(利用ユーザー新規登録時)
# async def create_user(user: UserModel = Body(...), 
#                       db: AgnosticDatabase = Depends(get_db), 
#                       current_user: str = Depends(get_current_active_user)) -> UserModel:
async def create_user(user: UserModel = Body(...), 
                      db: AgnosticDatabase = Depends(get_db)) -> UserModel:
    # パスワードをハッシュ化してユーザーを登録
    if user.password is not None:
        user.password = get_hashed_password(user.password)
    return await UserRepository.create(db, user);

@router.put("/users/{id}", 
            response_model=UserModel, 
            response_model_by_alias=False)
async def update_user(id: str, 
                      user: UserModel = Body(...), 
                      db: AgnosticDatabase = Depends(get_db), 
                      current_user: str = Depends(get_current_active_user)) -> UserModel:
    # パスワードをハッシュ化してユーザーを更新
    if user.password is not None:
        user.password = get_hashed_password(user.password)
    return await UserRepository.update(db, id, user);

@router.delete("/users/{id}")
async def delete_user(id: str, 
                      db: AgnosticDatabase = Depends(get_db), 
                      current_user: str = Depends(get_current_active_user)) -> Response:
    # ユーザーを削除
    await UserRepository.delete(db, id);
    return Response(status_code=status.HTTP_204_NO_CONTENT)

認証ルーター(api/routers/authentication.py)

認証に関わる FastAPI のパスオペレーション関数群です。

from fastapi import APIRouter
from fastapi.param_functions import Depends
from fastapi.security.oauth2 import OAuth2PasswordRequestForm
from motor.core import AgnosticDatabase
from api.db import get_db
from api.exceptions import CredentialsException
from api.oauth2 import authenticate_user, create_access_token
from api.schemas.token import Token
from api.schemas.user import UserModel

router = APIRouter(
    tags=['authentication']
)

@router.post('/token', response_model=Token)
async def get_token(request: OAuth2PasswordRequestForm = Depends(), 
                    db: AgnosticDatabase = Depends(get_db)) -> dict:
    # ユーザー名とパスワードを指定して認証を行いトークンを発行
    user: UserModel = await authenticate_user(request.username, request.password, db)
    if not user:
        raise CredentialsException(message='Incorrect username or password')
    access_token: str = create_access_token(data={'sub': user.username})
    return {
        'access_token': access_token,
        'token_type': 'bearer'
    }

配信情報リポジトリ(api/repository/schedule.py)

配信情報に関わるリポジトリパターンの関数群です。

from datetime import datetime, timedelta, date, time
from bson import ObjectId
from pymongo import ReturnDocument
from motor.core import AgnosticDatabase
from api.schemas.schedule import ScheduleModel, ScheduleCollection
from api.exceptions import NotFoundException

class ScheduleRepository:
    @staticmethod
    async def get_by_id(db: AgnosticDatabase, 
                        id: str) -> ScheduleModel:
        schedule_collection = db.get_collection("schedules")
        # ウォルラス演算子を利用して schedule が None でない場合に schedule を返す
        if (schedule := await schedule_collection.find_one({"_id": ObjectId(id)})) is not None:
            return schedule
        raise NotFoundException(identifier=id)

    @staticmethod
    async def list(db: AgnosticDatabase, 
                   date: date = None, 
                   code: str = None) -> ScheduleCollection:
        schedule_collection = db.get_collection("schedules")
        filter_dict = {}
        if date is not None:
            start = datetime.combine(date, time())
            end = start + timedelta(days=1)
            filter_dict["streaming_at"] = {'$gte': start, '$lt': end}
        if code is not None:
            filter_dict["code"] = code
        return ScheduleCollection(schedules=await schedule_collection.find(filter_dict)
                                  .sort("streaming_at", -1)
                                  .to_list(1000))

    @staticmethod
    async def create(db: AgnosticDatabase, 
                     schedule: ScheduleModel) -> ScheduleModel:
        schedule_collection = db.get_collection("schedules")
        result = await schedule_collection.insert_one(
            schedule.model_dump(by_alias=True, exclude=["id"])
        )
        return await ScheduleRepository.get_by_id(db, result.inserted_id)

    @staticmethod
    async def update(db: AgnosticDatabase, 
                     id: str, 
                     schedule: ScheduleModel) -> ScheduleModel:
        schedule_collection = db.get_collection("schedules")
        # ユーザーのキーと値を取得し、辞書内包表記を利用して値が None でない場合に update_schedule に格納する
        update_schedule = {
            k: v for k, v in schedule.model_dump(by_alias=True).items() if v is not None
        }
        if len(update_schedule) >= 1:
            result = await schedule_collection.find_one_and_update(
                {"_id": ObjectId(id)},
                {"$set": update_schedule},
                return_document=ReturnDocument.AFTER, # 更新後のドキュメントを返す
            )
        if result is not None:
            return result
        else:
            raise NotFoundException(identifier=id)

    @staticmethod
    async def delete(db: AgnosticDatabase, 
                     id: str):
        schedule_collection = db.get_collection("schedules")
        result = await schedule_collection.delete_one({"_id": ObjectId(id)})
        if not result.deleted_count:
            raise NotFoundException(identifier=id)

配信者情報リポジトリ(api/repository/streamer.py)

配信者情報に関わるリポジトリパターンの関数群です。

from bson import ObjectId
from pymongo import ReturnDocument
from motor.core import AgnosticDatabase
from api.schemas.streamer import StreamerModel, StreamerCollection
from api.exceptions import NotFoundException

class StreamerRepository:
    @staticmethod
    async def get_by_id(db: AgnosticDatabase, 
                        id: str) -> StreamerModel:
        streamer_collection = db.get_collection("streamers")
        # ウォルラス演算子を利用して streamer が None でない場合に streamer を返す
        if (streamer := await streamer_collection.find_one({"_id": ObjectId(id)})) is not None:
            return StreamerModel(**streamer)
        raise NotFoundException(identifier=id)

    @staticmethod
    async def get_by_streamername(db: AgnosticDatabase, 
                              streamername: str) -> StreamerModel:
        streamer_collection = db.get_collection("streamers")
        # ウォルラス演算子を利用して streamer が None でない場合に streamer を返す
        if (streamer := await streamer_collection.find_one({"name": streamername})) is not None:
            return StreamerModel(**streamer)
        raise NotFoundException(identifier=streamername)

    @staticmethod
    async def list(db: AgnosticDatabase) -> StreamerCollection:
        streamer_collection = db.get_collection("streamers")
        return StreamerCollection(streamers=await streamer_collection.find().to_list(1000))

    @staticmethod
    async def create(db: AgnosticDatabase, 
                     streamer: StreamerModel) -> StreamerModel:
        streamer_collection = db.get_collection("streamers")
        result = await streamer_collection.insert_one(
            streamer.model_dump(by_alias=True, exclude=["id"])
        )
        return await StreamerRepository.get_by_id(db, result.inserted_id)

    @staticmethod
    async def update(db: AgnosticDatabase, 
                     id: str, 
                     streamer: StreamerModel) -> StreamerModel:
        streamer_collection = db.get_collection("streamers")
        # ストリーマーのキーと値を取得し、辞書内包表記を利用して値が None でない場合に update_streamer に格納する
        update_streamer = {
            k: v for k, v in streamer.model_dump(by_alias=True).items() if v is not None
        }
        # 更新対象が存在する場合は更新する
        if len(update_streamer) >= 1:
            result = await streamer_collection.find_one_and_update(
                {"_id": ObjectId(id)},
                {"$set": update_streamer},
                return_document=ReturnDocument.AFTER, # 更新後のドキュメントを返す
            )
        if result is not None:
            return result
        else:
            raise NotFoundException(identifier=id)

    @staticmethod
    async def delete(db: AgnosticDatabase, 
                     id: str):
        streamer_collection = db.get_collection("streamers")
        result = await streamer_collection.delete_one({"_id": ObjectId(id)})
        if not result.deleted_count:
            raise NotFoundException(identifier=id)

ユーザー情報リポジトリ(api/repository/user.py)

ユーザー情報に関わるリポジトリパターンの関数群です。

from bson import ObjectId
from pymongo import ReturnDocument
from motor.core import AgnosticDatabase
from api.schemas.user import UserModel, UserCollection
from api.exceptions import NotFoundException

class UserRepository:
    @staticmethod
    async def get_by_id(db: AgnosticDatabase, 
                        id: str) -> UserModel:
        user_collection = db.get_collection("users")
        # ウォルラス演算子を利用して user が None でない場合に user を返す
        if (user := await user_collection.find_one({"_id": ObjectId(id)})) is not None:
            return UserModel(**user)
        raise NotFoundException(identifier=id)

    @staticmethod
    async def get_by_username(db: AgnosticDatabase, 
                              username: str) -> UserModel:
        user_collection = db.get_collection("users")
        # ウォルラス演算子を利用して user が None でない場合に user を返す
        if (user := await user_collection.find_one({"username": username})) is not None:
            return UserModel(**user)
        raise NotFoundException(identifier=username)

    @staticmethod
    async def list(db: AgnosticDatabase) -> UserCollection:
        user_collection = db.get_collection("users")
        return UserCollection(users=await user_collection.find().to_list(1000))

    @staticmethod
    async def create(db: AgnosticDatabase, 
                     user: UserModel) -> UserModel:
        user_collection = db.get_collection("users")
        result = await user_collection.insert_one(
            user.model_dump(by_alias=True, exclude=["id"])
        )
        return await UserRepository.get_by_id(db, result.inserted_id)

    @staticmethod
    async def update(db: AgnosticDatabase, 
                     id: str, 
                     user: UserModel) -> UserModel:
        user_collection = db.get_collection("users")
        # ユーザーのキーと値を取得し、辞書内包表記を利用して値が None でない場合に update_user に格納する
        update_user = {
            k: v for k, v in user.model_dump(by_alias=True).items() if v is not None
        }
        # 更新対象が存在する場合は更新する
        if len(update_user) >= 1:
            result = await user_collection.find_one_and_update(
                {"_id": ObjectId(id)},
                {"$set": update_user},
                return_document=ReturnDocument.AFTER, # 更新後のドキュメントを返す
            )
        if result is not None:
            return result
        else:
            raise NotFoundException(identifier=id)

    @staticmethod
    async def delete(db: AgnosticDatabase, 
                     id: str):
        user_collection = db.get_collection("users")
        result = await user_collection.delete_one({"_id": ObjectId(id)})
        if not result.deleted_count:
            raise NotFoundException(identifier=id)

配信情報スキーマ(api/schemas/schedule.py)

配信情報に関わるスキーマ(APIのリクエストとレスポンスの定義)です。

from datetime import datetime, timezone, timedelta
from typing_extensions import Annotated
from pydantic import BaseModel, ConfigDict, Field, computed_field
from pydantic.functional_validators import BeforeValidator

PyObjectId = Annotated[str, BeforeValidator(str)]
UTC = timezone.utc
JST = timezone(timedelta(hours=+9), "JST")

class ScheduleModel(BaseModel):
    id: PyObjectId | None = Field(alias="_id", default=None, description="スケジュールID")
    code: str | None = Field(default=None, description="配信者コード")
    video_id: str | None = Field(default=None, description="動画ID")
    streaming_at: datetime | None = Field(default_factory=lambda: datetime.now(tz=JST), description="配信日時")
    name: str | None = Field(default=None, description="配信者名")
    title: str | None = Field(default=None, description="タイトル")
    url: str | None = Field(default=None, description="Youtube URL")
    description: str | None = Field(default=None, description="概要")
    published_at: datetime | None = Field(default_factory=lambda: datetime.now(tz=JST), description="投稿日時") 
    channel_id: str | None = Field(default=None, description="チャンネルID")
    channel_title: str | None = Field(default=None, description="チャンネル名")
    tags: list[str] = Field(default_factory=list, description="タグ")

    @computed_field
    @property
    def key(self) -> str:
        return self.code + "_" + self.streaming_at.strftime("%Y%m%d_%H%M%S") if (self.code is not None and self.streaming_at is not None) else ""

    model_config = ConfigDict(
        populate_by_name=True,          # エイリアス名でのアクセスを許可するか(例えば id と _id)
        arbitrary_types_allowed=True,   # 任意の型を許可するか
        json_schema_extra={
            "example": {
                "code": "HL0000",
                "video_id": "動画ID",
                "streaming_at": "2023-12-01T12:00:00Z",
                "name": "配信者名",
                "title": "タイトル",
                "url": "Youtube URL",
                "description": "概要",
                "published_at": "2023-12-01T12:00:00Z",
                "channel_id": "チャンネルID",
                "channel_title": "チャンネル名",
                "tags": []
            }
        },
    )

class ScheduleCollection(BaseModel):
    schedules: list[ScheduleModel]

配信者情報スキーマ(api/schemas/streamer.py)

配信者情報に関わるスキーマ(APIのリクエストとレスポンスの定義)です。

from typing_extensions import Annotated
from pydantic import BaseModel, ConfigDict, Field
from pydantic.functional_validators import BeforeValidator

PyObjectId = Annotated[str, BeforeValidator(str)]

class StreamerModel(BaseModel):
    id: PyObjectId | None = Field(alias="_id", default=None, description="ストリーマーID")
    code: str | None = Field(default=None, description="ストリーマーコード")
    name: str | None = Field(default=None, description="ストリーマー名")
    group: str | None = Field(default=None, description="グループ")
    affiliations: list[str] | None = Field(default=None, description="所属")
    image_name: str | None = Field(default=None, description="画像名")
    channel_id: str | None = Field(default=None, description="チャンネルID")
    is_retired: bool | None = Field(default=False, description="引退済み")

    model_config = ConfigDict(
        populate_by_name=True,          # エイリアス名でのアクセスを許可するか(例えば id と _id)
        arbitrary_types_allowed=True,   # 任意の型を許可するか
        json_schema_extra={
            "example": {
                "code": "HL0000",
                "name": "ホロライブ",
                "group": "hololive",
                "affiliations": ['bland', 'jp'],
                "image_name": "hololive.jpg",
                "channel_id": "@hololive",
                "is_retired": False
            }
        },
    )

class StreamerCollection(BaseModel):
    streamers: list[StreamerModel]

ユーザー情報スキーマ(api/schemas/user.py)

ユーザー情報に関わるスキーマ(APIのリクエストとレスポンスの定義)です。

from typing_extensions import Annotated
from pydantic import BaseModel, ConfigDict, Field
from pydantic.functional_validators import BeforeValidator

PyObjectId = Annotated[str, BeforeValidator(str)]

class UserModel(BaseModel):
    id: PyObjectId | None = Field(alias="_id", default=None, description="ユーザーID")
    username: str | None = Field(default=None, description="ユーザー名")
    password: str | None = Field(default=None, description="パスワード")
    firstname: str | None = Field(default=None, description="")
    lastname: str | None = Field(default=None, description="")
    disabled: bool | None = Field(default=False, description="無効")

    model_config = ConfigDict(
        populate_by_name=True,          # エイリアス名でのアクセスを許可するか(例えば id と _id)
        arbitrary_types_allowed=True,   # 任意の型を許可するか
        json_schema_extra={
            "example": {
                "username": "ユーザー名",
                "password": "パスワード",
                "firstname": "",
                "lastname": "",
                "disabled": False
            }
        },
    )

class UserCollection(BaseModel):
    users: list[UserModel]

トークン情報スキーマ(api/schemas/token.py)

トークン情報に関わるスキーマ(APIのリクエストとレスポンスの定義)です。

from pydantic import BaseModel

class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: str | None = None

プログラム実行

.env ファイルを作成して、URLやJWTを設定

MONGO_URI = "mongodb://[user]:[password]@127.0.0.1:27017/[db]"
MONGO_DATABASE = "holoduledb"
JWT_SECRET_KEY = "[JWT_SECRET_KEY]"
JWT_ALGORITHM = "HS256"
JWT_EXP_DELTA_MINUTES = 15

lounch.json の設定(デバッグ実行用)

{
    "version": "1.0.0",
    "configurations": [
        {
            "name": "Python: FastAPI",
            "type": "python",
            "request": "launch",
            "module": "uvicorn",
            "args": [
                "api.main:app",
                "--reload",
                "--port",
                "8001"
            ],
            "jinja": true,
            "justMyCode": true
        }
    ]
}

プログラムの実行

ASGI フレームワークで開発された FastAPI アプリを実行するため、ASGI(Asynchronous Server Gateway Interface)サーバーである uvicorn を利用します。

> poetry run uvicorn api.main:app --reload --port 8001

たとえば、HTTPS を Nginx で終端させて FastAPI へリバースプロキシさせる場合、下記のように Nginx のコンフィグにプロキシパスを設定し、FastAPI の実行時にルートパスを指定しておきます。

http {
    server {
        listen  80;
        listen 443 ssl;

        ...

        location /holoservice/ {
            proxy_pass http://127.0.0.1:8001/;
        }
    }
}
> poetry run uvicorn api.main:app --reload --port 8001 --root-path /holoservice

実行結果

uvicorn により ASGI サーバーが起動します。

INFO:     Will watch for changes in these directories: [...]
INFO:     Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)
INFO:     Started reloader process [31616] using WatchFiles
INFO:     Started server process [23480]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     127.0.0.1:54201 - "GET / HTTP/1.1" 404 Not Found
INFO:     127.0.0.1:54201 - "GET /favicon.ico HTTP/1.1" 404 Not Found

サーバー起動後、ブラウザから下記にアクセスします。

http://127.0.0.1:8001/docs

ブラウザに Swagger UI が表示されます。(/docs のほかに /redoc も利用できます)

img01.png

Swagger UI の [POST] /users から、認証のためのユーザーを登録しておきます。

img02.png

Swagger UI の [Authorize] から、登録したユーザーの username と password で認証します。

img03.png

認証が完了したら Close で閉じます。

img04.png

認証後に Swagger UI の [GET] /schedules から、日付を指定して配信情報を取得してみます。

img05.png

HTTPステータス 200 でリクエストが成功し、MongoDB から取得した配信情報がレスポンスされました。

おわりに

今回は、FastAPI を利用して、収集したホロライブの配信予定や動画情報を Android アプリや React アプリ から利用するためのバックエンド API を開発しました。

FastAPI はシンプルでとても使いやすく、Pydantic を活用したスキーマの定義や、Depends による依存性注入の効果によるものだと思います。

引き続き、この API を利用したフロントエンド側を React(TypeScript) か Android アプリ(Kotlin)で開発してみます。

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3