LoginSignup
0
0

More than 1 year has passed since last update.

Keycloakを用いたJWT Secured Authorization Response Mode for OAuth 2.0 (JARM) 検証方法 メモ

Posted at

main.py 修正

  • 認可リクエスト時にresponse_mode=query.jwtを指定し、受け取った認可レスポンスJWTをデコード・検証する処理を追加
  import ast
  import json
  import urllib.parse as parse
  import urllib.request as request
  import urllib.error as error
  import os
  import requests
  import uvicorn
  from fastapi import FastAPI
  from starlette.requests import Request
  from starlette.responses import RedirectResponse
  import base64
  from http import HTTPStatus
  import re
  from base64 import b64decode
  import jwt
  from cryptography.hazmat.primitives import serialization
  import hashlib
  from pprint import pprint

  # ENV
  # For Test App(FastAPI)
  APP_BASE_URL = os.getenv("APP_BASE_URL")
  APP_CLIENT_ID = os.getenv("APP_CLIENT_ID")
  APP_CLIENT_SECRET = os.getenv('APP_CLIENT_SECRET')
  APP_REDIRECT_URI = os.getenv('APP_REDIRECT_URI')


  # For Keycloak
  KEYCLOAK_BASE_URL_LOCALHOST = os.getenv("KEYCLOAK_BASE_URL_LOCALHOST")
  KEYCLOAK_REALM_NAME = os.getenv("KEYCLOAK_REALM_NAME")
  KEYCLOAK_BASE_URL_CONTAINER_NAME = os.getenv(
      "KEYCLOAK_BASE_URL_CONTAINER_NAME")
  # Authorization Endpoint
  KEYCLOAK_AUTH_BASE_URL = (
      f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}"
      "/protocol/openid-connect/auth"
  )
  # Token Endpoint
  # ※Setting Container Name Because Communication between Conntainers
  KEYCLOAK_TOKEN_URL = (
      f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}"
      "/protocol/openid-connect/token"
  )

  app = FastAPI()

  # Redirection To Keycloak Authorization Endpoint
  @app.get("/auth/login")
  async def login() -> RedirectResponse:
      # Generate State
      state = hashlib.sha256(os.urandom(32)).hexdigest()
      # For PKCE
      # Generate code_verifier
      code_verifier = base64.urlsafe_b64encode(os.urandom(40)).decode('utf-8')
      code_verifier = re.sub('[^a-zA-Z0-9]+', '', code_verifier)
      # Generate code_challenge
      code_challenge = hashlib.sha256(code_verifier.encode('utf-8')).digest()
      code_challenge = base64.urlsafe_b64encode(code_challenge).decode('utf-8')
      code_challenge = code_challenge.replace('=', '')

      # Redirection to Keycloak Authorization Endpoint
      KEYCLOAK_AUTH_URL = KEYCLOAK_AUTH_BASE_URL + '?{}'.format(parse.urlencode({
          'client_id': APP_CLIENT_ID,
          'redirect_uri': APP_REDIRECT_URI,
          'state': state,
          'scope': 'openid',
          'response_type': 'code',
          'response_mode': 'query.jwt', # New
          'code_challenge':code_challenge,
          'code_challenge_method':'S256'
      }))

      response = RedirectResponse(KEYCLOAK_AUTH_URL)
      # Save state、code_verifier. ※temporary solution
      response.set_cookie(key="AUTH_STATE", value=state)
      response.set_cookie(key="AUTH_CODE_VERIFIER", value=code_verifier)
      return response


  # Token Request
  def get_token(code,code_verifier):

      params = {
          'client_id': APP_CLIENT_ID,
          'client_secret': APP_CLIENT_SECRET,
          'grant_type': 'authorization_code',
          'redirect_uri': APP_REDIRECT_URI,
          'code': code,
          'code_verifier': code_verifier
      }
      x = requests.post(KEYCLOAK_TOKEN_URL, params, verify=False).content.decode('utf-8')
      pprint(json.loads(x))
      access_token = ast.literal_eval(x)['access_token']
      id_token = ast.literal_eval(x)['id_token']
      username = get_user_name_from_id_token(id_token)
      return ast.literal_eval(x)

  # Getting username From id_token
  def get_user_name_from_id_token(id_token):
      public_key = get_public_key()
      payload = jwt.decode(id_token, public_key, algorithms=["RS256"],audience=APP_CLIENT_ID)
      pprint("username:{}".format(payload['preferred_username']))
      return payload['preferred_username']

  # Redirection Endpoint
  # Get JWT Authorization Response
  # Validate JWT and Requesting Token Endpoint
  @app.get("/auth/callback")
  async def callback(request: Request, response: str) -> RedirectResponse:
      # Get Public Key From Keycloak
      public_key = get_public_key()

      # Decode Authorization response(JWT) 
      payload = jwt.decode(response, public_key, algorithms=["RS256"],audience=APP_CLIENT_ID)
      pprint(payload)

      # Validate iss
      if payload['iss'] != f"{KEYCLOAK_BASE_URL_LOCALHOST}auth/realms/{KEYCLOAK_REALM_NAME}":
          return {"error": "iss_verification_failed"}
      # Validate state
      if payload['state'] != request.cookies.get("AUTH_STATE"):
          return {"error": "state_verification_failed"}
      return get_token(payload['code'],  request.cookies.get("AUTH_CODE_VERIFIER"))

  #Get Public Key From JWKS Endpoint
  def get_public_key():
      r = requests.get(f"{KEYCLOAK_BASE_URL_CONTAINER_NAME}auth/realms/{KEYCLOAK_REALM_NAME}/")
      r.raise_for_status()
      key_der_base64 = r.json()["public_key"]
      pprint(key_der_base64)
      key_der = b64decode(key_der_base64.encode())
      public_key = serialization.load_der_public_key(key_der)
      return public_key

  if __name__ == "__main__":
      uvicorn.run(app, port=8000, loop="asyncio")

KeyCloak準備

  • コンテナを起動する
  docker-compose up
  • Keycloak Admin コンソールにアクセスする。
  http://localhost:8080

※ログイン情報はdocker-composeに記載

  • クライアント(Clients)を登録する。

    APP_BASE_URL=http://localhost:8000/
    APP_CLIENT_ID=python-client
    APP_CLIENT_SECRET={YOUR_CLIENT_SECRET}
    APP_REDIRECT_URI=http://localhost:8000/auth/callback
    KEYCLOAK_REALM_NAME=master
    KEYCLOAK_BASE_URL_LOCALHOST=http://localhost:8080/
    KEYCLOAK_BASE_URL_CONTAINER_NAME=http://keycloak:8080/
    
  • テストユーザー(Users)を登録する。

  • コンテナを再起動する。

  docker-compose down
  docker-compose build
  docker-compose up

動作確認

  • Keycloak Authorization Endpointリダイレクト用エンドポイントにアクセスする。
  http://localhost:8000/auth/login
  • 以下のようにresponse_mode=query.jwtをクエリに付与してKeycloak Authorization Endpointにリダイレクトされる。

    http://localhost:8080/auth/realms/master/protocol/openid-connect/auth?client_id=python-client&redirect_uri=http%3A%2F%2Flocalhost%3A8000%2Fauth%2Fcallback&state=f8faac9d97a8fd539a4c2357d3fc8f8e6e12c09ff2706a1871a4eaf71345333c&scope=openid&response_type=code&response_mode=query.jwt&code_challenge=6C3sPbUlb54W0ICg2O3NdRM9Id03yT-tBv-8PU0q20M&code_challenge_method=S256
    
    • ユーザー認証を行う。
    • 次のようにKeycloakからテスト用アプリのリダイレクトURIにリダイレクトされる。

code=...ではなく、response=...となる。

  http://localhost:8000/auth/callback?response=eyJhbGciOiJSUzI...4eMb8bzg
  • responseペイロード

    {
      "exp": 1643510092,
      "iss": "http://localhost:8080/auth/realms/master",
      "aud": "python-client",
      "code": "...",
      "state": "f8faac9d97a8fd539a4c2357d3fc8f8e6e12c09ff2706a1871a4eaf71345333c",
      "session_state": "61f4589d-15ac-40fa-8d4d-31e578866561"
    }
    

参考情報

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0