1
1

More than 1 year has passed since last update.

40代おっさんFastAPI、MongoDB連携メモ 認証関係

Posted at

本記事について

この記事はプログラミング初学者の私が学んでいく中でわからない単語や概要を分かりやすくまとめたものです。
などありましたらコメントにてお知らせいただければ幸いです。

認証関係のクラスを作る

プロジェクト直下にファイルを作成auth_utils.py

JWTとは
JSON Web Token(JWT)Webを介して、2者の間で、情報を安全に転送するために使用されるJSONオブジェクト。認証システムや情報交換に用いられています。転送される情報は、デジタル署名されているため、信頼することができ。Token自体は主にヘッダー、ペイロード、署名で構成されている。これらの3つの部分はドット”.”で区切られている。

シークレットキーを環境変数に定義

.envファイルに

CSRF_KEY=XXXXXX
JWT_KEY=XXXXXX

auth_utils.py

import jwt
from fastapi import HTTPException
from passlib.context import CryptContext
from datetime import datetime, timedelta
from decouple import config

JWT_KEY = config('JWT_KEY') # .envより呼び出し

# 認証関係をまとめたクラス
class AuthJwtCsrt():
    pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto") # CryptContextを使ってインスタンスを生成
    secret_key = JWT_KEY

    # フォームでタイピングしたpasswordをハッシュ化
    def generate_hashed_pw(self, password) -> str:
        return self.pwd_ctx.hash(password) # ハッシュ化

    # passwordを検証
    def verify_pw(self, plain_pw, hashed_pw) -> bool: # DBにあるハッシュ化されたpasswordを判定 bool値で返す
        return self.pwd_ctx.verify(plain_pw, hashed_pw) # DBにあるハッシュ化打ち込まれたpasswordを判定

    # jwtを生成
    def encode_jwt(self, email) -> str: # ユーザーemail
        payload = {
            'exp': datetime.utcnow() + timedelta(days=0, minutes=5), # jwtの有効期限ここでは5分
            'iat': datetime.utcnow(), # jwtが生成された日時
            'sub': email # ユーザーを一意に識別出来るものを指定
        }
        return jwt.encode(
            payload,
            self.secret_key,
            algorithm='HS256' # アルゴリズム
        )
    
    # jwtを解析
    def decode_jwt(self, token) -> str:
        try:
            payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
            return payload['sub']
        except jwt.ExpiredSignatureError: # jwtトークンが執行している
            raise HTTPException(
                status_code=401, detail='The JWT has expired'
            )
        except jwt.InvalidTokenError as e: # jwtに準拠していない値、空のトークンが渡されたとき
            raise HTTPException(status_code=401, detail='JWT is not valid')

database.py

  • from auth_utils import AuthJwtCsrtを記入
  • auth = AuthJwtCsrt()インスタンスを作る

ユーザーを新規で作る

# 新規ユーザー作成
async def db_signup(data: dict) -> dict:
    email = data.get("email")
    password = data.get("password")
    # emailの存在の判定
    overlap_user = await collection_user.find_one({"email": email}) # ユーザーが存在すればtrue
    if overlap_user:
        raise HTTPException(status_code=400, detail='Email is already taken') # trueの場合の例外発生
    if not password or len(password) < 6: # パスワードが6文字以下、入力されていない場合
        raise HTTPException(status_code=400, detail='Password too short')
    # DBに登録
    user = await collection_user.insert_one({"email": email, "password": auth.generate_hashed_pw(password)}) # generate_hashed_pwハッシュ化してDBに返す
    new_user = await collection_user.find_one({"_id": user.inserted_id})
    return user_serializer(new_user)
  • from fastapi import HTTPExceptionをimport
user.inserted_idを辞書型で返す関数
def user_serializer(user) -> dict:
    return {
        "id": str(user["_id"]), # オブジェクトIDをstrに直す
        "email": user["email"],
    }
  • inserted_idがオブジェクト型で返ってくるため、string型に直す必要がある為

ログイン

async def db_login(data: dict) -> str: # jwtトークンに返すためにstrにしている
    email = data.get("email")
    password = data.get("password")
    user = await collection_user.find_one({"email": email})
    # userが存在しない場合、ハッシュ化されたパスワードが一致しない場合
    if not user or not auth.verify_pw(password, user["password"]):
        raise HTTPException(
            status_code=401, detail='Invalid email or password'
        )
    # 問題なければ
    token = auth.encode_jwt(user['email']) # jwtを生成
    return token

route_auth.py

routeフォルダの中に
route_auth.pyファイルを作る

from fastapi import APIRouter
from fastapi import Response, Request
from fastapi.encoders import jsonable_encoder
from schemas import UserBody, SuccessMsg, UserInfo
from database import (
    db_signup,
    db_login,
)
from auth_utils import AuthJwtCsrt

router = APIRouter()
auth = AuthJwtCsrt()

# 新規登録
@router.post("/api/register", response_model=UserInfo)
async def signup(user: UserBody):
    user = jsonable_encoder(user) # dict型に変換
    new_user = await db_signup(user)
    return new_user

# ログイン
@router.post("/api/login", response_model=SuccessMsg)
async def login(response: Response, user:UserBody):
    user = jsonable_encoder(user) # dict型に変換
    token = await db_login(user)
    response.set_cookie( # クッキーに設定
        key="access_token", value=f"Bearer {token}", httponly=True, samesite="none", secure=True
    )
    return {"message": "Successfully logged-in"}

main.pyファイルに
app.include_router(route_auth.router)を追加

※pip install bcrypt==3.2.0をする。
 pip freeze -> requirements.txtで追記

schemas.py

# フロントから送られる型
class UserBody(BaseModel):
    email: str
    password: str

# 複数のエンドポイントで使う
class UserInfo(BaseModel):
    id: Optional[str] = None # Optionalで任意の値
    email: str
  • from typing import Optionalのimport文を追記
  • OptionalはNoneで返ってくる値に使う

## 参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1