こちらと同様のことを行いました。
【FastAPI】OAuth2, JWT認証方法
変更点は、
ユーザーが foobar の一人だったのを、
scott, jack, betty の三人にしたことです。
| user | password |
|---|---|
| scott | tiger123 |
| jack | jack123 |
| betty | betty123 |
プログラム
フォルダー構造
$ tree api
api
├── database.py
├── hash.py
├── main.py
├── models.py
├── oauth2.py
└── schemas.py
database.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, declarative_base
DB_URL = "sqlite:///app.db"
engine = create_engine(DB_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
Base = declarative_base()
hash.py
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"])
def create_hashed_password(password):
"""ハッシュ値生成"""
return pwd_context.hash(password)
def verify_password(password, hashed_password):
"""ハッシュ値検証"""
return pwd_context.verify(password, hashed_password)
models.py
from sqlalchemy import Column, Integer, String
from hash import create_hashed_password
from database import engine, SessionLocal, Base
class User(Base):
"""データベースユーザーモデル定義"""
__tablename__ = "users"
id = Column(Integer, primary_key=True)
username = Column(String, nullable=False, unique=True)
password = Column(String, nullable=False)
email = Column(String, nullable=False)
# データベース作成
Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)
# ユーザーデータ作成
user_aa = User(
username="scott",
password=create_hashed_password("tiger123"),
email="scott@example.net"
)
user_bb = User(
username="jack",
password=create_hashed_password("jack123"),
email="jack@example.net"
)
user_cc = User(
username="betty",
password=create_hashed_password("betty123"),
email="betty@example.net"
)
# ユーザーデータ挿入
with SessionLocal() as db:
db.add(user_aa)
db.add(user_bb)
db.add(user_cc)
db.commit()
schemas.py
from pydantic import BaseModel
class User(BaseModel):
username: str
email: str
class UserInDB(User):
id: int
password: str
class Config:
orm_mode = True
class Token(BaseModel):
access_token: str
token_type: str
oauth2.py
from fastapi import APIRouter, Depends, HTTPException
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from sqlalchemy.orm import Session
from jose import jwt, JWTError
import datetime
from hash import verify_password
from database import get_db
import schemas
import models
SECRET_KEY = "e64e815602e23e2fe2d73024cb5ff526fd82c531d1e89012b517c3009e74ffa2"
ALGORITHM = "HS256"
EXP_MINUTES = 10
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
router = APIRouter(tags=["OAuth2"])
def get_user(username: str, db: Session):
"""usernameが一致するuserをreturnする。"""
user = db.query(models.User).filter(models.User.username == username).first()
return schemas.UserInDB(**vars(user))
def authenticate_user(username: str, password: str, db: Session):
"""usernameが一致するuserを取得し、passwordのハッシュ値検証が正しければuserをreturnする。"""
user = get_user(username, db)
if not user:
return False
if not verify_password(password, user.password):
return False
return user
def create_access_token(username: str):
"""usernameと有効期限を使用してJWTを生成する。"""
to_encode = {
"username": username,
"exp": datetime.datetime.utcnow() + datetime.timedelta(minutes=EXP_MINUTES)
}
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
"""tokenからusernameを取得し、usernameと一致するuserをreturnする。"""
credentials_exception = HTTPException(
status_code=401,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"}
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=ALGORITHM)
username = payload.get("username")
if not username:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user(username, db)
if not user:
raise credentials_exception
return user
@router.post("/token", response_model=schemas.Token)
def login_for_access_token(form: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
"""Formデータのusernameとpasswordが正しければ、access_tokenをreturnする。"""
user = authenticate_user(form.username, form.password, db)
if user is False:
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"}
)
access_token = create_access_token(user.username)
return {"access_token": access_token, "token_type": "bearer"}
main.py
from fastapi import FastAPI, Depends
import oauth2
import schemas
app = FastAPI()
app.include_router(oauth2.router)
@app.get("/")
def index(current_user: schemas.User = Depends(oauth2.get_current_user)):
return {"username": current_user.username, "email": current_user.email}
サーバーの起動
uvicorn main:app --reload
起動すると、app.db というファイルが作成されます。
$ file app.db
app.db: SQLite 3.x database, last written using SQLite version 3040001, file counter 2, database pages 3, cookie 0x1, schema 4, UTF-8, version-valid-for 2
$ sqlite3 app.db
SQLite version 3.40.1 2022-12-28 14:03:47
Enter ".help" for usage hints.
sqlite> .table
users
sqlite> select * from users;
1|scott|$2b$12$t.tLp94wQvx1tGlt69K76u1AXQcUREbswdqE1VoezmB4B/H3dMnqm|scott@example.net
2|jack|$2b$12$4jHFDvLdHWPIgKmMIhafie3r2qvRuvBFATdDKix.3tS1EgaQS04Ye|jack@example.net
3|betty|$2b$12$0wrPXaimjfrXOwncq4uazO4qc3X7SipPKWNnV0MRzEpOUt8GKWwb2|betty@example.net
テストスクリプト
トークンの作成
http_login.sh
http --form $URL username=scott password=tiger123 > scott.json
http --form $URL username=jack password=jack123 > jack.json
http --form $URL username=betty password=betty123 > betty.json
次のようなファイルが出来ます。
$ jq . scott.json
{
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6InNjb3R0IiwiZXhwIjoxNjg0NjM0NDU2fQ.rLbVAdMvt3U4************",
"token_type": "bearer"
}
ユーザー情報の取得
http_get.sh
token=`jq .access_token scott.json | sed 's/"//g'`
http "http://localhost:8000/" "Authorization: Bearer ${token}"
#
token=`jq .access_token jack.json | sed 's/"//g'`
http "http://localhost:8000/" "Authorization: Bearer ${token}"
#
token=`jq .access_token betty.json | sed 's/"//g'`
http "http://localhost:8000/" "Authorization: Bearer ${token}"
実行結果
$ ./http_get.sh
HTTP/1.1 200 OK
content-length: 48
content-type: application/json
date: Sun, 21 May 2023 02:12:36 GMT
server: uvicorn
{
"email": "scott@example.net",
"username": "scott"
}
HTTP/1.1 200 OK
content-length: 46
content-type: application/json
date: Sun, 21 May 2023 02:12:36 GMT
server: uvicorn
{
"email": "jack@example.net",
"username": "jack"
}
HTTP/1.1 200 OK
content-length: 48
content-type: application/json
date: Sun, 21 May 2023 02:12:36 GMT
server: uvicorn
{
"email": "betty@example.net",
"username": "betty"
}