Notes on Verifying JWT Secured Authorization Response Mode for OAuth 2.0 (JARM) with Keycloak

main.py changes

  • Added processing that specifies response_mode=query.jwt in the authorization request, then decodes and validates the authorization response JWT it receives.

    import ast
    import json
    import urllib.parse as parse
    import urllib.request as request
    import urllib.error as error
    import os
    import requests
    import uvicorn
    from fastapi import FastAPI
    from starlette.requests import Request
    from starlette.responses import RedirectResponse
    import base64
    from http import HTTPStatus
    import re
    from base64 import b64decode
    import jwt
    from cryptography.hazmat.primitives import serialization
    import hashlib
    from pprint import pprint
    
    # ENV
    # For Test App(FastAPI)
    APP_BASE_URL = os.getenv("APP_BASE_URL")
    APP_CLIENT_ID = os.getenv("APP_CLIENT_ID")
    APP_CLIENT_SECRET = os.getenv('APP_CLIENT_SECRET')
    APP_REDIRECT_URI = os.getenv('APP_REDIRECT_URI')
    
    
    # For Keycloak
    KEYCLOAK_BASE_URL_LOCALHOST = os.getenv("KEYCLOAK_BASE_URL_LOCALHOST")
    KEYCLOAK_REALM_NAME = os.getenv("KEYCLOAK_REALM_NAME")
    KEYCLOAK_BASE_URL_CONTAINER_NAME = os.getenv(
        "KEYCLOAK_BASE_URL_CONTAINER_NAME")
    # Authorization Endpoint
    KEYCLOAK_AUTH_BASE_URL = (
        f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}"
        "/protocol/openid-connect/auth"
    )
    # Token Endpoint
    # Note: uses the container name because this request is sent container-to-container
    KEYCLOAK_TOKEN_URL = (
        f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}"
        "/protocol/openid-connect/token"
    )
    
    app = FastAPI()
    
    # Redirection To Keycloak Authorization Endpoint
    @app.get("/auth/login")
    async def login() -> RedirectResponse:
        # Generate State
        state = hashlib.sha256(os.urandom(32)).hexdigest()
        # For PKCE
        # Generate code_verifier
        code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
        code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
        # Generate code_challenge
        code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
        code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
        code_challenge = code_challenge.replace('=', '')
    
        # Redirection to Keycloak Authorization Endpoint
        KEYCLOAK_AUTH_URL = KEYCLOAK_AUTH_BASE_URL + '?{}'.format(parse.urlencode({
            'client_id': APP_CLIENT_ID,
            'redirect_uri': APP_REDIRECT_URI,
            'state': state,
            'scope': 'openid',
            'response_type': 'code',
            'response_mode': 'query.jwt', # New
            'code_challenge':code_challenge,
            'code_challenge_method':'S256'
        }))
        
        response = RedirectResponse(KEYCLOAK_AUTH_URL)
        # Save state and code_verifier in cookies. Note: temporary solution
        response.set_cookie(key="AUTH_STATE", value=state)
        response.set_cookie(key="AUTH_CODE_VERIFIER", value=code_verifier)
        return response
    
    
    # Token Request
    def get_token(code, code_verifier):
        params = {
            'client_id': APP_CLIENT_ID,
            'client_secret': APP_CLIENT_SECRET,
            'grant_type': 'authorization_code',
            'redirect_uri': APP_REDIRECT_URI,
            'code': code,
            'code_verifier': code_verifier
        }
        # verify=False disables TLS certificate verification; local testing only
        res = requests.post(KEYCLOAK_TOKEN_URL, data=params, verify=False)
        res.raise_for_status()
        # Parse the body as JSON; ast.literal_eval cannot handle JSON true/false/null
        token_response = res.json()
        pprint(token_response)
        # Validate the ID token signature and audience, then print the username
        get_user_name_from_id_token(token_response['id_token'])
        return token_response
    
    # Getting username From id_token
    def get_user_name_from_id_token(id_token):
        public_key = get_public_key()
        payload = jwt.decode(id_token, public_key, algorithms=["RS256"],audience=APP_CLIENT_ID)
        pprint("username:{}".format(payload['preferred_username']))
        return payload['preferred_username']
    
    # Redirection Endpoint
    # Receive the JWT authorization response (the "response" query parameter),
    # validate it, then call the Token Endpoint
    @app.get("/auth/callback")
    async def callback(request: Request, response: str) -> dict:
        # Get Public Key From Keycloak
        public_key = get_public_key()
    
        # Decode the authorization response (JWT); this verifies the signature, exp and aud
        payload = jwt.decode(response, public_key, algorithms=["RS256"], audience=APP_CLIENT_ID)
        pprint(payload)
    
        # Validate iss
        if payload['iss'] != f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}":
            return {"error": "iss_verification_failed"}
        # Validate state against the value saved at login
        if payload['state'] != request.cookies.get("AUTH_STATE"):
            return {"error": "state_verification_failed"}
        # Exchange the code carried inside the JWT for tokens
        return get_token(payload['code'], request.cookies.get("AUTH_CODE_VERIFIER"))
    
    # Get the realm public key from the Keycloak realm endpoint (not the JWKS endpoint)
    def get_public_key():
        r = requests.get(f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}/")
        r.raise_for_status()
        key_der_base64 = r.json()["public_key"]
        pprint(key_der_base64)
        key_der = b64decode(key_der_base64.encode())
        public_key = serialization.load_der_public_key(key_der)
        return public_key
    
    if __name__ == "__main__":
        uvicorn.run(app, port=8000, loop="asyncio")
    
    

Keycloak setup

  • Start the containers.

    docker-compose up
    
  • Open the Keycloak Admin Console.

    http://localhost:8080
    

    Note: the login credentials are listed in the docker-compose file.

  • Register a client (Clients).

    • Set the redirect URI to "http://localhost:8000/auth/callback".

    • Set app.env as follows:

      APP_BASE_URL=http://localhost:8000/
      APP_CLIENT_ID=python-client
      APP_CLIENT_SECRET={YOUR_CLIENT_SECRET}
      APP_REDIRECT_URI=http://localhost:8000/auth/callback
      KEYCLOAK_REALM_NAME=master
      KEYCLOAK_BASE_URL_LOCALHOST=http://localhost:8080/
      KEYCLOAK_BASE_URL_CONTAINER_NAME=http://keycloak:8080/
      
  • Register a test user (Users).

  • Restart the containers.

    docker-compose down
    docker-compose build
    docker-compose up
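
main.py builds its endpoint URLs by concatenating the base URL directly with "auth/realms/...", so the trailing slash in app.env matters. The sketch below is one way to check the values before starting the app. The helpers parse_env and build_endpoints are hypothetical names, not part of main.py, and the sketch assumes the same URL layout as the code above.

```python
def parse_env(text: str) -> dict:
    """Parse simple KEY=VALUE lines, skipping blanks and # comments."""
    env = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, value = line.split("=", 1)
        env[key.strip()] = value.strip()
    return env


def build_endpoints(env: dict) -> dict:
    """Build the URLs main.py uses, tolerating a missing trailing slash."""
    realm = env["KEYCLOAK_REALM_NAME"]

    def base(name: str) -> str:
        # Normalize so that exactly one "/" separates base URL and path.
        return env[name].rstrip("/") + "/"

    oidc_path = f"auth/realms/{realm}/protocol/openid-connect"
    return {
        "authorization": f"{base('KEYCLOAK_BASE_URL_LOCALHOST')}{oidc_path}/auth",
        "token": f"{base('KEYCLOAK_BASE_URL_CONTAINER_NAME')}{oidc_path}/token",
        # The value callback() compares against the iss claim.
        "issuer": f"{base('KEYCLOAK_BASE_URL_LOCALHOST')}auth/realms/{realm}",
    }


if __name__ == "__main__":
    sample = """
    KEYCLOAK_REALM_NAME=master
    KEYCLOAK_BASE_URL_LOCALHOST=http://localhost:8080/
    KEYCLOAK_BASE_URL_CONTAINER_NAME=http://keycloak:8080/
    """
    for name, url in build_endpoints(parse_env(sample)).items():
        print(f"{name}: {url}")
```

The authorization endpoint uses the localhost base URL because the browser follows that redirect, while the token endpoint uses the container name because the app calls it from inside the Docker network.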
    

Verification

  • Access the endpoint that redirects to the Keycloak Authorization Endpoint.

    http://localhost:8000/auth/login
    
    • You are redirected to the Keycloak Authorization Endpoint with response_mode=query.jwt added to the query string, as shown below.

      http://localhost:8080/auth/realms/master/protocol/openid-connect/auth?client_id=python-client&redirect_uri=http%3A%2F%2Flocalhost%3A8000%2Fauth%2Fcallback&state=f8faac9d97a8fd539a4c2357d3fc8f8e6e12c09ff2706a1871a4eaf71345333c&scope=openid&response_type=code&response_mode=query.jwt&code_challenge=6C3sPbUlb54W0ICg2O3NdRM9Id03yT-tBv-8PU0q20M&code_challenge_method=S256
      
  • Authenticate as the test user.

  • Keycloak then redirects to the test app's redirect URI as follows.

    The query parameter is response=... rather than code=....

    http://localhost:8000/auth/callback?response=eyJhbGciOiJSUzI...4eMb8bzg
    
    • response payload

      {
        "exp": 1643510092,
        "iss": "http://localhost:8080/auth/realms/master",
        "aud": "python-client",
        "code": "...",
        "state": "f8faac9d97a8fd539a4c2357d3fc8f8e6e12c09ff2706a1871a4eaf71345333c",
        "session_state": "61f4589d-15ac-40fa-8d4d-31e578866561"
      }
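
To inspect a response value by hand, the payload segment can be decoded with the standard library alone. The sketch below is for debugging only: it does NOT verify the signature, so it complements rather than replaces the jwt.decode call in callback(). The function names decode_jwt_payload_unverified and check_jarm_claims are hypothetical, and the claim checks simply mirror the iss, aud, exp and state values shown in the payload above.

```python
import base64
import json
import time


def decode_jwt_payload_unverified(token: str) -> dict:
    """Decode the payload of a compact JWT WITHOUT verifying its signature."""
    payload_b64 = token.split(".")[1]
    # base64url segments in a JWT are unpadded; restore the padding first.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))


def check_jarm_claims(payload, issuer, client_id, expected_state, now=None):
    """Return the names of claims that fail; an empty list means all passed."""
    now = time.time() if now is None else now
    problems = []
    if payload.get("iss") != issuer:
        problems.append("iss")
    aud = payload.get("aud")
    audiences = aud if isinstance(aud, list) else [aud]
    if client_id not in audiences:
        problems.append("aud")
    if payload.get("exp", 0) <= now:
        problems.append("exp")
    if payload.get("state") != expected_state:
        problems.append("state")
    return problems


if __name__ == "__main__":
    # Build an unsigned demo token locally instead of using a real response.
    demo_payload = {"iss": "http://localhost:8080/auth/realms/master",
                    "aud": "python-client", "exp": int(time.time()) + 60,
                    "code": "demo-code", "state": "demo-state"}

    def b64url(obj) -> str:
        raw = json.dumps(obj).encode()
        return base64.urlsafe_b64encode(raw).rstrip(b"=").decode()

    token = f"{b64url({'alg': 'RS256'})}.{b64url(demo_payload)}.signature"
    decoded = decode_jwt_payload_unverified(token)
    print(check_jarm_claims(decoded, demo_payload["iss"],
                            "python-client", "demo-state"))
```

The state check is what ties the response back to the value stored in the AUTH_STATE cookie at login.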
      
