1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

FastAPI × JWT で認証機能を実装してみた

Posted at

はじめに

前回は FastAPI と SQLAlchemy を使った基本的なユーザー管理 API を作成し、React でフロントエンド部分を実装しました。
今回はその続きとして、JWT による認証(ログイン/ログアウト)機能を最小構成で追加します。

概要

本記事では、以下の内容を解説します:

  • JWT を用いた認証(ログイン/ログアウト)の実装
  • 認証用スキーマ・サービス・ルーターの追加
  • 設定の拡張(Settings による環境変数管理)
  • アクセストークン(JWT)の発行と検証
  • Swagger UI による動作確認

プロジェクト構成(変更点のみ)

今回のプロジェクトでは、前回のユーザー登録 API をベースに、認証機能の実装を追加しています。
可読性と保守性のため、モデル・スキーマ・サービス・ルーターを分離した構成です。。

fastapi-test/
├── config.py                  # アプリ設定/DB接続(Settings, create_tables を追加)
├── docker-compose.yml
├── main.py                    # ルーター登録に auth を追加
├── routers/
│   └── auth.py                # 認証ルーター(login/logout)
├── schemas/
│   └── auth.py                # 認証用スキーマ
├── services/
│   └── auth.py                # 認証サービス(JWT発行・検証)
├── requirements.txt
└── tests/                     # テストコード(※今回は省略)
    └── test_auth.py

1. 設定項目の追加

1-1. 依存追加

requirements.txt

以下を追記してインストールしてください。

python-jose[cryptography]>=3.3.0
passlib[bcrypt]>=1.7.4
email-validator>=2.1.0
pip install -r requirements.txt

1-2. 設定の追加

config.py

JWT と DB 設定を環境変数/.env から読み込む Settings を追加します。from config import settings で参照できます。

"""アプリ設定とデータベース接続設定"""
from typing import Generator
from pydantic import Field
from pydantic_settings import BaseSettings
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session


class Settings(BaseSettings):
    secret_key: str = Field("change_me", env="SECRET_KEY")
    algorithm: str = Field("HS256", env="ALGORITHM")
    access_token_expire_minutes: int = Field(30, env="ACCESS_TOKEN_EXPIRE_MINUTES")
    database_url: str = Field(
        "postgresql://user:password@db:5432/fastapi-test", env="DATABASE_URL"
    )

    class Config:
        env_file = ".env"
        extra = "ignore"


settings = Settings()

engine = create_engine(settings.database_url)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()


def get_db() -> Generator[Session, None, None]:
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


def create_tables():
    from models.user import User  # テーブル作成に必要
    Base.metadata.create_all(bind=engine)

2. 認証機能の追加

2-1. スキーマ追加

schemas/auth.py

from typing import Optional
from pydantic import BaseModel, Field, EmailStr


class UserLogin(BaseModel):
    email: EmailStr = Field(..., description="メールアドレス")
    password: str = Field(..., description="パスワード")


class Token(BaseModel):
    access_token: str = Field(...)
    token_type: str = Field(default="bearer")


class TokenData(BaseModel):
    email: Optional[str] = None

2-2. サービス追加

services/auth.py

from datetime import datetime, timedelta
from typing import Optional
from sqlalchemy.orm import Session
from jose import JWTError, jwt
from fastapi import HTTPException, status, Header
from models.user import User
from schemas.auth import UserLogin, TokenData
from .users import UserService
from config import settings

SECRET_KEY = settings.secret_key
ALGORITHM = settings.algorithm
ACCESS_TOKEN_EXPIRE_MINUTES = settings.access_token_expire_minutes


class AuthService:
    @staticmethod
    def get_token_from_header(authorization: Optional[str] = Header(None)) -> Optional[str]:
        if not authorization or not authorization.startswith("Bearer "):
            return None
        return authorization.replace("Bearer ", "")

    @staticmethod
    def authenticate_user(db: Session, email: str, password: str) -> Optional[User]:
        user = UserService.get_user_by_email(db, email)
        if not user:
            return None
        if not UserService.verify_password(password, user.password):
            return None
        return user

    @staticmethod
    def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
        to_encode = data.copy()
        expire = datetime.utcnow() + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
        to_encode.update({"exp": expire})
        return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

    @staticmethod
    def verify_token(token: str) -> Optional[TokenData]:
        try:
            payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
            email: str = payload.get("sub")
            if email is None:
                return None
            return TokenData(email=email)
        except JWTError:
            return None

    @staticmethod
    def login_user(db: Session, user_login: UserLogin) -> dict:
        user = AuthService.authenticate_user(db, user_login.email, user_login.password)
        if not user:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="メールアドレスまたはパスワードが正しくありません",
                headers={"WWW-Authenticate": "Bearer"},
            )
        access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
        access_token = AuthService.create_access_token(data={"sub": user.email}, expires_delta=access_token_expires)
        return {"access_token": access_token, "token_type": "bearer", "user": {"id": user.id, "name": user.name, "email": user.email}}

2-3. ルーター追加

routers/auth.py

from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from config import get_db
from schemas.auth import UserLogin, Token
from services.auth import AuthService

router = APIRouter(prefix="/api/auth", tags=["auth"])


@router.post("/login", response_model=Token)
async def login(user_login: UserLogin, db: Session = Depends(get_db)):
    try:
        result = AuthService.login_user(db, user_login)
        return Token(access_token=result["access_token"], token_type=result["token_type"])
    except HTTPException:
        raise
    except Exception:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="ログイン処理中にエラーが発生しました")


@router.post("/logout")
async def logout():
    return {"message": "ログアウトしました。クライアント側でトークンを削除してください。"}

2-4. ルーター登録

main.py

from routers import users, auth  # 追記: auth を追加

app.include_router(users.router)
app.include_router(auth.router)  # 追記

3. 起動と動作確認

3-1. コンテナを起動

docker compose up -d --build

3-2. Swagger UI でログインを確認

  1. ブラウザで http://localhost:8000/docs を開く
  2. POST /api/auth/login を開き、Try it out を押す
  3. リクエストボディに登録済みユーザーの認証情報を入力して実行

例(ボディ)

{
  "email": "user@example.com",
  "password": "P@ssw0rd"
}

成功時(例)

{
  "access_token": "eyJhbGciOiJIUz...",
  "token_type": "bearer"
}

失敗時(例)

{
  "detail": "メールアドレスまたはパスワードが正しくありません"
}

まとめ

本記事では、FastAPI に JWT 認証(ログイン/ログアウト)を追加し、Swagger UI からリクエストして挙動を確認できるところまで実装しました。

ただし UI と連動した体験がないと分かりにくい面もあるため、次回はフロントエンドと連携し、実際のログインフォームからログインできるように進めます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?