0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Keycloak の認証と認可 (FastAPI)

Last updated at Posted at 2025-05-09

こちらと同様のことを FastAPI で行いました。

ChatGPT に次の依頼をしました。

KeyCloak が 8080 で動いています。これを検証するために、
8081 で動くクライアントと、
8082 で動く API サーバーを作成して下さい。
FastAPI を使って下さい。

Keycloak の用意

Realm: myrealm

Client: myclient01(confidential、クライアントシークレットあり)

ユーザー名・パスワード:任意(例: scott / tiger)

クライアントプログラム

client_app.py
from fastapi import FastAPI, Form
from fastapi.responses import HTMLResponse
import requests
import sys

app = FastAPI()

KEYCLOAK_TOKEN_URL = "http://localhost:8080/realms/myrealm/protocol/openid-connect/token"
API_URL = "http://localhost:8082/protected"
CLIENT_ID = "myclient01"
CLIENT_SECRET = "U7NTfODXUhLd4K9JwAVYpTvX6ghlRIu6"

@app.get("/", response_class=HTMLResponse)
async def login_form():
    return """
    <form action="/login" method="post">
        Username: <input type="text" name="username"/><br>
        Password: <input type="password" name="password"/><br>
        <input type="submit" value="Login"/>
    </form>
    """

@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...)):
    sys.stderr.write("*** check aaa ***\n")
    data = {
        "grant_type": "password",
        "client_id": CLIENT_ID,
        "client_secret": CLIENT_SECRET,
        "username": username,
        "password": password,
    }
    sys.stderr.write("*** check bbb ***\n")
    response = requests.post(KEYCLOAK_TOKEN_URL, data=data)
    sys.stderr.write("*** check ccc ***\n")
    if response.status_code != 200:
        return {"error": "ログイン失敗"}

    sys.stderr.write("*** check ddd ***\n")
    token = response.json()["access_token"]
    sys.stderr.write("*** check eee ***\n")
    api_response = requests.get(API_URL, headers={"Authorization": f"Bearer {token}"})
    sys.stderr.write("*** check fff ***\n")
    print("API response text:", api_response.text)
#    jx = api_response.json()
#    sys.stderr.write(jx + "\n")
    sys.stderr.write("*** check ggg ***\n")
    try:
        return {"api_response": api_response.json()}
    except Exception as e:
        return {
            "error": "API response is not valid JSON",
            "status_code": api_response.status_code,
            "text": api_response.text,
        }

実行方法

uvicorn client_app:app --port 8081

API サーバープログラム

jose を使う為に、Python の仮想環境を使います。

$ which python
/home/uchida/myenv/bin/python

ライブラリーのインストール

pip install fastapi uvicorn requests python-jose[cryptography]
api_server.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
import requests

app = FastAPI()

KEYCLOAK_URL = "http://localhost:8080"
REALM = "myrealm"
ALGORITHM = "RS256"
#AUDIENCE = "myclient01"
AUDIENCE = "account"

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# トークンの kid に一致する鍵を取得
def get_public_key(token: str):
    try:
        jwks_url = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/certs"
        jwks = requests.get(jwks_url).json()
        headers = jwt.get_unverified_header(token)
        kid = headers["kid"]

        for key in jwks["keys"]:
            if key["kid"] == kid:
                return key
        raise Exception("一致する kid の鍵が見つかりません")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"公開鍵の取得に失敗: {str(e)}")

@app.get("/protected")
async def protected(token: str = Depends(oauth2_scheme)):
    try:
        public_key = get_public_key(token)
        payload = jwt.decode(token, public_key, algorithms=[ALGORITHM], audience=AUDIENCE)
        return {"message": "アクセス成功", "username": payload.get("preferred_username")}
    except JWTError as e:
        raise HTTPException(status_code=401, detail=f"トークン検証失敗: {str(e)}")
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"内部エラー: {str(e)}")

実行方法

uvicorn api_server:app --port 8082

ブラウザー

http://127.0.0.1:8081 にアクセス

image.png

ログイン成功

image.png

ログイン失敗

image.png

API サーバーのテストスクリプト

get_access_token.sh
URL="http://localhost:8080/realms/myrealm/protocol/openid-connect/token"
#
USR="scott"
PASSWORD="hello9"
#
http --form POST $URL \
	"grant_type=password" \
	"client_id=myclient01" \
	"client_secret=U7NTfODXUhLd4K9JwAVYpTvX6ghlRIu6" \
	"username="$USR \
	"password="$PASSWORD > access_token.json
api_server_check.sh
rm -f access_token.json
#
./get_access_token.sh
#
URL="http://localhost:8082/protected"
ACCESS_TOKEN=`jq -r .access_token access_token.json`
#
curl -H "Authorization: Bearer "$ACCESS_TOKEN $URL

実行結果

$ ./api_server_check.sh 
{"message":"アクセス成功","username":"scott"}

JWT の audience を確認する方法

audience_check.sh
rm -f access_token.json
./get_access_token.sh
#
ATOKEN=`jq -r .access_token access_token.json`
echo $ATOKEN  | cut -d "." -f2 | base64 -d | jq .

実行結果

$ ./audience_check.sh | jq .aud
"account"

API サーバーの

AUDIENCE = "account"

と一致している必要があります。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?