本記事について
この記事はプログラミング初学者の私が学んでいく中でわからない単語や概要を分かりやすくまとめたものです。
などありましたらコメントにてお知らせいただければ幸いです。
認証関係のクラスを作る
プロジェクト直下にファイルを作成auth_utils.py
JWTとは
JSON Web Token(JWT)Webを介して、2者の間で、情報を安全に転送するために使用されるJSONオブジェクト。認証システムや情報交換に用いられています。転送される情報は、デジタル署名されているため、信頼することができ。Token自体は主にヘッダー、ペイロード、署名で構成されている。これらの3つの部分はドット”.”で区切られている。
シークレットキーを環境変数に定義
.envファイルに
CSRF_KEY=XXXXXX
JWT_KEY=XXXXXX
auth_utils.py
import jwt
from fastapi import HTTPException
from passlib.context import CryptContext
from datetime import datetime, timedelta
from decouple import config
JWT_KEY = config('JWT_KEY') # .envより呼び出し
# 認証関係をまとめたクラス
class AuthJwtCsrt():
pwd_ctx = CryptContext(schemes=["bcrypt"], deprecated="auto") # CryptContextを使ってインスタンスを生成
secret_key = JWT_KEY
# フォームでタイピングしたpasswordをハッシュ化
def generate_hashed_pw(self, password) -> str:
return self.pwd_ctx.hash(password) # ハッシュ化
# passwordを検証
def verify_pw(self, plain_pw, hashed_pw) -> bool: # DBにあるハッシュ化されたpasswordを判定 bool値で返す
return self.pwd_ctx.verify(plain_pw, hashed_pw) # DBにあるハッシュ化打ち込まれたpasswordを判定
# jwtを生成
def encode_jwt(self, email) -> str: # ユーザーemail
payload = {
'exp': datetime.utcnow() + timedelta(days=0, minutes=5), # jwtの有効期限ここでは5分
'iat': datetime.utcnow(), # jwtが生成された日時
'sub': email # ユーザーを一意に識別出来るものを指定
}
return jwt.encode(
payload,
self.secret_key,
algorithm='HS256' # アルゴリズム
)
# jwtを解析
def decode_jwt(self, token) -> str:
try:
payload = jwt.decode(token, self.secret_key, algorithms=['HS256'])
return payload['sub']
except jwt.ExpiredSignatureError: # jwtトークンが執行している
raise HTTPException(
status_code=401, detail='The JWT has expired'
)
except jwt.InvalidTokenError as e: # jwtに準拠していない値、空のトークンが渡されたとき
raise HTTPException(status_code=401, detail='JWT is not valid')
database.py
-
from auth_utils import AuthJwtCsrtを記入 -
auth = AuthJwtCsrt()インスタンスを作る
ユーザーを新規で作る
# 新規ユーザー作成
async def db_signup(data: dict) -> dict:
email = data.get("email")
password = data.get("password")
# emailの存在の判定
overlap_user = await collection_user.find_one({"email": email}) # ユーザーが存在すればtrue
if overlap_user:
raise HTTPException(status_code=400, detail='Email is already taken') # trueの場合の例外発生
if not password or len(password) < 6: # パスワードが6文字以下、入力されていない場合
raise HTTPException(status_code=400, detail='Password too short')
# DBに登録
user = await collection_user.insert_one({"email": email, "password": auth.generate_hashed_pw(password)}) # generate_hashed_pwハッシュ化してDBに返す
new_user = await collection_user.find_one({"_id": user.inserted_id})
return user_serializer(new_user)
-
from fastapi import HTTPExceptionをimport
user.inserted_idを辞書型で返す関数
def user_serializer(user) -> dict:
return {
"id": str(user["_id"]), # オブジェクトIDをstrに直す
"email": user["email"],
}
-
inserted_idがオブジェクト型で返ってくるため、string型に直す必要がある為
ログイン
async def db_login(data: dict) -> str: # jwtトークンに返すためにstrにしている
email = data.get("email")
password = data.get("password")
user = await collection_user.find_one({"email": email})
# userが存在しない場合、ハッシュ化されたパスワードが一致しない場合
if not user or not auth.verify_pw(password, user["password"]):
raise HTTPException(
status_code=401, detail='Invalid email or password'
)
# 問題なければ
token = auth.encode_jwt(user['email']) # jwtを生成
return token
route_auth.py
routeフォルダの中に
route_auth.pyファイルを作る
from fastapi import APIRouter
from fastapi import Response, Request
from fastapi.encoders import jsonable_encoder
from schemas import UserBody, SuccessMsg, UserInfo
from database import (
db_signup,
db_login,
)
from auth_utils import AuthJwtCsrt
router = APIRouter()
auth = AuthJwtCsrt()
# 新規登録
@router.post("/api/register", response_model=UserInfo)
async def signup(user: UserBody):
user = jsonable_encoder(user) # dict型に変換
new_user = await db_signup(user)
return new_user
# ログイン
@router.post("/api/login", response_model=SuccessMsg)
async def login(response: Response, user:UserBody):
user = jsonable_encoder(user) # dict型に変換
token = await db_login(user)
response.set_cookie( # クッキーに設定
key="access_token", value=f"Bearer {token}", httponly=True, samesite="none", secure=True
)
return {"message": "Successfully logged-in"}
main.pyファイルに
app.include_router(route_auth.router)を追加
※pip install bcrypt==3.2.0をする。
pip freeze -> requirements.txtで追記
schemas.py
# フロントから送られる型
class UserBody(BaseModel):
email: str
password: str
# 複数のエンドポイントで使う
class UserInfo(BaseModel):
id: Optional[str] = None # Optionalで任意の値
email: str
-
from typing import Optionalのimport文を追記 - OptionalはNoneで返ってくる値に使う
## 参考