先日、JWT Secured Authorization Response Mode for OAuth 2.0 (JARM)を調べたため、Keycloakを使って試してみた。メモとして残しておく。
-
過去に作成したコードを修正して検証する。
- 変更点を主として記述する。
- docker-composeなどの設定ファイルはそのまま利用する。
main.py 修正
- 認可リクエスト時に
response_mode=query.jwt
を指定し、受け取った認可レスポンスJWTをデコード・検証する処理を追加
import ast
import json
import urllib.parse as parse
import urllib.request as request
import urllib.error as error
import os
import requests
import uvicorn
from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import RedirectResponse
import base64
from http import HTTPStatus
import re
from base64 import b64decode
import jwt
from cryptography.hazmat.primitives import serialization
import hashlib
from pprint import pprint
# ENV
# For Test App(FastAPI)
APP_BASE_URL = os.getenv("APP_BASE_URL")
APP_CLIENT_ID = os.getenv("APP_CLIENT_ID")
APP_CLIENT_SECRET = os.getenv('APP_CLIENT_SECRET')
APP_REDIRECT_URI = os.getenv('APP_REDIRECT_URI')
# For Keycloak
KEYCLOAK_BASE_URL_LOCALHOST = os.getenv("KEYCLOAK_BASE_URL_LOCALHOST")
KEYCLOAK_REALM_NAME = os.getenv("KEYCLOAK_REALM_NAME")
KEYCLOAK_BASE_URL_CONTAINER_NAME = os.getenv(
"KEYCLOAK_BASE_URL_CONTAINER_NAME")
# Authorization Endpoint
KEYCLOAK_AUTH_BASE_URL = (
f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}"
"/protocol/openid-connect/auth"
)
# Token Endpoint
# ※Setting Container Name Because Communication between Conntainers
KEYCLOAK_TOKEN_URL = (
f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}"
"/protocol/openid-connect/token"
)
app = FastAPI()
# Redirection To Keycloak Authorization Endpoint
@app.get("/auth/login")
async def login() -> RedirectResponse:
# Generate State
state = hashlib.sha256(os.urandom(32)).hexdigest()
# For PKCE
# Generate code_verifier
code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
# Generate code_challenge
code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
code_challenge = code_challenge.replace('=', '')
# Redirection to Keycloak Authorization Endpoint
KEYCLOAK_AUTH_URL = KEYCLOAK_AUTH_BASE_URL + '?{}'.format(parse.urlencode({
'client_id': APP_CLIENT_ID,
'redirect_uri': APP_REDIRECT_URI,
'state': state,
'scope': 'openid',
'response_type': 'code',
'response_mode': 'query.jwt', # New
'code_challenge':code_challenge,
'code_challenge_method':'S256'
}))
response = RedirectResponse(KEYCLOAK_AUTH_URL)
# Save state、code_verifier. ※temporary solution
response.set_cookie(key="AUTH_STATE", value=state)
response.set_cookie(key="AUTH_CODE_VERIFIER", value=code_verifier)
return response
# Token Request
def get_token(code,code_verifier):
params = {
'client_id': APP_CLIENT_ID,
'client_secret': APP_CLIENT_SECRET,
'grant_type': 'authorization_code',
'redirect_uri': APP_REDIRECT_URI,
'code': code,
'code_verifier': code_verifier
}
x = requests.post(KEYCLOAK_TOKEN_URL, params, verify=False).content.decode('utf-8')
pprint(json.loads(x))
access_token = ast.literal_eval(x)['access_token']
id_token = ast.literal_eval(x)['id_token']
username = get_user_name_from_id_token(id_token)
return ast.literal_eval(x)
# Getting username From id_token
def get_user_name_from_id_token(id_token):
public_key = get_public_key()
payload = jwt.decode(id_token, public_key, algorithms=["RS256"],audience=APP_CLIENT_ID)
pprint("username:{}".format(payload['preferred_username']))
return payload['preferred_username']
# Redirection Endpoint
# Get JWT Authorization Response
# Validate JWT and Requesting Token Endpoint
@app.get("/auth/callback")
async def callback(request: Request, response: str) -> RedirectResponse:
# Get Public Key From Keycloak
public_key = get_public_key()
# Decode Authorization response(JWT)
payload = jwt.decode(response, public_key, algorithms=["RS256"],audience=APP_CLIENT_ID)
pprint(payload)
# Validate iss
if payload['iss'] != f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}":
return {"error": "iss_verification_failed"}
# Validate state
if payload['state'] != request.cookies.get("AUTH_STATE"):
return {"error": "state_verification_failed"}
return get_token(payload['code'], request.cookies.get("AUTH_CODE_VERIFIER"))
#Get Public Key From JWKS Endpoint
def get_public_key():
r = requests.get(f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}/")
r.raise_for_status()
key_der_base64 = r.json()["public_key"]
pprint(key_der_base64)
key_der = b64decode(key_der_base64.encode())
public_key = serialization.load_der_public_key(key_der)
return public_key
if __name__ == "__main__":
uvicorn.run(app, port=8000, loop="asyncio")
KeyCloak準備
- コンテナを起動する
docker-compose up
- Keycloak Admin コンソールにアクセスする。
http://localhost:8080
※ログイン情報はdocker-compose
に記載
-
クライアント(
Clients
)を登録する。- リダイレクトURIには、"http://localhost:8000/auth/callback"を指定
-
app.env
を以下のように指定
APP_BASE_URL=http://localhost:8000/ APP_CLIENT_ID=python-client APP_CLIENT_SECRET={YOUR_CLIENT_SECRET} APP_REDIRECT_URI=http://localhost:8000/auth/callback KEYCLOAK_REALM_NAME=master KEYCLOAK_BASE_URL_LOCALHOST=http://localhost:8080/ KEYCLOAK_BASE_URL_CONTAINER_NAME=http://keycloak:8080/
テストユーザー(
Users
)を登録する。コンテナを再起動する。
docker-compose down
docker-compose build
docker-compose up
動作確認
- Keycloak Authorization Endpointリダイレクト用エンドポイントにアクセスする。
http://localhost:8000/auth/login
-
以下のように
response_mode=query.jwt
をクエリに付与してKeycloak Authorization Endpointにリダイレクトされる。http://localhost:8080/auth/realms/master/protocol/openid-connect/auth?client_id=python-client&redirect_uri=http%3A%2F%2Flocalhost%3A8000%2Fauth%2Fcallback&state=f8faac9d97a8fd539a4c2357d3fc8f8e6e12c09ff2706a1871a4eaf71345333c&scope=openid&response_type=code&response_mode=query.jwt&code_challenge=6C3sPbUlb54W0ICg2O3NdRM9Id03yT-tBv-8PU0q20M&code_challenge_method=S256
- ユーザー認証を行う。
- 次のようにKeycloakからテスト用アプリのリダイレクトURIにリダイレクトされる。
※code=...
ではなく、response=...
となる。
http://localhost:8000/auth/callback?response=eyJhbGciOiJSUzI...4eMb8bzg
-
response
ペイロード{ "exp": 1643510092, "iss": "http://localhost:8080/auth/realms/master", "aud": "python-client", "code": "...", "state": "f8faac9d97a8fd539a4c2357d3fc8f8e6e12c09ff2706a1871a4eaf71345333c", "session_state": "61f4589d-15ac-40fa-8d4d-31e578866561" }