0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Keycloakを用いたOpenID Connect nonceパラメータ検証 メモ

Posted at
  • Keycloakを使ってnonceパラメータを使ったリプレイ攻撃対策を試してみた。メモとして残しておく。

  • **過去に作成したコード**を修正して構築する。

    • 変更点を主として記述する。
    • docker-composeなどの設定ファイルはそのまま利用する。

nonceパラメータとは

  • リプレイ攻撃(不正に取得したIDトークンを送りつけて、不正アクセスする攻撃)を防ぐためのパラメータ

nonce.png

main.py 修正

  • nonceを指定し、クライアント側で保存した後に認可リクエストを行う。

  • Keycloak(認可サーバー)側から取得したIDトークンに含まれるnonceとクライアント側で保持するnonceが一致することを確認する。

    import ast
    import json
    import urllib.parse as parse
    import urllib.request as request
    import urllib.error as error
    import os
    import requests
    import uvicorn
    from fastapi import FastAPI
    from starlette.requests import Request
    from starlette.responses import RedirectResponse
    import base64
    from http import HTTPStatus
    import re
    from base64 import b64decode
    import jwt
    from cryptography.hazmat.primitives import serialization
    import hashlib
    from pprint import pprint
    
    # ENV
    # For Test App(FastAPI)
    APP_BASE_URL = os.getenv("APP_BASE_URL")
    APP_CLIENT_ID = os.getenv("APP_CLIENT_ID")
    APP_CLIENT_SECRET = os.getenv('APP_CLIENT_SECRET')
    APP_REDIRECT_URI = os.getenv('APP_REDIRECT_URI')
    
    # For Keycloak
    KEYCLOAK_BASE_URL_LOCALHOST = os.getenv("KEYCLOAK_BASE_URL_LOCALHOST")
    KEYCLOAK_REALM_NAME = os.getenv("KEYCLOAK_REALM_NAME")
    KEYCLOAK_BASE_URL_CONTAINER_NAME = os.getenv(
        "KEYCLOAK_BASE_URL_CONTAINER_NAME")
    # Authorization Endpoint
    KEYCLOAK_AUTH_BASE_URL = (
        f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}"
        "/protocol/openid-connect/auth"
    )
    # Token Endpoint
    # ※Setting Container Name Because Communication between Conntainers
    KEYCLOAK_TOKEN_URL = (
        f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}"
        "/protocol/openid-connect/token"
    )
    
    app = FastAPI()
    
    # Redirection To Keycloak Authorization Endpoint
    @app.get("/auth/login")
    async def login() -> RedirectResponse:
        # Generate State
        state = hashlib.sha256(os.urandom(32)).hexdigest()
    
        # Generate Nonce
        nonce = hashlib.sha256(os.urandom(32)).hexdigest()
    
        # For PKCE
        # Generate code_verifier
        code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
        code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
        # Generate code_challenge
        code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
        code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
        code_challenge = code_challenge.replace('=', '')
    
        # Redirection to Keycloak Authorization Endpoint
        KEYCLOAK_AUTH_URL = KEYCLOAK_AUTH_BASE_URL + '?{}'.format(parse.urlencode({
            'client_id': APP_CLIENT_ID,
            'redirect_uri': APP_REDIRECT_URI,
            'state': state,
            'nonce': nonce, #New
            'scope': 'openid',
            'response_type': 'code',
            'response_mode': 'query.jwt',
            'code_challenge':code_challenge,
            'code_challenge_method':'S256'
        }))
        
        response = RedirectResponse(KEYCLOAK_AUTH_URL)
        # Save state, nonce, code_verifier. ※temporary solution
        response.set_cookie(key="AUTH_STATE", value=state)
        response.set_cookie(key="AUTH_NONCE", value=nonce)
        response.set_cookie(key="AUTH_CODE_VERIFIER", value=code_verifier)
        return response
    
    
    # Token Request
    def get_token(code,code_verifier):
    
        params = {
            'client_id': APP_CLIENT_ID,
            'client_secret': APP_CLIENT_SECRET,
            'grant_type': 'authorization_code',
            'redirect_uri': APP_REDIRECT_URI,
            'code': code,
            'code_verifier': code_verifier
        }
        x = requests.post(KEYCLOAK_TOKEN_URL, params, verify=False).content.decode('utf-8')
        pprint(json.loads(x))
        token_response = ast.literal_eval(x)
        pprint(token_response['id_token'])
        return token_response
    
    # (New)ID Token Validation and Getting payload 
    def get_id_token_payload(id_token: str, request: Request):
        public_key = get_public_key()
        payload = jwt.decode(id_token, public_key, algorithms=["RS256"],audience=APP_CLIENT_ID)
        # Validating Nonce (Cookie and ID token's payload)
        if payload['nonce'] != request.cookies.get("AUTH_NONCE"):
            return {"error": "nonce_verification_failed"}
        pprint("id_token payload:{}".format(payload))
        return payload
        
    
    # Redirection Endpoint
    # Get JWT Authorization Response
    # Validate JWT and Requesting Token Endpoint
    @app.get("/auth/callback")
    async def callback(request: Request, response: str) -> RedirectResponse:
        # Get Public Key From Keycloak
        public_key = get_public_key()
        
        # Decode Authorization response(JWT) 
        payload = jwt.decode(response, public_key, algorithms=["RS256"],audience=APP_CLIENT_ID)
        pprint(payload)
    
        # Validate iss
        if payload['iss'] != f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}":
            return {"error": "iss_verification_failed"}
        # Validate state
        if payload['state'] != request.cookies.get("AUTH_STATE"):
            return {"error": "state_verification_failed"}
        # Token Request
        token_response = get_token(payload['code'],  request.cookies.get("AUTH_CODE_VERIFIER"))
        # ID Token Validation and Getting payload 
        id_token_payload =  get_id_token_payload(token_response['id_token'], request)
        return id_token_payload
    
    #Get Public Key From Keycloak's JWKS Endpoint
    def get_public_key():
        r = requests.get(f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}/")
        r.raise_for_status()
        key_der_base64 = r.json()["public_key"]
        pprint(key_der_base64)
        key_der = b64decode(key_der_base64.encode())
        public_key = serialization.load_der_public_key(key_der)
        return public_key
    
    if __name__ == "__main__":
        uvicorn.run(app, port=8000, loop="asyncio")
    
    

Keycloak準備

  • コンテナを起動する

    docker-compose up
    
  • Keycloak Admin コンソールにアクセスする。

    http://localhost:8080
    

    ※ログイン情報はdocker-composeに記載

  • クライアント(Clients)を登録する。

    • リダイレクトURIには、"http://localhost:8000/auth/callback"を指定

    • app.envを以下のように指定

      APP_BASE_URL=http://localhost:8000/
      APP_CLIENT_ID=python-client
      APP_CLIENT_SECRET={YOUR_CLIENT_SECRET}
      APP_REDIRECT_URI=http://localhost:8000/auth/callback
      KEYCLOAK_REALM_NAME=master
      KEYCLOAK_BASE_URL_LOCALHOST=http://localhost:8080/
      KEYCLOAK_BASE_URL_CONTAINER_NAME=http://keycloak:8080/
      
  • テストユーザー(Users)を登録する。

  • コンテナを再起動する。

    docker-compose down
    docker-compose build
    docker-compose up
    

動作確認

  • テストアプリ(Python/FastAPI)のKeycloak Authorization Endpointへのリダイレクト用エンドポイントにアクセスする。

    http://localhost:8000/auth/login
    
    • 以下のようにnonceパラメータをクエリに付与してKeycloak Authorization Endpointにリダイレクトされる。

      http://localhost:8080/auth/realms/master/protocol/openid-connect/auth?client_id=python-client&redirect_uri=http%3A%2F%2Flocalhost%3A8000%2Fauth%2Fcallback&state=3b2b2eb6b965661e3a1107c40c5744e4a5a9d75d5c8c3dbafc6596707ca0c91d&nonce=27e1700b2f17baf91ba11af2c83014f25a9b4ac035e5c3ee475037427efc0934&scope=openid&response_type=code&response_mode=query.jwt&code_challenge=jOpKGQU7XXN14XbMkPc6ux8Pn6DuuGE_4paLQ3nzkY0&code_challenge_method=S256
      
  • ユーザー認証を行う。

  • 次のようにKeycloakからテスト用アプリのリダイレクトURIにリダイレクトされ、テスト用アプリはトークンリクエストを行い、IDトークン取得+nonce検証を行う。

    http://localhost:8000/auth/callback?response=...
    
    • id_tokenペイロード※認可リクエストに指定したnonceと同じ値が含まれている。

      {
      "exp": 1644552176,
      "iat": 1644552116,
      "auth_time": 1644552115,
      "jti": "ffe97609-2bb9-4f8e-9334-0cc7fe38c84c",
      "iss": "http://localhost:8080/auth/realms/master",
      "aud": "python-client",
      "sub": "...",
      "typ": "ID",
      "azp": "python-client",
      "nonce": "27e1700b2f17baf91ba11af2c83014f25a9b4ac035e5c3ee475037427efc0934",
      "session_state": "b0eb4b70-8c5c-4679-bfca-0bce660f21de",
      "at_hash": "-pNOX4FWOMiUmAP4ANxCYw",
      "acr": "1",
      "sid": "b0eb4b70-8c5c-4679-bfca-0bce660f21de",
      "email_verified": false,
      "preferred_username": "..."
      }
      

      ※クライアントで管理するnonceとIDトークンに含まれるnonceが一致する場合、ブラウザに上記のようなIDトークンペイロードが表示される。

参考情報

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?