はじめに
Pythonを使ったwebアプリにJWT(JSON Web Token)認証を導入する手順を簡潔にまとめてみました。
まだ実装したことがない方の参考になれば嬉しいです。
(ユーザー登録・パスワード認証は実装されている前提で進めさせていただきます)
JWT認証って何がいいの?
JWTは「認証が成功したあとの認証状態を保つ仕組み」です。
・ログインした人ごとに サーバー側でセッション情報を保存・共有(Redisとか)する必要がない
・サーバーを複数立てても どのサーバーでも同じトークンを検証できるため、スケールしやすい(負荷分散が楽)
・改ざんできない(署名付き)
といったメリットがあります。
もっと詳しく知りたい方は、こちらの記事がおすすめです。
JWT認証の流れを理解する
1. 必要なライブラリをインストール
pip install PyJWT
(PythonでJWT(JSON Web Token)を簡単に生成・検証できるライブラリ)
「requirements.txt」を使用する場合は、
PyJWT
COPY requirements.txt .
RUN pip install -r requirements.txt
2. JWT関連の共通設定
JWTトークンを扱うときの、共通の設定を記述します。
import datetime
SECRET_KEY = ...
# 以下を追加
JWT_SECRET_KEY = SECRET_KEY
JWT_ALGORITHM = 'HS256'
ACCESS_TOKEN_KEY = 'jwt_access'
REFRESH_TOKEN_KEY = 'jwt_refresh'
ACCESS_TOKEN_LIFETIME = datetime.timedelta(minutes=5)
REFRESH_TOKEN_LIFETIME = datetime.timedelta(days=7)
3. JWT関連のモジュール
JWT関連のユーティリティ関数をまとめたモジュールを作成します。
import jwt
from django.conf import settings
from datetime import datetime
from django.http import JsonResponse
from jwt.exceptions import ExpiredSignatureError, InvalidTokenError
def generate_jwt(payload, lifetime):
exp = datetime.utcnow() + lifetime
payload.update({"exp": exp})
token = jwt.encode(payload, settings.JWT_SECRET_KEY, algorithm=settings.JWT_ALGORITHM)
return token
def decode_jwt(token):
return jwt.decode(token, settings.JWT_SECRET_KEY, algorithms=[settings.JWT_ALGORITHM])
def create_token_response(user_id, message):
response = JsonResponse({'message': message})
# access_token
access_token_lifetime = settings.ACCESS_TOKEN_LIFETIME
access_token = generate_jwt({'user_id': user_id}, access_token_lifetime)
access_expiry = datetime.utcnow() + access_token_lifetime
response.set_cookie(
settings.ACCESS_TOKEN_KEY,
access_token,
httponly=True,
secure=True,
samesite='Lax',
expires=access_expiry
)
# refresh_token
refresh_token_lifetime = settings.REFRESH_TOKEN_LIFETIME
refresh_token = generate_jwt({'user_id': user_id, 'type': 'refresh'}, refresh_token_lifetime)
refresh_expiry = datetime.utcnow() + refresh_token_lifetime
response.set_cookie(
settings.REFRESH_TOKEN_KEY,
refresh_token,
httponly=True,
secure=True,
samesite='Lax',
expires=refresh_expiry
)
return response
def delete_token_response(message):
response = JsonResponse({'message': message})
response.delete_cookie(settings.ACCESS_TOKEN_KEY)
response.delete_cookie(settings.REFRESH_TOKEN_KEY)
return response
4. ビュー関数に適用
ログイン時:POSTメソッドで送信された username と password を使ってログイン処理を行い、認証に成功すればトークン付きのレスポンスを返します。
ログアウト時:トークンを無効化してログアウト処理を実行します。
from django.views.decorators.http import require_http_methods
from django.http import HttpResponseBadRequest
from django.contrib.auth import authenticate, login, logout
from .utils import create_token_response, delete_token_response
@require_http_methods(["POST"])
def mylogin(request):
username = request.POST.get["username"]
password = request.POST.get["password"]
user = authenticate(request, username=username, password=password)
if user is None:
return HttpResponseBadRequest()
login(request, user)
return create_token_response(request.user.id, "OK")
@login_required
@require_http_methods(["POST"])
def mylogout(request):
request.user.is_login = False
request.user.save()
logout(request)
return delete_token_response("OK")
5. カスタムミドルウェアの作成
JWTを使った認証処理をリクエストとレスポンスの間に挟んで自動で制御できるようにします。
MIDDLEWARE = [
'middleware.JWTAuthenticationMiddleware'
]
import jwt
from datetime import datetime
from django.conf import settings
from django.contrib.auth import get_user_model, logout
from django.utils.deprecation import MiddlewareMixin
from django.contrib.auth.models import AnonymousUser
from .utils import decode_jwt, generate_jwt
from django.shortcuts import render
User = get_user_model()
EXCLUDED_PATHS = [
'/',
]
class JWTAuthenticationMiddleware(MiddlewareMixin):
def process_request(self, request):
if request.path in EXCLUDED_PATHS:
return
access_token = request.COOKIES.get(settings.ACCESS_TOKEN_KEY)
refresh_token = request.COOKIES.get(settings.REFRESH_TOKEN_KEY)
if access_token:
try:
payload = decode_jwt(access_token)
user = User.objects.get(id=payload["user_id"])
request.user = user
return
except jwt.ExpiredSignatureError:
# need to refresh token
pass
except (jwt.DecodeError, User.DoesNotExist):
self.set_user_logout(request)
return
if refresh_token:
try:
payload = decode_jwt(refresh_token)
if payload.get("type") != "refresh":
raise jwt.InvalidTokenError()
user = User.objects.get(id=payload["user_id"])
request.user = user
# create new access token
new_access_token = generate_jwt({'user_id': user.id}, settings.ACCESS_TOKEN_LIFETIME)
request.new_access_token = new_access_token
return
except (jwt.ExpiredSignatureError, jwt.DecodeError, jwt.InvalidTokenError, User.DoesNotExist):
self.set_user_logout(request)
return
self.set_user_logout(request)
def process_response(self, request, response):
if hasattr(request, 'new_access_token'):
access_token_lifetime = settings.ACCESS_TOKEN_LIFETIME
access_expiry = datetime.utcnow() + access_token_lifetime
response.set_cookie(
settings.ACCESS_TOKEN_KEY,
request.new_access_token,
httponly=True,
secure=True,
samesite='Lax',
expires=access_expiry
)
# delete cookie on logout
if getattr(request, 'clear_jwt_cookies', False):
response = render(request, 'index.html', status=401)
response.delete_cookie(settings.ACCESS_TOKEN_KEY)
response.delete_cookie(settings.REFRESH_TOKEN_KEY)
return response
def set_user_logout(self, request):
if request.user and hasattr(request.user, "id"):
try:
user = User.objects.get(id=request.user.id)
user.is_login = False
user.save(update_fields=["is_login"])
logout(request)
except User.DoesNotExist:
pass
except Exception as e:
print('error:', e)
request.user = AnonymousUser()
request.clear_jwt_cookies = True
おわりに
今回はDjangoでの例であり、Django REST Frameworkを使用する場合はdjangorestframework-simplejwt
を使った方法がネット上で多く紹介されているのでそちらを参考にしてみてください。(現在はそちらが主流のようです)
認証まわりに関連して、QRcodeを使用したTOTP(Time-based One-Time Password)を導入する記事も書きましたので、ぜひそちらも合わせて御覧ください!
PythonでQRコード認証(TOTP)を実装する