-
Keycloakを使って
nonceパラメータを使ったリプレイ攻撃対策を試してみた。メモとして残しておく。 -
**過去に作成したコード**を修正して構築する。
- 変更点を主として記述する。
- docker-composeなどの設定ファイルはそのまま利用する。
nonceパラメータとは
- リプレイ攻撃(不正に取得したIDトークンを送りつけて、不正アクセスする攻撃)を防ぐためのパラメータ
main.py 修正
-
nonceを指定し、クライアント側で保存した後に認可リクエストを行う。 -
Keycloak(認可サーバー)側から取得したIDトークンに含まれる
nonceとクライアント側で保持するnonceが一致することを確認する。import ast import json import urllib.parse as parse import urllib.request as request import urllib.error as error import os import requests import uvicorn from fastapi import FastAPI from starlette.requests import Request from starlette.responses import RedirectResponse import base64 from http import HTTPStatus import re from base64 import b64decode import jwt from cryptography.hazmat.primitives import serialization import hashlib from pprint import pprint # ENV # For Test App(FastAPI) APP_BASE_URL = os.getenv("APP_BASE_URL") APP_CLIENT_ID = os.getenv("APP_CLIENT_ID") APP_CLIENT_SECRET = os.getenv('APP_CLIENT_SECRET') APP_REDIRECT_URI = os.getenv('APP_REDIRECT_URI') # For Keycloak KEYCLOAK_BASE_URL_LOCALHOST = os.getenv("KEYCLOAK_BASE_URL_LOCALHOST") KEYCLOAK_REALM_NAME = os.getenv("KEYCLOAK_REALM_NAME") KEYCLOAK_BASE_URL_CONTAINER_NAME = os.getenv( "KEYCLOAK_BASE_URL_CONTAINER_NAME") # Authorization Endpoint KEYCLOAK_AUTH_BASE_URL = ( f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}" "/protocol/openid-connect/auth" ) # Token Endpoint # ※Setting Container Name Because Communication between Conntainers KEYCLOAK_TOKEN_URL = ( f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}" "/protocol/openid-connect/token" ) app = FastAPI() # Redirection To Keycloak Authorization Endpoint @app.get("/auth/login") async def login() -> RedirectResponse: # Generate State state = hashlib.sha256(os.urandom(32)).hexdigest() # Generate Nonce nonce = hashlib.sha256(os.urandom(32)).hexdigest() # For PKCE # Generate code_verifier code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8') code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier) # Generate code_challenge code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest() code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8') code_challenge = code_challenge.replace('=', '') # Redirection to Keycloak Authorization Endpoint KEYCLOAK_AUTH_URL = KEYCLOAK_AUTH_BASE_URL + '?{}'.format(parse.urlencode({ 'client_id': APP_CLIENT_ID, 'redirect_uri': APP_REDIRECT_URI, 'state': state, 'nonce': nonce, #New 'scope': 'openid', 'response_type': 'code', 'response_mode': 'query.jwt', 'code_challenge':code_challenge, 'code_challenge_method':'S256' })) response = RedirectResponse(KEYCLOAK_AUTH_URL) # Save state, nonce, code_verifier. ※temporary solution response.set_cookie(key="AUTH_STATE", value=state) response.set_cookie(key="AUTH_NONCE", value=nonce) response.set_cookie(key="AUTH_CODE_VERIFIER", value=code_verifier) return response # Token Request def get_token(code,code_verifier): params = { 'client_id': APP_CLIENT_ID, 'client_secret': APP_CLIENT_SECRET, 'grant_type': 'authorization_code', 'redirect_uri': APP_REDIRECT_URI, 'code': code, 'code_verifier': code_verifier } x = requests.post(KEYCLOAK_TOKEN_URL, params, verify=False).content.decode('utf-8') pprint(json.loads(x)) token_response = ast.literal_eval(x) pprint(token_response['id_token']) return token_response # (New)ID Token Validation and Getting payload def get_id_token_payload(id_token: str, request: Request): public_key = get_public_key() payload = jwt.decode(id_token, public_key, algorithms=["RS256"],audience=APP_CLIENT_ID) # Validating Nonce (Cookie and ID token's payload) if payload['nonce'] != request.cookies.get("AUTH_NONCE"): return {"error": "nonce_verification_failed"} pprint("id_token payload:{}".format(payload)) return payload # Redirection Endpoint # Get JWT Authorization Response # Validate JWT and Requesting Token Endpoint @app.get("/auth/callback") async def callback(request: Request, response: str) -> RedirectResponse: # Get Public Key From Keycloak public_key = get_public_key() # Decode Authorization response(JWT) payload = jwt.decode(response, public_key, algorithms=["RS256"],audience=APP_CLIENT_ID) pprint(payload) # Validate iss if payload['iss'] != f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}": return {"error": "iss_verification_failed"} # Validate state if payload['state'] != request.cookies.get("AUTH_STATE"): return {"error": "state_verification_failed"} # Token Request token_response = get_token(payload['code'], request.cookies.get("AUTH_CODE_VERIFIER")) # ID Token Validation and Getting payload id_token_payload = get_id_token_payload(token_response['id_token'], request) return id_token_payload #Get Public Key From Keycloak's JWKS Endpoint def get_public_key(): r = requests.get(f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}/") r.raise_for_status() key_der_base64 = r.json()["public_key"] pprint(key_der_base64) key_der = b64decode(key_der_base64.encode()) public_key = serialization.load_der_public_key(key_der) return public_key if __name__ == "__main__": uvicorn.run(app, port=8000, loop="asyncio")
Keycloak準備
-
コンテナを起動する
docker-compose up -
Keycloak Admin コンソールにアクセスする。
http://localhost:8080※ログイン情報は
docker-composeに記載 -
クライアント(
Clients)を登録する。-
リダイレクトURIには、"http://localhost:8000/auth/callback"を指定
-
app.envを以下のように指定APP_BASE_URL=http://localhost:8000/ APP_CLIENT_ID=python-client APP_CLIENT_SECRET={YOUR_CLIENT_SECRET} APP_REDIRECT_URI=http://localhost:8000/auth/callback KEYCLOAK_REALM_NAME=master KEYCLOAK_BASE_URL_LOCALHOST=http://localhost:8080/ KEYCLOAK_BASE_URL_CONTAINER_NAME=http://keycloak:8080/
-
-
テストユーザー(
Users)を登録する。 -
コンテナを再起動する。
docker-compose down docker-compose build docker-compose up
動作確認
-
テストアプリ(Python/FastAPI)のKeycloak Authorization Endpointへのリダイレクト用エンドポイントにアクセスする。
http://localhost:8000/auth/login-
以下のように
nonceパラメータをクエリに付与してKeycloak Authorization Endpointにリダイレクトされる。http://localhost:8080/auth/realms/master/protocol/openid-connect/auth?client_id=python-client&redirect_uri=http%3A%2F%2Flocalhost%3A8000%2Fauth%2Fcallback&state=3b2b2eb6b965661e3a1107c40c5744e4a5a9d75d5c8c3dbafc6596707ca0c91d&nonce=27e1700b2f17baf91ba11af2c83014f25a9b4ac035e5c3ee475037427efc0934&scope=openid&response_type=code&response_mode=query.jwt&code_challenge=jOpKGQU7XXN14XbMkPc6ux8Pn6DuuGE_4paLQ3nzkY0&code_challenge_method=S256
-
-
ユーザー認証を行う。
-
次のようにKeycloakからテスト用アプリのリダイレクトURIにリダイレクトされ、テスト用アプリはトークンリクエストを行い、IDトークン取得+nonce検証を行う。
http://localhost:8000/auth/callback?response=...-
id_tokenペイロード※認可リクエストに指定したnonceと同じ値が含まれている。{ "exp": 1644552176, "iat": 1644552116, "auth_time": 1644552115, "jti": "ffe97609-2bb9-4f8e-9334-0cc7fe38c84c", "iss": "http://localhost:8080/auth/realms/master", "aud": "python-client", "sub": "...", "typ": "ID", "azp": "python-client", "nonce": "27e1700b2f17baf91ba11af2c83014f25a9b4ac035e5c3ee475037427efc0934", "session_state": "b0eb4b70-8c5c-4679-bfca-0bce660f21de", "at_hash": "-pNOX4FWOMiUmAP4ANxCYw", "acr": "1", "sid": "b0eb4b70-8c5c-4679-bfca-0bce660f21de", "email_verified": false, "preferred_username": "..." }※クライアントで管理する
nonceとIDトークンに含まれるnonceが一致する場合、ブラウザに上記のようなIDトークンペイロードが表示される。
-
