0
0

【FastAPI JWT tutorial】パスワード(およびハッシュ化)によるOAuth2、JWTトークンによるBearer

Last updated at Posted at 2024-08-05

はじめに

以下のチュートリアルをハンズオンします。

JWT(JSON Web Token)とは?

JWT(JSON Web Token)とは、JSON形式で表現された認証情報などをURL文字列などとして安全に送受信できるよう、符号化やデジタル署名の仕組みを規定した標準規格。
https://e-words.jp/w/JWT.html#google_vignette

要するに、「JSON ベースのデータを暗号化してつくられる文字列」のこと。

以下の解説も分かりやすい。

セットアップ

requirements.txt
# パスワードのハッシュを処理するため
bcrypt==4.0.1
passlib[bcrypt]

# JWTトークンの生成と検証を行うため
pyjwt
pip install -r requirements.txt

ドキュメントでは、以下のように書かれていたが、上記モジュールでないとサンプルは動作しなかった。

PythonでJWTトークンの生成と検証を行うために、python-joseをインストールする必要があります

下準備

JWTトークンの署名に使用されるランダムな秘密鍵を生成

openssl rand -hex 32
c4c573b0ba30dbf7aed6614d74372409ec0e1f10c4746e558f84aff97d7222ee

本来はこのキーはソースコードに含めてはならず、環境変数などで読み込むようにするなど、管理を徹底する必要がある。

ソース

app.py
from datetime import datetime, timedelta, timezone
from typing import Union

import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "c4c573b0ba30dbf7aed6614d74372409ec0e1f10c4746e558f84aff97d7222ee"
# JWTトークンの署名に使用するアルゴリズム
ALGORITHM = "HS256"
# トークンの有効期限
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# passwordは「secret」(ハッシュ化されている)
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

# トークンのデータモデル
class Token(BaseModel):
    access_token: str
    token_type: str

# トークンに含まれるデータモデル
class TokenData(BaseModel):
    username: Union[str, None] = None

# ユーザーのデータモデル
class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None

# データベース内のユーザーデータモデル
class UserInDB(User):
    hashed_password: str

# パスワードのハッシュ化に使用するコンテキスト
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# OAuth2スキーマの定義
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# FastAPIアプリケーションの初期化
app = FastAPI()

# パスワードの検証関数
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)

# パスワードのハッシュ化関数
def get_password_hash(password):
    return pwd_context.hash(password)

# ユーザー名からユーザー情報を取得する関数
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)

# ユーザーの認証関数
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user

# 新しいアクセストークンを生成する
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt

# 受け取ったトークンを復号して検証し、現在のユーザーを返す
async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
        token_data = TokenData(username=username)
    except InvalidTokenError:
        raise credentials_exception
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception
    return user

# 現在のアクティブなユーザーを取得する関数
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user

# トークンの有効期限を設定
# JWTアクセストークンを作成
# usernameとpasswordが正しければ、access_tokenをreturnする。
@app.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
    user = authenticate_user(fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={"sub": user.username}, expires_delta=access_token_expires
    )
    return Token(access_token=access_token, token_type="bearer")

# 現在のユーザー情報を取得するエンドポイント
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user

# 現在のユーザーのアイテムを取得するエンドポイント
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

実行・動作確認

FastAPI起動

uvicorn app:app --reload

ログインしてJWTトークンを取得

curl -X 'POST' \
  'http://localhost:8000/token' \
  -H 'accept: application/json' \
  -H 'Content-Type: application/x-www-form-urlencoded' \
  -d 'grant_type=&username=johndoe&password=secret&scope=&client_id=&client_secret='

以下のようにトークンが返ってくるればOK。

{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzIyODY5OTY2fQ.vL3deQZIoHFyliLBSycQVXHlH2gNlKBOkQSD0EMGl-4","token_type":"bearer"}

JWTトークンを使用してデータ取得

上記トークンを利用する。

curl -X 'GET' \
  'http://localhost:8000/users/me/' \
  -H 'accept: application/json' \
  -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzIyODY5OTY2fQ.vL3deQZIoHFyliLBSycQVXHlH2gNlKBOkQSD0EMGl-4'

以下のように結果が返ってくれば成功。

{"username":"johndoe","email":"johndoe@example.com","full_name":"John Doe","disabled":false}

参考

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0