0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Keycloak の認証と認可

Last updated at Posted at 2025-05-06

ChatGPT に次の依頼をしました。

KeyCloak が 8080 で動いています。これを検証するために、
8081 で動くクライアントと、
8082 で動く API サーバーを作成して下さい。
言語は、Python でお願いします。

ChatGPT の回答

1) クライアントアプリケーション(ポート 8081)

Web UI(Flask)を使用

Keycloak にログインしてトークンを取得

トークンを使って API サーバー(8082)にリクエストを送信

2) API サーバー(ポート 8082)

受信したトークンを Keycloak に検証させる

トークンが有効であれば保護されたリソースを返す

Keycloak の用意

Realm: myrealm

Client: myclient01(confidential、クライアントシークレットあり)

ユーザー名・パスワード:任意(例: scott / tiger)

CLIENT_ID と CLIENT_SECRET が有効かを確認

check01.py
#! /usr/bin/python

import requests

# Keycloak の情報
KEYCLOAK_URL = "http://localhost:8080"
REALM = "myrealm"
CLIENT_ID = "myclient01"
CLIENT_SECRET = "U7NTfODXUhLd4K9JwAVYpTvX6ghlRIu6"

# トークン取得URL
TOKEN_URL = f"{KEYCLOAK_URL}/realms/{REALM}/protocol/openid-connect/token"

# client_credentials フローでトークン要求
data = {
    "grant_type": "client_credentials",
    "client_id": CLIENT_ID,
    "client_secret": CLIENT_SECRET
}

response = requests.post(TOKEN_URL, data=data)

if response.status_code == 200:
    token_data = response.json()
    print("✅ CLIENT_ID と CLIENT_SECRET は有効です。")
    print("アクセストークン (一部):", token_data['access_token'][:60] + "...")
else:
    print("❌ CLIENT_ID または CLIENT_SECRET が無効です。")
    print("ステータスコード:", response.status_code)
    print("レスポンス:", response.text)

実行結果

$ ./check01.py 
✅ CLIENT_ID と CLIENT_SECRET は有効です。
アクセストークン (一部): eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJoMGtrZU55...

クライアントプログラム

client_app.py
# client_app.py
from flask import Flask, request, redirect, session, url_for
import requests
import os
import sys

app = Flask(__name__)
app.secret_key = os.urandom(24)

KEYCLOAK_BASE = 'http://localhost:8080/realms/myrealm'
CLIENT_ID = 'myclient01'
CLIENT_SECRET = 'U7NTfODXUhLd4K9JwAVYpTvX6ghlRIu6'  # ←Keycloakで取得
REDIRECT_URI = 'http://localhost:8081/callback'

@app.route('/')
def index():
    sys.stderr.write("*** check *** aaa\n")
    return '<a href="/login">Login with Keycloak</a>'

@app.route('/login')
def login():
    sys.stderr.write("*** check *** bbb\n")
    sys.stderr.write(CLIENT_ID + "\n")
    sys.stderr.write(REDIRECT_URI + "\n")
    return redirect(
        f"{KEYCLOAK_BASE}/protocol/openid-connect/auth"
        f"?client_id={CLIENT_ID}"
        f"&response_type=code"
        f"&scope=openid"
        f"&redirect_uri={REDIRECT_URI}"
    )

@app.route('/callback')
def callback():
    sys.stderr.write("*** check *** callback\n")
    code = request.args.get('code')
    token_url = f"{KEYCLOAK_BASE}/protocol/openid-connect/token"
    data = {
        'grant_type': 'authorization_code',
        'code': code,
        'redirect_uri': REDIRECT_URI,
        'client_id': CLIENT_ID,
        'client_secret': CLIENT_SECRET
    }
    token_response = requests.post(token_url, data=data)
    token_json = token_response.json()
    access_token = token_json.get('access_token')
    session['access_token'] = access_token
    return redirect(url_for('protected'))

@app.route('/protected')
def protected():
    sys.stderr.write("*** check *** protected\n")
    access_token = session.get('access_token')
    if not access_token:
        return redirect(url_for('login'))

    api_response = requests.get(
        'http://localhost:8082/api/protected',
        headers={'Authorization': f'Bearer {access_token}'}
    )
    return f"API Response: {api_response.text}"

if __name__ == '__main__':
    app.run(port=8081)

API サーバー プログラム

api_server.py
# api_server.py
from flask import Flask, request, jsonify
import requests
import sys

app = Flask(__name__)

KEYCLOAK_INTROSPECT_URL = 'http://localhost:8080/realms/myrealm/protocol/openid-connect/token/introspect'
CLIENT_ID = 'myclient01'
CLIENT_SECRET = 'U7NTfODXUhLd4K9JwAVYpTvX6ghlRIu6'

@app.route('/api/protected')
def protected():
    sys.stderr.write("/api/protected ***\n")
    auth_header = request.headers.get('Authorization')
    sys.stderr.write("/api/protected bbb ***\n")
    if not auth_header or not auth_header.startswith('Bearer '):
        return jsonify({'error': 'Missing token'}), 401

    token = auth_header.split(' ')[1]
    introspect_response = requests.post(
        KEYCLOAK_INTROSPECT_URL,
        data={'token': token},
        auth=(CLIENT_ID, CLIENT_SECRET)
    )

    token_info = introspect_response.json()
    if token_info.get('active'):
        return jsonify({'message': 'Access granted', 'username': token_info.get('preferred_username')})
    else:
        return jsonify({'error': 'Invalid token'}), 401

if __name__ == '__main__':
    app.run(port=8082)

実行

クライアント と API サーバーの両方を動かす

python client_app.py
python api_server.py

ブラウザーでアクセス

http://127.0.0.1:8081
image.png

クリック
image.png

注意

Keycloak の Client の設定に、
http://localhost:8081/*
を入れる必要があります。

image.png

次の設定でうまく行きました。

image.png

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?