はじめに
以下のチュートリアルをハンズオンします。
JWT(JSON Web Token)とは?
JWT(JSON Web Token)とは、JSON形式で表現された認証情報などをURL文字列などとして安全に送受信できるよう、符号化やデジタル署名の仕組みを規定した標準規格。
https://e-words.jp/w/JWT.html#google_vignette
要するに、「JSON ベースのデータを暗号化してつくられる文字列」のこと。
以下の解説も分かりやすい。
セットアップ
requirements.txt
# パスワードのハッシュを処理するため
bcrypt==4.0.1
passlib[bcrypt]
# JWTトークンの生成と検証を行うため
pyjwt
pip install -r requirements.txt
ドキュメントでは、以下のように書かれていたが、上記モジュールでないとサンプルは動作しなかった。
PythonでJWTトークンの生成と検証を行うために、python-joseをインストールする必要があります
下準備
JWTトークンの署名に使用されるランダムな秘密鍵を生成
openssl rand -hex 32
c4c573b0ba30dbf7aed6614d74372409ec0e1f10c4746e558f84aff97d7222ee
本来はこのキーはソースコードに含めてはならず、環境変数などで読み込むようにするなど、管理を徹底する必要がある。
ソース
app.py
from datetime import datetime, timedelta, timezone
from typing import Union
import jwt
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jwt.exceptions import InvalidTokenError
from passlib.context import CryptContext
from pydantic import BaseModel
# to get a string like this run:
# openssl rand -hex 32
SECRET_KEY = "c4c573b0ba30dbf7aed6614d74372409ec0e1f10c4746e558f84aff97d7222ee"
# JWTトークンの署名に使用するアルゴリズム
ALGORITHM = "HS256"
# トークンの有効期限
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# passwordは「secret」(ハッシュ化されている)
fake_users_db = {
"johndoe": {
"username": "johndoe",
"full_name": "John Doe",
"email": "johndoe@example.com",
"hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
"disabled": False,
}
}
# トークンのデータモデル
class Token(BaseModel):
access_token: str
token_type: str
# トークンに含まれるデータモデル
class TokenData(BaseModel):
username: Union[str, None] = None
# ユーザーのデータモデル
class User(BaseModel):
username: str
email: Union[str, None] = None
full_name: Union[str, None] = None
disabled: Union[bool, None] = None
# データベース内のユーザーデータモデル
class UserInDB(User):
hashed_password: str
# パスワードのハッシュ化に使用するコンテキスト
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# OAuth2スキーマの定義
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# FastAPIアプリケーションの初期化
app = FastAPI()
# パスワードの検証関数
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# パスワードのハッシュ化関数
def get_password_hash(password):
return pwd_context.hash(password)
# ユーザー名からユーザー情報を取得する関数
def get_user(db, username: str):
if username in db:
user_dict = db[username]
return UserInDB(**user_dict)
# ユーザーの認証関数
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 新しいアクセストークンを生成する
def create_access_token(data: dict, expires_delta: Union[timedelta, None] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.now(timezone.utc) + expires_delta
else:
expire = datetime.now(timezone.utc) + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 受け取ったトークンを復号して検証し、現在のユーザーを返す
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except InvalidTokenError:
raise credentials_exception
user = get_user(fake_users_db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# 現在のアクティブなユーザーを取得する関数
async def get_current_active_user(current_user: User = Depends(get_current_user)):
if current_user.disabled:
raise HTTPException(status_code=400, detail="Inactive user")
return current_user
# トークンの有効期限を設定
# JWTアクセストークンを作成
# usernameとpasswordが正しければ、access_tokenをreturnする。
@app.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends(),
) -> Token:
user = authenticate_user(fake_users_db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return Token(access_token=access_token, token_type="bearer")
# 現在のユーザー情報を取得するエンドポイント
@app.get("/users/me/", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_active_user)):
return current_user
# 現在のユーザーのアイテムを取得するエンドポイント
@app.get("/users/me/items/")
async def read_own_items(current_user: User = Depends(get_current_active_user)):
return [{"item_id": "Foo", "owner": current_user.username}]
実行・動作確認
FastAPI起動
uvicorn app:app --reload
ログインしてJWTトークンを取得
curl -X 'POST' \
'http://localhost:8000/token' \
-H 'accept: application/json' \
-H 'Content-Type: application/x-www-form-urlencoded' \
-d 'grant_type=&username=johndoe&password=secret&scope=&client_id=&client_secret='
以下のようにトークンが返ってくるればOK。
{"access_token":"eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzIyODY5OTY2fQ.vL3deQZIoHFyliLBSycQVXHlH2gNlKBOkQSD0EMGl-4","token_type":"bearer"}
JWTトークンを使用してデータ取得
上記トークンを利用する。
curl -X 'GET' \
'http://localhost:8000/users/me/' \
-H 'accept: application/json' \
-H 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJqb2huZG9lIiwiZXhwIjoxNzIyODY5OTY2fQ.vL3deQZIoHFyliLBSycQVXHlH2gNlKBOkQSD0EMGl-4'
以下のように結果が返ってくれば成功。
{"username":"johndoe","email":"johndoe@example.com","full_name":"John Doe","disabled":false}
参考