目次
5日目 - ユーザの認証の追加
公式のリファレンスに則ってJWT認証を設定する。
パスワードのハッシュ化
ライブラリのインストール
bash
poetry add passlib ["bcrypt"]# パスワードのハッシュするため
認証用のファイルを作成
api/v1/cruds/auth.py
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from api.v1.models.user import User
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
# パスワードのハッシュ化
def get_password_hash(password):
return pwd_context.hash(password)
# パスワードの検証
def verify_password(plain_password,hashed_password):
return pwd_context.verify(plain_password,hash_password)
# ユーザーの認証
def authenticate_user(db:Session,login_id:str,password:str):
user = db.query(User).filter_by(login_id=login_id).first()
if user is None:
return None
if not verify_password(password,user.password_hash):
return None
return user
ユーザーの作成時の処理の変更
api/v1/cruds/user.py
# ~~~
from api.v1.cruds.auth import get_password_hash
# ~~~
def create_user(db:Session,user:user_schema.UserCreateRequest):
password_hash = get_password_hash(user.password)
new_user = User(
login_id = user.login_id,
name = user.name,
description = user.description,
password_hash = password_hash
)
db.add(new_user)
db.commit()
db.refresh(new_user)
return new_user
これでパスワードのハッシュ化とハッシュ化したパスワードの認証を行うためのメソッドが用意できた。
JWTの認証の追加
JWT認証のためのSECRETキーなどの保存
以下よりsecretキーを生成
bash
openssl rand -hex 32
生成したキーを環境変数として読み込むためにdocker-compose.ymlと同じディレクトリに.envを生成
(環境変数の読み込み方を前回から変更)
.env
MYSQL_ROOT_PASSWORD=...
SECRET_KEY=...
環境変数をdocker-compose.yml内に定義
docker-compose.yaml
# ~~~
backend:
# ~~~
environment:
- MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD
- SECRET_KEY=$SECRET_KEY
これで環境変数は.envファイル書き換えるだけでオッケー
ライブラリのインストール
bash
poetry add python-jose # JWTの認証用
poetry add python-multipart # multipart/form-dataのリクエストを受け付けられるように
CRUD処理の追加
access_tokenの作成
api/v1/cruds/auth.py
from datetime import datetime, timedelta
from jose import jwt
import os
# ~~~
SECRET_KEY = os.environ.get("SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOEKN_EXPIRE_MINUTES = 30
# ~~~
def create_access_token(data:dict):
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp":expire})
encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encode_jwt
tokenからのユーザー情報の取得
api/v1/cruds/auth.py
from fastapi import OAuth2PasswordBearer
from jose import JWTError
# ~~~
def get_current_user(db:Session,token:str):
try:
payload = jwt.decode(token,SECRET_KEY,algorithm=ALGORITHM)
login_id:str = payload.get("sub")
if login_id is None:
return None
except JWTError:
return None
current_user = db.query(User).filter_by(login_id=login_id).first()
if current_user is None:
return None
return current_user
schemaファイルの作成
api/v1/schemas/auth.py
from pydantic import BaseModel
class Token(BaseModel):
access_token:str
token_type:str
auth用のルーター作成
api/v1/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from fastapi.security import OAuth2PasswordRequestForm
import api.v1.schemas.auth as auth_schema
import api.v1.schemas.user as user_schema
import api.v1.cruds.auth as auth_crud
from api.v1.db import get_db
router = APIRouter()
@router.post('/token', response_model=auth_schema.Token)
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(),db:Session = Depends(get_db)):
user = auth_crud.authenticate_user(
db=db,
login_id=form_data.username,
password=form_data.password
)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect login_id or password",
headers={"WWW-Authenticate":"Bearer"}
)
access_token = auth_crud.get_current_user(db=db,token=token)
return {"access_token":access_token,"token_type": "bearer"}
auth用のルーター反映
api/v1/main.py
# ~~~
from api.v1.routers import user,post,auth
# ~~~
app.include_router(auth.router)
これで認証用のエンドポイントができた。
ユーザー認証が必要なエンドポイントに反映
api/v1/router/post.py
# ~~~
import api.v1.cruds.auth as auth_crud
# ~~~
@router.post('/posts',response_model=post_schema.Post)
def create_post(post:post_schema.PostBase,token: str = Depends(auth_crud.oauth2_scheme),db:Session = Depends(get_db)):
current_user = auth_crud.get_current_user(db=db,token=token)
post = post_crud.create_post(db=db,post=post,user_id=current_user.id)
return post
# ~~~
@router.post('/post/{id}/lgtm')
def add_lgtm(id:int,token: str = Depends(auth_crud.oauth2_scheme),db:Session = Depends(get_db)):
current_user = auth_crud.get_current_user(db=db,token=token)
lgtm_crud.add_lgtm(db=db,post_id=id,user_id=current_user.id)
return None
# ~~~
@router.delete('/post/{id}/lgtm')
def delete_lgtm(id:int,token: str = Depends(auth_crud.oauth2_scheme),db:Session = Depends(get_db)):
current_user = auth_crud.get_current_user(db=db,token=token)
lgtm = lgtm_crud.get_lgtm(db=db,post_id=id,user_id=current_user.id)
if lgtm is None:
raise HTTPException(status_code = 400, detail = "Lgtm not Found")
lgtm_crud.remove_lgtm(db=db,lgtm=lgtm)
return None
確認
このボタンで認証を行なうと、認証が必要なエンドポイントのお試しができる。
とりあえずこれで認証が必要なルーティングが完成。