3
2

【FastAPI】JWTで認証するな!〜セッションを使って認証をする方法〜

Last updated at Posted at 2024-08-13

はじめに

私は今、LCA分析用のシミュレーションツールを開発するプロジェクトに関わっています。
このツールはWebアプリケーションであり、バックエンドをFastAPI、フロントエンドをReactで開発し、これらをRESTful APIで繋いでいます。
このツールにおいて認証の処理が必要になったのですが、公式ドキュメントに書かれていたのは認可の方法であり、認証の方法は紹介されていませんでした。
そこで今回は、FastAPIにおいてセッションを使って認証を行う方法を紹介します。

背景知識

認証と認可の違い

まず、認証と認可の違いをはっきりとさせておく。

名称 説明
認証(authentication) 誰なのかを特定すること
認可(authorization) 権限を与えること

OAuth2は認可のための仕組みであり、認証のために使ってはいけない。

以下のFastAPIの公式ドキュメントはOAuth2においてJWTを使った認可の方法を紹介したものであり、認証の方法を紹介したものではない。

セッションid vs JWT

セッションidは認証、JWTは認可に使うものである。

セッションidは例えばDjangoでは32文字のランダムな文字列1、Flaskでは<ペイロードをエンコードした文字列>.<タイムスタンプをエンコードした文字列>.<署名をエンコードした文字列>の形式2で管理している。

一方、JWTの形式は以下のようなものである3

JWTの形式.
<ヘッダーをエンコードした文字列>.<ペイロードをエンコードした文字列>.<署名をエンコードした文字列>
JWTの例.
eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiYWRtaW4iOnRydWV9.TJVA95OrM7E2cBab30RMHrHDcEfxjoYZgeFONFh7HgQ

ヘッダーにはJWTの署名検証を行うために必要な情報が、ペイロードにはユーザ名などの任意のデータがjsonの形式で保存されている3

メモリ vs cookies vs localStorage vs sessionStorage

セッションidやJWTをどこに保存しておくべきか?

ブラウザに情報を保存する場所として以下の4つが挙げられる4

名称 特徴
メモリ ReactでいうuseStateなどを使った方法。ページをリロードするとデータは失われる。
cookies データを任意の期間保存しておける。HttpOnlyフラグをTrueにしておけばJavaScriptからのアクセスを防ぐことができる。
localStorage データは永続的に保存される。XSS攻撃の危険性あり。
sessionStorage ページの再読み込みやブラウザの再起動後にデータが失われる。XSS攻撃の危険性あり。

使いやすさとセキュリティを考えるとセッションidやJWTはcookiesに保存するべきである。

認証方法

ここからはサンプルコードと合わせて認証方法を紹介していく。

1. JWTをcookiesで管理する方法(非推奨)

  1. pip install fastapi python-jose[cryptography] passlib[bcrypt] uvicorn python-multipart
  2. サンプルコード1をmain_jwt.pyに貼り付ける
  3. python main_jwt.py
  4. localhost:8000/docsにアクセス
サンプルコード1
from datetime import datetime, timedelta, timezone
from typing import Any, Union

import uvicorn
from fastapi import Cookie, Depends, FastAPI, HTTPException, Response, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import jwt
from passlib.context import CryptContext
from pydantic import BaseModel

# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30


# ユーザ名: johndoe, パスワード: secret
fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}


class Token(BaseModel):
    access_token: str
    token_type: str


class TokenData(BaseModel):
    username: Union[str, None] = None


class User(BaseModel):
    username: str
    email: Union[str, None] = None
    full_name: Union[str, None] = None
    disabled: Union[bool, None] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

app = FastAPI()


# plain_passwordとhashed_passwordが一致するか確認する
# plain_passwordはユーザから入力されたパスワード、hashed_passwordはDBに保存されているハッシュ化されたパスワード
def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


# パスワードをハッシュ化する
def get_password_hash(password):
    return pwd_context.hash(password)


# DBからユーザー情報を取得する
def get_user(db, username: str | None):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


# ユーザー認証を行う
def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


# /tokenエンドポイントにアクセスしたときにこの関数が呼び出されて、アクセストークンを生成する
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
    to_encode = data.copy()
    # もし有効期限が設定されていれば、有効期限を設定する
    if expires_delta:
        expire = datetime.now(timezone.utc) + expires_delta
    # 有効期限が設定されていない場合は、15分後に有効期限を設定する
    else:
        expire = datetime.now(timezone.utc) + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    # トークン = { "sub": "johndoe", "exp": 1618312741.0 } のようなデータをエンコードしたもの
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(token: str = Cookie(None)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: Any | None = payload.get("sub")
        token_data = TokenData(username=username)
    except JWTError:
        raise credentials_exception

    try:
        user = get_user(fake_users_db, username=token_data.username)
    except Exception:
        raise credentials_exception

    if user is None:
        raise credentials_exception
    return user


# ユーザーが有効かどうかを確認する
# disabledはユーザーが無効かどうかを示すフラグ
# ユーザが無効というのは、ユーザーが削除されたか、アカウントがロックされたか、何らかの理由でアクセスが制限されている状態を指す
async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


# もしパスワードがあっていれば、アクセストークンを返す
# アクセストークンはCookieやHeaderに設定されるもので以下の形式で返される
# {
#   "access_token": "string",
#   "token_type": "string"
# }
@app.post("/token")
async def login_for_access_token(response: Response, form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(
        fake_users_db,
        form_data.username,
        form_data.password
    )
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={
                "WWW-Authenticate": "Bearer error='incorrect username or password'"}
        )
    access_token_expires: timedelta = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    access_token = create_access_token(
        data={
            "sub": user.username
        },
        expires_delta=access_token_expires
    )
    # Reactとかで使う場合はレスポンスをlocalStorage, sessionStorage, cookieのどれかに保存しておくみたいな処理をすれば良い

    # cookiesにトークンを保存しておく
    token: Token = Token(access_token=access_token, token_type="bearer")
    expires: datetime = datetime.now(tz=timezone.utc) + access_token_expires
    response.set_cookie(key="token", value=token.access_token, expires=expires, httponly=True)

    return token


# ユーザー情報を取得する
# フローは以下の通り
# 1. ユーザがこのエンドポイントにアクセスする
# 2. get_current_active_user関数が呼び出される
# 3. get_current_user関数が呼び出される
# 4. トークンがデコードされ、ユーザー名が取得される
# ユーザから渡されるデータは以下のような形式
# curl -X 'GET' \
#   'http://127.0.0.1:8000/users/me/' \
#   -H 'accept: application/json' \
#   -H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzIyOTU0OTU4fQ.X9-kpGVwnHi4E6-qEhlv9ST26sfBzxv8P9xVc0fMTAs'
# つまり、React側から送るときには以下のようにすればいい
# fetch('http://localhost:8000/users/me/', {
#   method: 'GET',
#   headers: {
#     'Authorization': `Bearer ${token}`
#   }
#
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [
        {
            "item_id": "Foo",
            "owner": current_user.username
        }
    ]


if __name__ == "__main__":
    uvicorn.run("main_jwt:app", port=8000, reload=True)

このままだと改ざんができてしまうのではないか?例えばExpiresをブラウザで変更したりするなど。

試してみたができなかった。

ExpiresはJWTの中のペイロードの中とcookiesの中に2種類存在し、今回変更してみたのはcookiesの中にある方。

しかしcookiesの方を改ざんしてもJWTのペイロードの方のExpiresは変わっていないためデコードするときにJWTErrorにより認証は失敗する。

ただこの方法はサーバ側で情報を管理しているわけではないため、もしJWTが第三者に渡ってしまった場合にサーバ側でそのトークンを無効にすることができないという問題がある。

2. セッションidをサーバ側とcookiesで管理する方法(推奨)

以下の記事を参考にした。

  1. pip install fastapi uvicorn passlib[bcrypt] python-multipart
  2. サンプルコード2をmain_session.pyに貼り付ける
  3. python main_session.py
  4. localhost:8000/docsにアクセス
サンプルコード2
main_session.py
import secrets
from datetime import datetime, timedelta, timezone
from typing import Dict, Optional

import uvicorn
from fastapi import Cookie, Depends, FastAPI, HTTPException, Response, status
from fastapi.security import OAuth2PasswordRequestForm
from passlib.context import CryptContext
from pydantic import BaseModel

# パスワードをハッシュ化するためのキー。本番環境では変更すること。
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
# アクセストークンの有効期限(分単位)
ACCESS_TOKEN_EXPIRE_MINUTES = 5
# セッションIDの長さ
SESSION_ID_LENGTH = 64


fake_users_db = {
    "johndoe": {
        "username": "johndoe",
        "full_name": "John Doe",
        "email": "johndoe@example.com",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False,
    }
}

# セッションIDとユーザー名を紐付けるためのデータベース。今回はメモリ上に保存する。
sessions_db: Dict[str, str] = {}


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

app = FastAPI()


def verify_password(plain_password, hashed_password):
    return pwd_context.verify(plain_password, hashed_password)


def get_password_hash(password):
    return pwd_context.hash(password)


def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


def authenticate_user(fake_db, username: str, password: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    if not verify_password(password, user.hashed_password):
        return False
    return user


def create_session_id() -> str:
    while True:
        session_id = secrets.token_hex(SESSION_ID_LENGTH)
        if session_id not in sessions_db:
            return session_id


async def get_current_user(session_id: str = Cookie(None)):
    if not session_id or session_id not in sessions_db:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    username = sessions_db[session_id]
    user = get_user(fake_users_db, username=username)
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Could not validate credentials",
            headers={"WWW-Authenticate": "Bearer"},
        )
    return user


async def get_current_active_user(current_user: User = Depends(get_current_user)):
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.post("/login")
async def login(response: Response, form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(
        fake_users_db, form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    session_id = create_session_id()
    sessions_db[session_id] = user.username
    expires = datetime.now(tz=timezone.utc) + \
        timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    response.set_cookie(
        key="session_id",
        value=session_id,
        expires=expires,
        httponly=True
    )


@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
    return current_user


@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
    return [{"item_id": "Foo", "owner": current_user.username}]

if __name__ == "__main__":
    uvicorn.run("main_session:app", port=8000, reload=True)

さいごに

FastAPIにおいてセッションを使って認証を行う方法を紹介しました。

今回紹介したコードは自由に使っていただいて大丈夫ですが、このコードを使用することで発生する問題やリスクについては、一切の責任を負いかねます。
適切なレビューとテストを行い、安全で信頼性の高いシステムを構築してください。

また、間違っている部分があればコメントよろしくお願いします。

  1. ryo-keima
    (・前田 涼輔). DRFでセッション認証. https://qiita.com/ryo-keima/items/79c75963abc831fd255f

  2. showchan33. Python : FlaskのsessionのKeyValueが、セッションCookieにどのように格納されているのかを調べてみた. https://qiita.com/showchan33/items/b714cca80985b3db2565

  3. JSON Web Token(JWT)の紹介とYahoo! JAPANにおけるJWTの活用. https://techblog.yahoo.co.jp/advent-calendar-2017/jwt/ 2

  4. k1b3. ブラウザに情報を保存する方法まとめ(In-Memory/Cookie/Local Storage/Session Storage). https://zenn.dev/kibe/articles/8ec80078e123a2

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2