LoginSignup
4
1

More than 1 year has passed since last update.

FastAPI+React+DockerでQiitaみたいなサイトを作ってみたい -5日目-

Posted at

目次

5日目 - ユーザの認証の追加

公式のリファレンスに則ってJWT認証を設定する。

パスワードのハッシュ化

ライブラリのインストール

bash
poetry add passlib ["bcrypt"]# パスワードのハッシュするため

認証用のファイルを作成

api/v1/cruds/auth.py
from passlib.context import CryptContext
from sqlalchemy.orm import Session
from api.v1.models.user import User

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# パスワードのハッシュ化
def get_password_hash(password):
  return pwd_context.hash(password)

# パスワードの検証
def verify_password(plain_password,hashed_password):
  return pwd_context.verify(plain_password,hash_password)

# ユーザーの認証
def authenticate_user(db:Session,login_id:str,password:str):
  user = db.query(User).filter_by(login_id=login_id).first()
  if user is None:
    return None
  if not verify_password(password,user.password_hash):
    return None
  return user

ユーザーの作成時の処理の変更

api/v1/cruds/user.py
# ~~~
from api.v1.cruds.auth import get_password_hash
# ~~~
def create_user(db:Session,user:user_schema.UserCreateRequest):
  password_hash = get_password_hash(user.password)
  new_user = User(
    login_id      = user.login_id,
    name          = user.name,
    description   = user.description,
    password_hash = password_hash
  )
  db.add(new_user)
  db.commit()
  db.refresh(new_user)
  return new_user

これでパスワードのハッシュ化とハッシュ化したパスワードの認証を行うためのメソッドが用意できた。

JWTの認証の追加

JWT認証のためのSECRETキーなどの保存

以下よりsecretキーを生成

bash
openssl rand -hex 32

生成したキーを環境変数として読み込むためにdocker-compose.ymlと同じディレクトリに.envを生成
(環境変数の読み込み方を前回から変更)

.env
MYSQL_ROOT_PASSWORD=...
SECRET_KEY=...

環境変数をdocker-compose.yml内に定義

docker-compose.yaml
# ~~~
  backend:
  # ~~~
    environment:
      - MYSQL_ROOT_PASSWORD=$MYSQL_ROOT_PASSWORD
      - SECRET_KEY=$SECRET_KEY

これで環境変数は.envファイル書き換えるだけでオッケー

ライブラリのインストール

bash
poetry add python-jose # JWTの認証用
poetry add python-multipart # multipart/form-dataのリクエストを受け付けられるように

CRUD処理の追加

access_tokenの作成

api/v1/cruds/auth.py
from datetime import datetime, timedelta
from jose import jwt
import os
# ~~~
SECRET_KEY = os.environ.get("SECRET_KEY")
ALGORITHM = "HS256"
ACCESS_TOEKN_EXPIRE_MINUTES = 30
# ~~~
def create_access_token(data:dict):
  to_encode = data.copy()
  expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
  to_encode.update({"exp":expire})
  encode_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
  return encode_jwt

tokenからのユーザー情報の取得

api/v1/cruds/auth.py
from fastapi import OAuth2PasswordBearer
from jose import JWTError
# ~~~
def get_current_user(db:Session,token:str):
  try:
    payload = jwt.decode(token,SECRET_KEY,algorithm=ALGORITHM)
    login_id:str = payload.get("sub")
    if login_id is None:
      return None
  except JWTError:
    return None
  current_user = db.query(User).filter_by(login_id=login_id).first()
  if current_user is None:
    return None
  return current_user

schemaファイルの作成

api/v1/schemas/auth.py
from pydantic import BaseModel

class Token(BaseModel):
  access_token:str
  token_type:str

auth用のルーター作成

api/v1/routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from fastapi.security import OAuth2PasswordRequestForm

import api.v1.schemas.auth as auth_schema
import api.v1.schemas.user as user_schema
import api.v1.cruds.auth as auth_crud

from api.v1.db import get_db

router = APIRouter()

@router.post('/token', response_model=auth_schema.Token)
def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends(),db:Session = Depends(get_db)):
  user = auth_crud.authenticate_user(
    db=db,
    login_id=form_data.username,
    password=form_data.password
  )
  if user is None:
    raise HTTPException(
      status_code=status.HTTP_401_UNAUTHORIZED,
      detail="Incorrect login_id or password",
      headers={"WWW-Authenticate":"Bearer"}
    )
  access_token = auth_crud.get_current_user(db=db,token=token)
  return {"access_token":access_token,"token_type": "bearer"}

auth用のルーター反映

api/v1/main.py
# ~~~
from api.v1.routers import user,post,auth
# ~~~
app.include_router(auth.router)

これで認証用のエンドポイントができた。

ユーザー認証が必要なエンドポイントに反映

api/v1/router/post.py
# ~~~
import api.v1.cruds.auth as auth_crud
# ~~~
@router.post('/posts',response_model=post_schema.Post)
def create_post(post:post_schema.PostBase,token: str = Depends(auth_crud.oauth2_scheme),db:Session = Depends(get_db)):
  current_user = auth_crud.get_current_user(db=db,token=token)
  post = post_crud.create_post(db=db,post=post,user_id=current_user.id)
  return post
# ~~~
@router.post('/post/{id}/lgtm')
def add_lgtm(id:int,token: str = Depends(auth_crud.oauth2_scheme),db:Session = Depends(get_db)):
  current_user = auth_crud.get_current_user(db=db,token=token)
  lgtm_crud.add_lgtm(db=db,post_id=id,user_id=current_user.id)
  return None
# ~~~
@router.delete('/post/{id}/lgtm')
def delete_lgtm(id:int,token: str = Depends(auth_crud.oauth2_scheme),db:Session = Depends(get_db)):
  current_user = auth_crud.get_current_user(db=db,token=token)
  lgtm = lgtm_crud.get_lgtm(db=db,post_id=id,user_id=current_user.id)
  if lgtm is None:
    raise HTTPException(status_code = 400, detail = "Lgtm not Found")
  lgtm_crud.remove_lgtm(db=db,lgtm=lgtm)
  return None

確認

/docsにアクセスすると認証用のボタンが右上に設置される
スクリーンショット 2022-01-03 0.11.06.png

このボタンで認証を行なうと、認証が必要なエンドポイントのお試しができる。
とりあえずこれで認証が必要なルーティングが完成。

4
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
1