はじめに
こちらも以前に、収集したホロライブの配信予定や動画情報を Android アプリや React アプリ から利用するためのバックエンド API を作成しましたが、ホロコレクタをアップデートしたのと、Pydantic や DI も含めた FastAPI による開発が強力だったので、FastAPI 版としてあらためて作成しました。
ホロコレクタ で収集した配信情報や配信者情報の参照・追加・更新・削除と認証を行う WebAPI となります。
全体的な処理の流れは、FastAPI や MongoDB のチュートリアルを参考にしています。
JWT 認証
MongoDB の users コレクションからユーザー名をもとに user ドキュメントを取得し、ハッシュ化したパスワードとの一致を確認します。
jwt_settings = get_jwt_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def verify_password(plain_password, hashed_password) -> bool:
# パスワードが正しいかどうかを確認する
return pwd_context.verify(plain_password, hashed_password)
async def authenticate_user(username: str, password: str, db = Depends(get_db)) -> UserModel | None:
# ユーザーを認証する
user: UserModel = await UserRepository.get_by_username(db, username)
if not user:
return None
if not verify_password(password, user.password):
return None
return user
ユーザーを確認した後に、有効期限や秘密キーおよび暗号化アルゴリズムを指定して JWT のトークンを生成します。
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
# アクセストークンを作成する
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=jwt_settings.exp_delta_minutes)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, jwt_settings.secret_key, algorithm=jwt_settings.algorithm)
return encoded_jwt
ルーター側の get_token 非同期関数(post オペレーション)は、フォームデータと DB インターフェースを依存注入しています。
フォームデータのユーザー名とパスワードおよび DB インターフェースを利用して認証を行い、Bearerタイプのトークンを生成して返却します。
@router.post('/token', response_model=Token)
async def get_token(request: OAuth2PasswordRequestForm = Depends(),
db: AgnosticDatabase = Depends(get_db)) -> dict:
user: UserModel = await authenticate_user(request.username, request.password, db)
if not user:
raise CredentialsException(message='Incorrect username or password')
access_token: str = create_access_token(data={'sub': user.username})
return {
'access_token': access_token,
'token_type': 'bearer'
}
各オペレーションは、get_current_active_user 関数を依存注入しており、受け取ったトークンを復号して検証し、現在のユーザーを返します。
@router.get("/schedules",
response_model=ScheduleCollection,
response_model_by_alias=False)
async def list_schedules(date: date = None,
code: str = None,
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> ScheduleCollection:
return await ScheduleRepository.list(db, date, code)
get_current_active_user 関数はこのようにしています。
トークンから取得した情報からユーザーを特定できない場合に認証エラーとしています。
async def get_current_user(token: str = Depends(oauth2_scheme), db = Depends(get_db)) -> UserModel:
# トークンから現在のユーザーを非同期で取得する
try:
payload = jwt.decode(token, jwt_settings.secret_key, algorithms=jwt_settings.algorithm)
username: str = payload.get("sub")
if username is None:
raise CredentialsException()
token_data = TokenData(username=username)
except JWTError:
raise CredentialsException()
return await UserRepository.get_by_username(db, username=token_data.username)
async def get_current_active_user(current_user: UserModel = Depends(get_current_user)) -> UserModel:
# 現在のユーザーが有効かどうかを確認する
if current_user.disabled:
raise InactiveUserException(identifier=current_user.id)
return current_user
MongoDB の利用
MongoDB の接続には非同期ドライバである Motor を利用し、try~yield~finally を用いることで後処理も行っています。
import motor.motor_asyncio
from motor.core import AgnosticClient, AgnosticDatabase
from api.settings import get_mongo_settings
mongo_settings = get_mongo_settings()
def get_db() -> AgnosticDatabase:
try:
client: AgnosticClient = motor.motor_asyncio.AsyncIOMotorClient(mongo_settings.uri)
db: AgnosticDatabase = client.get_database(mongo_settings.database)
yield db
finally:
client.close()
開発環境
- Windows 11
- PowerShell 7.4.0
- Visual Studio Code 1.85
- Python 3.11 + Poetry + pyenv
- MongoDB 6.0
開発準備
Poetry と pyenv の確認
参考:Windows で Python の開発環境を構築する(Poetry と pyenv を利用)
> poetry --version
Poetry version 1.3.2
> pyenv --version
pyenv 3.1.1
MongoDB の確認
> mongosh --version
1.6.0
> mongosh localhost:27017/admin -u admin -p
データベースの作成とロール(今回は dbOwner )の設定
MongoDB > use holoduledb
MongoDB > db.createUser( { user:"owner", pwd:"password", roles:[{ "role" : "dbOwner", "db" : "holoduledb" }] } );
プロジェクト作成
プロジェクトを作成してディレクトリに入る
> poetry new holoservice --name api
Created package api in holoservice
> cd holoservice
プロジェクトで利用する Python を pyenv でインストール
> pyenv install 3.11.1
プロジェクトで利用するローカルの Python のバージョンを変更
> pyenv local 3.11.1
> python -V
Python 3.11.1
バージョンを指定して Python 仮想環境を作成(pyenv で管理している Python のパスを指定)
> python -c "import sys; print(sys.executable)"
> poetry env use C:\Users\[UserName]\.pyenv\pyenv-win\versions\3.11.1\python.exe
パッケージの追加
> poetry add pylint
> poetry add fastapi
> poetry add uvicorn[standard]
> poetry add motor
> poetry add pydantic-settings
> poetry add pytz
> poetry add python-multipart
> poetry add python-jose
> poetry add passlib[bcrypt]
プログラム実装
設定(api/settings.py)
.env から MongoDB と JWT の設定を読み込みます。
from functools import lru_cache
from pydantic_settings import BaseSettings, SettingsConfigDict
class MongoSettings(BaseSettings):
# MongoDB関連の設定
uri: str
database: str
model_config = SettingsConfigDict(env_file=".env", env_prefix='mongo_')
class JwtSettings(BaseSettings):
# JWT関連の設定
secret_key: str
algorithm: str = "HS256"
exp_delta_minutes: int = 15
model_config = SettingsConfigDict(env_file=".env", env_prefix='jwt_')
@lru_cache
def get_mongo_settings() -> MongoSettings:
# キャッシュしたMongoDB設定を取得する
return MongoSettings()
@lru_cache
def get_jwt_settings() -> JwtSettings:
# キャッシュしたJWT設定を取得する
return JwtSettings()
認証(api/oauth2.py)
JWT 認証やユーザーの有効性の確認を行います。
from fastapi import Depends
from fastapi.security import OAuth2PasswordBearer
from datetime import datetime, timedelta
from jose import jwt
from jose.exceptions import JWTError
from passlib.context import CryptContext
from api.repository.user import UserRepository
from api.schemas.token import TokenData
from api.settings import get_jwt_settings
from api.schemas.user import UserModel
from api.db import get_db
from api.exceptions import CredentialsException, InactiveUserException
jwt_settings = get_jwt_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
def get_hashed_password(password) -> str:
# パスワードをハッシュ化する
return pwd_context.hash(password)
def verify_password(plain_password, hashed_password) -> bool:
# パスワードが正しいかどうかを確認する
return pwd_context.verify(plain_password, hashed_password)
async def authenticate_user(username: str, password: str, db = Depends(get_db)) -> UserModel | None:
# ユーザーを認証する
user: UserModel = await UserRepository.get_by_username(db, username)
if not user:
return None
if not verify_password(password, user.password):
return None
return user
def create_access_token(data: dict, expires_delta: timedelta | None = None) -> str:
# アクセストークンを作成する
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=jwt_settings.exp_delta_minutes)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, jwt_settings.secret_key, algorithm=jwt_settings.algorithm)
return encoded_jwt
async def get_current_user(token: str = Depends(oauth2_scheme), db = Depends(get_db)) -> UserModel:
# アクセストークンから現在のユーザーを非同期で取得する
try:
payload = jwt.decode(token, jwt_settings.secret_key, algorithms=jwt_settings.algorithm)
username: str = payload.get("sub")
if username is None:
raise CredentialsException()
token_data = TokenData(username=username)
except JWTError:
raise CredentialsException()
return await UserRepository.get_by_username(db, username=token_data.username)
async def get_current_active_user(current_user: UserModel = Depends(get_current_user)) -> UserModel:
# 現在のユーザーが有効かどうかを確認する
if current_user.disabled:
raise InactiveUserException(identifier=current_user.id)
return current_user
ミドルウェア(api/middlewares.py)
ログ記録やエラー処理などを後で組み込む予定で準備だけしています。
from typing import Any
from fastapi import Request
from api.exceptions import BaseAPIException
async def request_handler(request: Request, call_next) -> Any:
try:
# すべてのリクエストがパスオペレーションで処理される前に機能する
return await call_next(request)
except Exception as ex:
if isinstance(ex, BaseAPIException):
return ex.response()
raise ex
データベース(api/db.py)
非同期ドライバである Motor を利用して MongoDB の接続を管理しています。
import motor.motor_asyncio
from motor.core import AgnosticClient, AgnosticDatabase
from api.settings import get_mongo_settings
mongo_settings = get_mongo_settings()
def get_db() -> AgnosticDatabase:
# MongoDB へ接続し、データベースを返却する
try:
client: AgnosticClient = motor.motor_asyncio.AsyncIOMotorClient(mongo_settings.uri)
db: AgnosticDatabase = client.get_database(mongo_settings.database)
yield db
finally:
client.close()
例外(api/exceptions.py)
アプリケーションで利用する例外やエラーのクラスを定義しています。
from typing import Type
from fastapi.responses import JSONResponse
from fastapi import status
from pydantic import BaseModel, Field
class BaseError(BaseModel):
message: str = Field(..., description="Error message or description")
class BaseIdentifiedError(BaseError):
identifier: str = Field(..., description="Unique identifier which this error references to")
class NotFoundError(BaseIdentifiedError):
pass
class AlreadyExistsError(BaseIdentifiedError):
pass
class InactiveUserError(BaseIdentifiedError):
pass
class CredentialsError(BaseError):
pass
class BaseAPIException(Exception):
message = "Generic error"
code = status.HTTP_500_INTERNAL_SERVER_ERROR
model = BaseError
headers = None
def __init__(self, **kwargs):
kwargs.setdefault("message", self.message)
self.message = kwargs["message"]
self.data = self.model(**kwargs)
def __str__(self):
return self.message
def response(self):
return JSONResponse(
content=self.data.model_dump(),
status_code=self.code,
headers=self.headers
)
@classmethod
def response_model(cls):
return {cls.code: {"model": cls.model}}
class BaseIdentifiedException(BaseAPIException):
message = "Entity error"
code = status.HTTP_500_INTERNAL_SERVER_ERROR
model = BaseIdentifiedError
def __init__(self, identifier, **kwargs):
super().__init__(identifier=identifier, **kwargs)
class NotFoundException(BaseIdentifiedException):
message = "The entity does not exist"
code = status.HTTP_404_NOT_FOUND
model = NotFoundError
class AlreadyExistsException(BaseIdentifiedException):
message = "The entity already exists"
code = status.HTTP_409_CONFLICT
model = AlreadyExistsError
class InactiveUserException(BaseIdentifiedException):
message = "Inactive user"
code = status.HTTP_400_BAD_REQUEST
model = InactiveUserError
class CredentialsException(BaseAPIException):
message = "Colud not validate credentials"
code = status.HTTP_401_UNAUTHORIZED,
model = CredentialsError
headers = {'WWW-Authenticate': "Bearer"}
def get_exception_responses(*args: Type[BaseAPIException]) -> dict:
responses = dict()
for cls in args:
responses.update(cls.response_model())
return responses
メイン(api/main.py)
このアプリケーションのエントリポイントです。
FastAPIのインスタンスを作成して初期化しています。
from fastapi import FastAPI, Request, status
from fastapi.exceptions import RequestValidationError
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
from api.routers import schedule
from api.routers import streamer
from api.routers import user
from api.routers import authentication
from api.middlewares import request_handler
# FastAPIインスタンスを作成
app = FastAPI(
title="Holodule API Service",
summary="ホロジュールから取得した配信スケジュールを API として提供します。",
)
# HTTPリクエストの前後で処理を実行するためのミドルウェアを登録
app.middleware("http")(request_handler)
# CORS設定
origins: list = ["*"] # とりあえず全てのオリジンを許可
app.add_middleware(
CORSMiddleware,
allow_origins=origins,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# FastAPIルーターを登録
app.include_router(schedule.router)
app.include_router(streamer.router)
app.include_router(user.router)
app.include_router(authentication.router)
# FastAPIのエラーハンドラーを登録
@app.exception_handler(RequestValidationError)
async def handler(request:Request, exc:RequestValidationError) -> JSONResponse:
print(exc)
return JSONResponse(content={}, status_code=status.HTTP_422_UNPROCESSABLE_ENTITY)
配信情報ルーター(api/routers/schedule.py)
配信情報に関わる FastAPI のパスオペレーション関数群です。
from datetime import date
from fastapi import APIRouter, Body, Depends, Response, status
from motor.core import AgnosticDatabase
from api.oauth2 import get_current_active_user
from api.db import get_db
from api.repository.schedule import ScheduleRepository
from api.schemas.schedule import ScheduleModel, ScheduleCollection
router = APIRouter(
tags=['schedule']
)
@router.get("/schedules/{id}",
response_model=ScheduleModel,
response_model_by_alias=False)
async def get_schedule(id: str,
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> ScheduleModel:
# id を指定してスケジュールを取得
return await ScheduleRepository.get_by_id(db, id)
@router.get("/schedules",
response_model=ScheduleCollection,
response_model_by_alias=False)
async def list_schedules(date: date = None,
code: str = None,
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> ScheduleCollection:
# date や code を指定してスケジュールを取得
return await ScheduleRepository.list(db, date, code)
@router.post("/schedules",
response_model=ScheduleModel,
status_code=status.HTTP_201_CREATED,
response_model_by_alias=False)
async def create_schedule(schedule: ScheduleModel = Body(...),
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> ScheduleModel:
# スケジュールを登録
return await ScheduleRepository.create(db, schedule)
@router.put("/schedules/{id}",
response_model=ScheduleModel,
response_model_by_alias=False)
async def update_schedule(id: str,
schedule: ScheduleModel = Body(...),
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> ScheduleModel:
# スケジュールを更新
return await ScheduleRepository.update(db, id, schedule)
@router.delete("/schedules/{id}")
async def delete_schedule(id: str,
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> Response:
# スケジュールをさくじょ
await ScheduleRepository.delete(db, id);
return Response(status_code=status.HTTP_204_NO_CONTENT)
配信者情報ルーター(api/routers/streamer.py)
配信者情報に関わる FastAPI のパスオペレーション関数群です。
from fastapi import APIRouter, Body, Depends, Response, status
from motor.core import AgnosticDatabase
from api.oauth2 import get_current_active_user
from api.db import get_db
from api.repository.streamer import StreamerRepository
from api.schemas.streamer import StreamerModel, StreamerCollection
router = APIRouter(
tags=['streamer']
)
@router.get("/streamers/{id}",
response_model=StreamerModel,
response_model_by_alias=False)
async def get_streamer(id: str,
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> StreamerModel:
# id を指定して配信者情報を取得
return await StreamerRepository.get_by_id(db, id);
@router.get("/streamers",
response_model=StreamerCollection,
response_model_by_alias=False)
async def list_streamers(db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> StreamerCollection:
# 配信者情報を取得
return await StreamerRepository.list(db);
@router.post("/streamers",
response_model=StreamerModel,
status_code=status.HTTP_201_CREATED,
response_model_by_alias=False)
async def create_streamer(streamer: StreamerModel = Body(...),
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> StreamerModel:
# 配信者情報を登録
return await StreamerRepository.create(db, streamer);
@router.put("/streamers/{id}",
response_model=StreamerModel,
response_model_by_alias=False)
async def update_streamer(id: str,
streamer: StreamerModel = Body(...),
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> StreamerModel:
# 配信者情報を更新
return await StreamerRepository.update(db, id, streamer);
@router.delete("/streamers/{id}")
async def delete_streamer(id: str,
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> Response:
# 配信者情報を削除
await StreamerRepository.delete(db, id);
return Response(status_code=status.HTTP_204_NO_CONTENT)
ユーザー情報ルーター(api/routers/user.py)
ユーザー情報に関わる FastAPI のパスオペレーション関数群です。
from fastapi import APIRouter, Body, Depends, Response, status
from motor.core import AgnosticDatabase
from api.oauth2 import get_current_active_user, get_hashed_password
from api.db import get_db
from api.repository.user import UserRepository
from api.schemas.user import UserModel, UserCollection
router = APIRouter(
tags=['user']
)
@router.get("/users/{id}",
response_model=UserModel,
response_model_by_alias=False)
async def get_user(id: str,
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> UserModel:
# id を指定してユーザーを取得
return await UserRepository.get_by_id(db, id);
@router.get("/users",
response_model=UserCollection,
response_model_by_alias=False)
async def list_users(db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> UserCollection:
# ユーザーを取得
return await UserRepository.list(db);
@router.post("/users",
response_model=UserModel,
status_code=status.HTTP_201_CREATED,
response_model_by_alias=False)
# 認証無効化(利用ユーザー新規登録時)
# async def create_user(user: UserModel = Body(...),
# db: AgnosticDatabase = Depends(get_db),
# current_user: str = Depends(get_current_active_user)) -> UserModel:
async def create_user(user: UserModel = Body(...),
db: AgnosticDatabase = Depends(get_db)) -> UserModel:
# パスワードをハッシュ化してユーザーを登録
if user.password is not None:
user.password = get_hashed_password(user.password)
return await UserRepository.create(db, user);
@router.put("/users/{id}",
response_model=UserModel,
response_model_by_alias=False)
async def update_user(id: str,
user: UserModel = Body(...),
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> UserModel:
# パスワードをハッシュ化してユーザーを更新
if user.password is not None:
user.password = get_hashed_password(user.password)
return await UserRepository.update(db, id, user);
@router.delete("/users/{id}")
async def delete_user(id: str,
db: AgnosticDatabase = Depends(get_db),
current_user: str = Depends(get_current_active_user)) -> Response:
# ユーザーを削除
await UserRepository.delete(db, id);
return Response(status_code=status.HTTP_204_NO_CONTENT)
認証ルーター(api/routers/authentication.py)
認証に関わる FastAPI のパスオペレーション関数群です。
from fastapi import APIRouter
from fastapi.param_functions import Depends
from fastapi.security.oauth2 import OAuth2PasswordRequestForm
from motor.core import AgnosticDatabase
from api.db import get_db
from api.exceptions import CredentialsException
from api.oauth2 import authenticate_user, create_access_token
from api.schemas.token import Token
from api.schemas.user import UserModel
router = APIRouter(
tags=['authentication']
)
@router.post('/token', response_model=Token)
async def get_token(request: OAuth2PasswordRequestForm = Depends(),
db: AgnosticDatabase = Depends(get_db)) -> dict:
# ユーザー名とパスワードを指定して認証を行いトークンを発行
user: UserModel = await authenticate_user(request.username, request.password, db)
if not user:
raise CredentialsException(message='Incorrect username or password')
access_token: str = create_access_token(data={'sub': user.username})
return {
'access_token': access_token,
'token_type': 'bearer'
}
配信情報リポジトリ(api/repository/schedule.py)
配信情報に関わるリポジトリパターンの関数群です。
from datetime import datetime, timedelta, date, time
from bson import ObjectId
from pymongo import ReturnDocument
from motor.core import AgnosticDatabase
from api.schemas.schedule import ScheduleModel, ScheduleCollection
from api.exceptions import NotFoundException
class ScheduleRepository:
@staticmethod
async def get_by_id(db: AgnosticDatabase,
id: str) -> ScheduleModel:
schedule_collection = db.get_collection("schedules")
# ウォルラス演算子を利用して schedule が None でない場合に schedule を返す
if (schedule := await schedule_collection.find_one({"_id": ObjectId(id)})) is not None:
return schedule
raise NotFoundException(identifier=id)
@staticmethod
async def list(db: AgnosticDatabase,
date: date = None,
code: str = None) -> ScheduleCollection:
schedule_collection = db.get_collection("schedules")
filter_dict = {}
if date is not None:
start = datetime.combine(date, time())
end = start + timedelta(days=1)
filter_dict["streaming_at"] = {'$gte': start, '$lt': end}
if code is not None:
filter_dict["code"] = code
return ScheduleCollection(schedules=await schedule_collection.find(filter_dict)
.sort("streaming_at", -1)
.to_list(1000))
@staticmethod
async def create(db: AgnosticDatabase,
schedule: ScheduleModel) -> ScheduleModel:
schedule_collection = db.get_collection("schedules")
result = await schedule_collection.insert_one(
schedule.model_dump(by_alias=True, exclude=["id"])
)
return await ScheduleRepository.get_by_id(db, result.inserted_id)
@staticmethod
async def update(db: AgnosticDatabase,
id: str,
schedule: ScheduleModel) -> ScheduleModel:
schedule_collection = db.get_collection("schedules")
# ユーザーのキーと値を取得し、辞書内包表記を利用して値が None でない場合に update_schedule に格納する
update_schedule = {
k: v for k, v in schedule.model_dump(by_alias=True).items() if v is not None
}
if len(update_schedule) >= 1:
result = await schedule_collection.find_one_and_update(
{"_id": ObjectId(id)},
{"$set": update_schedule},
return_document=ReturnDocument.AFTER, # 更新後のドキュメントを返す
)
if result is not None:
return result
else:
raise NotFoundException(identifier=id)
@staticmethod
async def delete(db: AgnosticDatabase,
id: str):
schedule_collection = db.get_collection("schedules")
result = await schedule_collection.delete_one({"_id": ObjectId(id)})
if not result.deleted_count:
raise NotFoundException(identifier=id)
配信者情報リポジトリ(api/repository/streamer.py)
配信者情報に関わるリポジトリパターンの関数群です。
from bson import ObjectId
from pymongo import ReturnDocument
from motor.core import AgnosticDatabase
from api.schemas.streamer import StreamerModel, StreamerCollection
from api.exceptions import NotFoundException
class StreamerRepository:
@staticmethod
async def get_by_id(db: AgnosticDatabase,
id: str) -> StreamerModel:
streamer_collection = db.get_collection("streamers")
# ウォルラス演算子を利用して streamer が None でない場合に streamer を返す
if (streamer := await streamer_collection.find_one({"_id": ObjectId(id)})) is not None:
return StreamerModel(**streamer)
raise NotFoundException(identifier=id)
@staticmethod
async def get_by_streamername(db: AgnosticDatabase,
streamername: str) -> StreamerModel:
streamer_collection = db.get_collection("streamers")
# ウォルラス演算子を利用して streamer が None でない場合に streamer を返す
if (streamer := await streamer_collection.find_one({"name": streamername})) is not None:
return StreamerModel(**streamer)
raise NotFoundException(identifier=streamername)
@staticmethod
async def list(db: AgnosticDatabase) -> StreamerCollection:
streamer_collection = db.get_collection("streamers")
return StreamerCollection(streamers=await streamer_collection.find().to_list(1000))
@staticmethod
async def create(db: AgnosticDatabase,
streamer: StreamerModel) -> StreamerModel:
streamer_collection = db.get_collection("streamers")
result = await streamer_collection.insert_one(
streamer.model_dump(by_alias=True, exclude=["id"])
)
return await StreamerRepository.get_by_id(db, result.inserted_id)
@staticmethod
async def update(db: AgnosticDatabase,
id: str,
streamer: StreamerModel) -> StreamerModel:
streamer_collection = db.get_collection("streamers")
# ストリーマーのキーと値を取得し、辞書内包表記を利用して値が None でない場合に update_streamer に格納する
update_streamer = {
k: v for k, v in streamer.model_dump(by_alias=True).items() if v is not None
}
# 更新対象が存在する場合は更新する
if len(update_streamer) >= 1:
result = await streamer_collection.find_one_and_update(
{"_id": ObjectId(id)},
{"$set": update_streamer},
return_document=ReturnDocument.AFTER, # 更新後のドキュメントを返す
)
if result is not None:
return result
else:
raise NotFoundException(identifier=id)
@staticmethod
async def delete(db: AgnosticDatabase,
id: str):
streamer_collection = db.get_collection("streamers")
result = await streamer_collection.delete_one({"_id": ObjectId(id)})
if not result.deleted_count:
raise NotFoundException(identifier=id)
ユーザー情報リポジトリ(api/repository/user.py)
ユーザー情報に関わるリポジトリパターンの関数群です。
from bson import ObjectId
from pymongo import ReturnDocument
from motor.core import AgnosticDatabase
from api.schemas.user import UserModel, UserCollection
from api.exceptions import NotFoundException
class UserRepository:
@staticmethod
async def get_by_id(db: AgnosticDatabase,
id: str) -> UserModel:
user_collection = db.get_collection("users")
# ウォルラス演算子を利用して user が None でない場合に user を返す
if (user := await user_collection.find_one({"_id": ObjectId(id)})) is not None:
return UserModel(**user)
raise NotFoundException(identifier=id)
@staticmethod
async def get_by_username(db: AgnosticDatabase,
username: str) -> UserModel:
user_collection = db.get_collection("users")
# ウォルラス演算子を利用して user が None でない場合に user を返す
if (user := await user_collection.find_one({"username": username})) is not None:
return UserModel(**user)
raise NotFoundException(identifier=username)
@staticmethod
async def list(db: AgnosticDatabase) -> UserCollection:
user_collection = db.get_collection("users")
return UserCollection(users=await user_collection.find().to_list(1000))
@staticmethod
async def create(db: AgnosticDatabase,
user: UserModel) -> UserModel:
user_collection = db.get_collection("users")
result = await user_collection.insert_one(
user.model_dump(by_alias=True, exclude=["id"])
)
return await UserRepository.get_by_id(db, result.inserted_id)
@staticmethod
async def update(db: AgnosticDatabase,
id: str,
user: UserModel) -> UserModel:
user_collection = db.get_collection("users")
# ユーザーのキーと値を取得し、辞書内包表記を利用して値が None でない場合に update_user に格納する
update_user = {
k: v for k, v in user.model_dump(by_alias=True).items() if v is not None
}
# 更新対象が存在する場合は更新する
if len(update_user) >= 1:
result = await user_collection.find_one_and_update(
{"_id": ObjectId(id)},
{"$set": update_user},
return_document=ReturnDocument.AFTER, # 更新後のドキュメントを返す
)
if result is not None:
return result
else:
raise NotFoundException(identifier=id)
@staticmethod
async def delete(db: AgnosticDatabase,
id: str):
user_collection = db.get_collection("users")
result = await user_collection.delete_one({"_id": ObjectId(id)})
if not result.deleted_count:
raise NotFoundException(identifier=id)
配信情報スキーマ(api/schemas/schedule.py)
配信情報に関わるスキーマ(APIのリクエストとレスポンスの定義)です。
from datetime import datetime, timezone, timedelta
from typing_extensions import Annotated
from pydantic import BaseModel, ConfigDict, Field, computed_field
from pydantic.functional_validators import BeforeValidator
PyObjectId = Annotated[str, BeforeValidator(str)]
UTC = timezone.utc
JST = timezone(timedelta(hours=+9), "JST")
class ScheduleModel(BaseModel):
id: PyObjectId | None = Field(alias="_id", default=None, description="スケジュールID")
code: str | None = Field(default=None, description="配信者コード")
video_id: str | None = Field(default=None, description="動画ID")
streaming_at: datetime | None = Field(default_factory=lambda: datetime.now(tz=JST), description="配信日時")
name: str | None = Field(default=None, description="配信者名")
title: str | None = Field(default=None, description="タイトル")
url: str | None = Field(default=None, description="Youtube URL")
description: str | None = Field(default=None, description="概要")
published_at: datetime | None = Field(default_factory=lambda: datetime.now(tz=JST), description="投稿日時")
channel_id: str | None = Field(default=None, description="チャンネルID")
channel_title: str | None = Field(default=None, description="チャンネル名")
tags: list[str] = Field(default_factory=list, description="タグ")
@computed_field
@property
def key(self) -> str:
return self.code + "_" + self.streaming_at.strftime("%Y%m%d_%H%M%S") if (self.code is not None and self.streaming_at is not None) else ""
model_config = ConfigDict(
populate_by_name=True, # エイリアス名でのアクセスを許可するか(例えば id と _id)
arbitrary_types_allowed=True, # 任意の型を許可するか
json_schema_extra={
"example": {
"code": "HL0000",
"video_id": "動画ID",
"streaming_at": "2023-12-01T12:00:00Z",
"name": "配信者名",
"title": "タイトル",
"url": "Youtube URL",
"description": "概要",
"published_at": "2023-12-01T12:00:00Z",
"channel_id": "チャンネルID",
"channel_title": "チャンネル名",
"tags": []
}
},
)
class ScheduleCollection(BaseModel):
schedules: list[ScheduleModel]
配信者情報スキーマ(api/schemas/streamer.py)
配信者情報に関わるスキーマ(APIのリクエストとレスポンスの定義)です。
from typing_extensions import Annotated
from pydantic import BaseModel, ConfigDict, Field
from pydantic.functional_validators import BeforeValidator
PyObjectId = Annotated[str, BeforeValidator(str)]
class StreamerModel(BaseModel):
id: PyObjectId | None = Field(alias="_id", default=None, description="ストリーマーID")
code: str | None = Field(default=None, description="ストリーマーコード")
name: str | None = Field(default=None, description="ストリーマー名")
group: str | None = Field(default=None, description="グループ")
affiliations: list[str] | None = Field(default=None, description="所属")
image_name: str | None = Field(default=None, description="画像名")
channel_id: str | None = Field(default=None, description="チャンネルID")
is_retired: bool | None = Field(default=False, description="引退済み")
model_config = ConfigDict(
populate_by_name=True, # エイリアス名でのアクセスを許可するか(例えば id と _id)
arbitrary_types_allowed=True, # 任意の型を許可するか
json_schema_extra={
"example": {
"code": "HL0000",
"name": "ホロライブ",
"group": "hololive",
"affiliations": ['bland', 'jp'],
"image_name": "hololive.jpg",
"channel_id": "@hololive",
"is_retired": False
}
},
)
class StreamerCollection(BaseModel):
streamers: list[StreamerModel]
ユーザー情報スキーマ(api/schemas/user.py)
ユーザー情報に関わるスキーマ(APIのリクエストとレスポンスの定義)です。
from typing_extensions import Annotated
from pydantic import BaseModel, ConfigDict, Field
from pydantic.functional_validators import BeforeValidator
PyObjectId = Annotated[str, BeforeValidator(str)]
class UserModel(BaseModel):
id: PyObjectId | None = Field(alias="_id", default=None, description="ユーザーID")
username: str | None = Field(default=None, description="ユーザー名")
password: str | None = Field(default=None, description="パスワード")
firstname: str | None = Field(default=None, description="名")
lastname: str | None = Field(default=None, description="姓")
disabled: bool | None = Field(default=False, description="無効")
model_config = ConfigDict(
populate_by_name=True, # エイリアス名でのアクセスを許可するか(例えば id と _id)
arbitrary_types_allowed=True, # 任意の型を許可するか
json_schema_extra={
"example": {
"username": "ユーザー名",
"password": "パスワード",
"firstname": "名",
"lastname": "姓",
"disabled": False
}
},
)
class UserCollection(BaseModel):
users: list[UserModel]
トークン情報スキーマ(api/schemas/token.py)
トークン情報に関わるスキーマ(APIのリクエストとレスポンスの定義)です。
from pydantic import BaseModel
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: str | None = None
プログラム実行
.env ファイルを作成して、URLやJWTを設定
MONGO_URI = "mongodb://[user]:[password]@127.0.0.1:27017/[db]"
MONGO_DATABASE = "holoduledb"
JWT_SECRET_KEY = "[JWT_SECRET_KEY]"
JWT_ALGORITHM = "HS256"
JWT_EXP_DELTA_MINUTES = 15
lounch.json の設定(デバッグ実行用)
{
"version": "1.0.0",
"configurations": [
{
"name": "Python: FastAPI",
"type": "python",
"request": "launch",
"module": "uvicorn",
"args": [
"api.main:app",
"--reload",
"--port",
"8001"
],
"jinja": true,
"justMyCode": true
}
]
}
プログラムの実行
ASGI フレームワークで開発された FastAPI アプリを実行するため、ASGI(Asynchronous Server Gateway Interface)サーバーである uvicorn を利用します。
> poetry run uvicorn api.main:app --reload --port 8001
たとえば、HTTPS を Nginx で終端させて FastAPI へリバースプロキシさせる場合、下記のように Nginx のコンフィグにプロキシパスを設定し、FastAPI の実行時にルートパスを指定しておきます。
http {
server {
listen 80;
listen 443 ssl;
...
location /holoservice/ {
proxy_pass http://127.0.0.1:8001/;
}
}
}
> poetry run uvicorn api.main:app --reload --port 8001 --root-path /holoservice
実行結果
uvicorn により ASGI サーバーが起動します。
INFO: Will watch for changes in these directories: [...]
INFO: Uvicorn running on http://127.0.0.1:8001 (Press CTRL+C to quit)
INFO: Started reloader process [31616] using WatchFiles
INFO: Started server process [23480]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: 127.0.0.1:54201 - "GET / HTTP/1.1" 404 Not Found
INFO: 127.0.0.1:54201 - "GET /favicon.ico HTTP/1.1" 404 Not Found
サーバー起動後、ブラウザから下記にアクセスします。
http://127.0.0.1:8001/docs
ブラウザに Swagger UI が表示されます。(/docs のほかに /redoc も利用できます)
Swagger UI の [POST] /users から、認証のためのユーザーを登録しておきます。
Swagger UI の [Authorize] から、登録したユーザーの username と password で認証します。
認証が完了したら Close で閉じます。
認証後に Swagger UI の [GET] /schedules から、日付を指定して配信情報を取得してみます。
HTTPステータス 200 でリクエストが成功し、MongoDB から取得した配信情報がレスポンスされました。
おわりに
今回は、FastAPI を利用して、収集したホロライブの配信予定や動画情報を Android アプリや React アプリ から利用するためのバックエンド API を開発しました。
FastAPI はシンプルでとても使いやすく、Pydantic を活用したスキーマの定義や、Depends による依存性注入の効果によるものだと思います。
引き続き、この API を利用したフロントエンド側を React(TypeScript) か Android アプリ(Kotlin)で開発してみます。