1
0

More than 1 year has passed since last update.

FastAPI: JWT認証

Posted at

こちらと同様のことを行いました。
【FastAPI】OAuth2, JWT認証方法

変更点は、
ユーザーが foobar の一人だったのを、
scott, jack, betty の三人にしたことです。

user password
scott tiger123
jack jack123
betty betty123

プログラム

フォルダー構造

$ tree api
api
├── database.py
├── hash.py
├── main.py
├── models.py
├── oauth2.py
└── schemas.py
database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base

DB_URL = "sqlite:///app.db"
engine = create_engine(DB_URL, connect_args={"check_same_thread": False})

SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


Base = declarative_base()
hash.py
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"])


def create_hashed_password(password):
    """ハッシュ値生成"""
    return pwd_context.hash(password)


def verify_password(password, hashed_password):
    """ハッシュ値検証"""
    return pwd_context.verify(password, hashed_password)
models.py
from sqlalchemy import Column, Integer, String

from hash import create_hashed_password
from database import engine, SessionLocal, Base


class User(Base):
    """データベースユーザーモデル定義"""
    __tablename__ = "users"

    id = Column(Integer, primary_key=True)
    username = Column(String, nullable=False, unique=True)
    password = Column(String, nullable=False)
    email = Column(String, nullable=False)


# データベース作成
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)

# ユーザーデータ作成
user_aa = User(
    username="scott",
    password=create_hashed_password("tiger123"),
    email="scott@example.net"
)
user_bb = User(
    username="jack",
    password=create_hashed_password("jack123"),
    email="jack@example.net"
)
user_cc = User(
    username="betty",
    password=create_hashed_password("betty123"),
    email="betty@example.net"
)

# ユーザーデータ挿入
with SessionLocal() as db:
    db.add(user_aa)
    db.add(user_bb)
    db.add(user_cc)
    db.commit()
schemas.py
from pydantic import BaseModel


class User(BaseModel):
    username: str
    email:  str


class UserInDB(User):
    id: int
    password: str

    class Config:
        orm_mode = True


class Token(BaseModel):
    access_token: str
    token_type: str
oauth2.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from jose import jwt, JWTError
import datetime

from hash import verify_password
from database import get_db
import schemas
import models

SECRET_KEY = "e64e815602e23e2fe2d73024cb5ff526fd82c531d1e89012b517c3009e74ffa2"
ALGORITHM = "HS256"
EXP_MINUTES = 10

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

router = APIRouter(tags=["OAuth2"])


def get_user(username: str, db: Session):
    """usernameが一致するuserをreturnする。"""
    user = db.query(models.User).filter(models.User.username == username).first()
    return schemas.UserInDB(**vars(user))


def authenticate_user(username: str, password: str, db: Session):
    """usernameが一致するuserを取得し、passwordのハッシュ値検証が正しければuserをreturnする。"""
    user = get_user(username, db)
    if not user:
        return False
    if not verify_password(password, user.password):
        return False
    return user


def create_access_token(username: str):
    """usernameと有効期限を使用してJWTを生成する。"""
    to_encode = {
        "username": username,
        "exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=EXP_MINUTES)
    }
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
    """tokenからusernameを取得し、usernameと一致するuserをreturnする。"""
    credentials_exception = HTTPException(
        status_code=401,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"}
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
        username = payload.get("username")
        if not username:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user(username, db)
    if not user:
        raise credentials_exception
    return user


@router.post("/token", response_model=schemas.Token)
def login_for_access_token(form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
    """Formデータのusernameとpasswordが正しければ、access_tokenをreturnする。"""
    user = authenticate_user(form.username, form.password, db)
    if user is False:
        raise HTTPException(
            status_code=401,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"}
        )
    access_token = create_access_token(user.username)
    return {"access_token": access_token, "token_type": "bearer"}
main.py
from fastapi import FastAPI, Depends

import oauth2
import schemas

app = FastAPI()
app.include_router(oauth2.router)


@app.get("/")
def index(current_user: schemas.User = Depends(oauth2.get_current_user)):
    return {"username": current_user.username, "email": current_user.email}

サーバーの起動

uvicorn main:app --reload

起動すると、app.db というファイルが作成されます。

$ file app.db 
app.db: SQLite 3.x database, last written using SQLite version 3040001, file counter 2, database pages 3, cookie 0x1, schema 4, UTF-8, version-valid-for 2
$ sqlite3 app.db 
SQLite version 3.40.1 2022-12-28 14:03:47
Enter ".help" for usage hints.

sqlite> .table
users

sqlite> select * from users;
1|scott|$2b$12$t.tLp94wQvx1tGlt69K76u1AXQcUREbswdqE1VoezmB4B/H3dMnqm|scott@example.net
2|jack|$2b$12$4jHFDvLdHWPIgKmMIhafie3r2qvRuvBFATdDKix.3tS1EgaQS04Ye|jack@example.net
3|betty|$2b$12$0wrPXaimjfrXOwncq4uazO4qc3X7SipPKWNnV0MRzEpOUt8GKWwb2|betty@example.net

テストスクリプト

トークンの作成

http_login.sh
http --form $URL  username=scott password=tiger123 > scott.json
http --form $URL  username=jack password=jack123 > jack.json
http --form $URL  username=betty password=betty123 > betty.json

次のようなファイルが出来ます。

$ jq . scott.json 
{
  "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InNjb3R0IiwiZXhwIjoxNjg0NjM0NDU2fQ.rLbVAdMvt3U4************",
  "token_type": "bearer"
}

ユーザー情報の取得

http_get.sh
token=`jq .access_token scott.json | sed 's/"//g'`
http "http://localhost:8000/"  "Authorization: Bearer ${token}"
#
token=`jq .access_token jack.json | sed 's/"//g'`
http "http://localhost:8000/"  "Authorization: Bearer ${token}"
#
token=`jq .access_token betty.json | sed 's/"//g'`
http "http://localhost:8000/"  "Authorization: Bearer ${token}"

実行結果

$ ./http_get.sh 
HTTP/1.1 200 OK
content-length: 48
content-type: application/json
date: Sun, 21 May 2023 02:12:36 GMT
server: uvicorn

{
    "email": "scott@example.net",
    "username": "scott"
}


HTTP/1.1 200 OK
content-length: 46
content-type: application/json
date: Sun, 21 May 2023 02:12:36 GMT
server: uvicorn

{
    "email": "jack@example.net",
    "username": "jack"
}


HTTP/1.1 200 OK
content-length: 48
content-type: application/json
date: Sun, 21 May 2023 02:12:36 GMT
server: uvicorn

{
    "email": "betty@example.net",
    "username": "betty"
}
1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0