こちらと同様のことを FastAPI で行いました。
ChatGPT に次の依頼をしました。
KeyCloak が 8080 で動いています。これを検証するために、
8081 で動くクライアントと、
8082 で動く API サーバーを作成して下さい。
FastAPI を使って下さい。
Keycloak の用意
Realm: myrealm
Client: myclient01(confidential、クライアントシークレットあり)
ユーザー名・パスワード:任意(例: scott / tiger)
クライアントプログラム
client_app.py
from fastapi import FastAPI, Form
from fastapi.responses import HTMLResponse
import requests
import sys
app = FastAPI()
KEYCLOAK_TOKEN_URL = "http://localhost:8080/realms/myrealm/protocol/openid-connect/token"
API_URL = "http://localhost:8082/protected"
CLIENT_ID = "myclient01"
CLIENT_SECRET = "U7NTfODXUhLd4K9JwAVYpTvX6ghlRIu6"
@app.get("/", response_class=HTMLResponse)
async def login_form():
return """
<form action="/login" method="post">
Username: <input type="text" name="username"/><br>
Password: <input type="password" name="password"/><br>
<input type="submit" value="Login"/>
</form>
"""
@app.post("/login")
async def login(username: str = Form(...), password: str = Form(...)):
sys.stderr.write("*** check aaa ***\n")
data = {
"grant_type": "password",
"client_id": CLIENT_ID,
"client_secret": CLIENT_SECRET,
"username": username,
"password": password,
}
sys.stderr.write("*** check bbb ***\n")
response = requests.post(KEYCLOAK_TOKEN_URL, data=data)
sys.stderr.write("*** check ccc ***\n")
if response.status_code != 200:
return {"error": "ログイン失敗"}
sys.stderr.write("*** check ddd ***\n")
token = response.json()["access_token"]
sys.stderr.write("*** check eee ***\n")
api_response = requests.get(API_URL, headers={"Authorization": f"Bearer {token}"})
sys.stderr.write("*** check fff ***\n")
print("API response text:", api_response.text)
# jx = api_response.json()
# sys.stderr.write(jx + "\n")
sys.stderr.write("*** check ggg ***\n")
try:
return {"api_response": api_response.json()}
except Exception as e:
return {
"error": "API response is not valid JSON",
"status_code": api_response.status_code,
"text": api_response.text,
}
実行方法
uvicorn client_app:app --port 8081
API サーバープログラム
jose を使う為に、Python の仮想環境を使います。
$ which python
/home/uchida/myenv/bin/python
ライブラリーのインストール
pip install fastapi uvicorn requests python-jose[cryptography]
api_server.py
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import jwt, JWTError
import requests
app = FastAPI()
KEYCLOAK_URL = "http://localhost:8080"
REALM = "myrealm"
ALGORITHM = "RS256"
#AUDIENCE = "myclient01"
AUDIENCE = "account"
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# トークンの kid に一致する鍵を取得
def get_public_key(token: str):
try:
jwks_url = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/certs"
jwks = requests.get(jwks_url).json()
headers = jwt.get_unverified_header(token)
kid = headers["kid"]
for key in jwks["keys"]:
if key["kid"] == kid:
return key
raise Exception("一致する kid の鍵が見つかりません")
except Exception as e:
raise HTTPException(status_code=500, detail=f"公開鍵の取得に失敗: {str(e)}")
@app.get("/protected")
async def protected(token: str = Depends(oauth2_scheme)):
try:
public_key = get_public_key(token)
payload = jwt.decode(token, public_key, algorithms=[ALGORITHM], audience=AUDIENCE)
return {"message": "アクセス成功", "username": payload.get("preferred_username")}
except JWTError as e:
raise HTTPException(status_code=401, detail=f"トークン検証失敗: {str(e)}")
except Exception as e:
raise HTTPException(status_code=500, detail=f"内部エラー: {str(e)}")
実行方法
uvicorn api_server:app --port 8082
ブラウザー
http://127.0.0.1:8081 にアクセス
ログイン成功
ログイン失敗
API サーバーのテストスクリプト
get_access_token.sh
URL="http://localhost:8080/realms/myrealm/protocol/openid-connect/token"
#
USR="scott"
PASSWORD="hello9"
#
http --form POST $URL \
"grant_type=password" \
"client_id=myclient01" \
"client_secret=U7NTfODXUhLd4K9JwAVYpTvX6ghlRIu6" \
"username="$USR \
"password="$PASSWORD > access_token.json
api_server_check.sh
rm -f access_token.json
#
./get_access_token.sh
#
URL="http://localhost:8082/protected"
ACCESS_TOKEN=`jq -r .access_token access_token.json`
#
curl -H "Authorization: Bearer "$ACCESS_TOKEN $URL
実行結果
$ ./api_server_check.sh
{"message":"アクセス成功","username":"scott"}
JWT の audience を確認する方法
audience_check.sh
rm -f access_token.json
./get_access_token.sh
#
ATOKEN=`jq -r .access_token access_token.json`
echo $ATOKEN | cut -d "." -f2 | base64 -d | jq .
実行結果
$ ./audience_check.sh | jq .aud
"account"
API サーバーの
AUDIENCE = "account"
と一致している必要があります。