はじめに
皆さん、OIDC(OpenID Connect)は使っていますか？ 自分は使っていません。使う機会がなかったためです。さて、今回はPythonとAuthlibでOIDCの簡易アプリケーションを作っていきましょう。
他の記事ではリフレッシュトークン(refresh token)まで扱っていないものが多いので、本記事ではそこも実装していきます。
ライブラリ
- uvicorn
- fastapi
- authlib
How to
今回はMicrosoftのアカウントを使っていきます。
setup
from authlib.integrations.starlette_client import OAuth
# Registry holding the OAuth/OIDC client configurations used by this app.
oauth = OAuth()

# Discovery document of the tenant; the provider's endpoints are read from it.
_discovery_url = f"https://login.microsoftonline.com/{TENANT_ID}/v2.0/.well-known/openid-configuration"
# Scopes sent with the authorization request.
_client_options = {"scope": "openid profile offline_access"}

# Register the client under the name "microsoft" and keep a handle to it.
microsoft: StarletteOAuth2App = oauth.register(
    name="microsoft",
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    server_metadata_url=_discovery_url,
    client_kwargs=_client_options,
)
redirect
@router.get("/auth/oidc")
async def authenticate_oidc_mc(request: Request):
    """Start the login flow via the registered ``microsoft`` client.

    The callback URL is resolved from the route named ``auth_callback`` and
    handed to ``microsoft.authorize_redirect``, whose result is returned.
    """
    # A literal URL such as "https://...../callback" would work here as well.
    callback_url = request.url_for("auth_callback")
    redirect_response = await microsoft.authorize_redirect(request, callback_url)
    return redirect_response
callback
class Session():
def __init__(self, key: str, refresh_token: str | None = None):
self.id = str(random())
self.key = key
self.refresh_token = refresh_token
# In-memory session store. A real application would persist sessions in a
# database or similar; a plain dict is used here to keep the example simple.
sessions: dict[str, Session] = {}
@app.get("/auth/oidc/callback")
async def auth_callback(request: Request):
    """Handle the provider's callback and create a session.

    Obtains the token through ``microsoft.authorize_access_token``, records a
    ``Session`` built from the ``sub`` claim and the refresh token (if any),
    stores the session id in the request session and redirects to ``/``.
    """
    token = await microsoft.authorize_access_token(request)
    claims = dict(token['userinfo'])
    token_fields = dict(token)

    # Build the session with its refresh token up front.
    session = Session(claims.get("sub"), token_fields.get("refresh_token"))
    sessions[session.id] = session
    request.session["id"] = session.id
    return RedirectResponse("/")
IDトークン(id_token)を検証する
async def verify_token(self, id_token: str) -> JWTClaims:
    """Decode an ID token and validate its ``iss``, ``aud`` and ``exp`` claims.

    The token is decoded with the provider's JWK set as the key, then the
    issuer is compared with the provider metadata, the audience with the
    client id, and the expiry with the current time.

    Args:
        id_token: Encoded ID token (JWT).

    Returns:
        The decoded claims.

    Raises:
        HTTPException: 401 when decoding fails, or when a claim is missing
            or does not pass its check.
    """
    import time

    jwks = await self.provider.fetch_jwk_set()
    try:
        decoded_jwt = jwt.decode(s=id_token, key=jwks)
    except Exception as ex:
        print(ex)
        raise HTTPException(status_code=401) from ex

    metadata = await self.provider.load_server_metadata()
    # .get() so that a missing claim yields 401 rather than a KeyError (500).
    if decoded_jwt.get("iss") != metadata["issuer"]:
        raise HTTPException(status_code=401)

    # "aud" may be a single string or an array of strings; accept both.
    aud = decoded_jwt.get("aud")
    if isinstance(aud, str):
        audiences = [aud]
    elif isinstance(aud, (list, tuple)):
        audiences = list(aud)
    else:
        audiences = []
    if self.provider.client_id not in audiences:
        raise HTTPException(status_code=401)

    # Compare epoch seconds directly; avoids naive local-time datetimes.
    exp = decoded_jwt.get("exp")
    if not isinstance(exp, (int, float)) or exp < time.time():
        raise HTTPException(status_code=401)
    return decoded_jwt
refresh
Djangoなどでは自動でトークンを更新してくれたりするらしいですが、Starlette(FastAPI)では自動更新はできないようなので、更新用のエンドポイントを自前で実装します。
@router.get("/auth/oidc/refresh")
async def refresh_oidc_mc(request: Request):
    """Refresh the current session's tokens using its stored refresh token.

    Looks up the session referenced by the request session, calls the
    provider's token endpoint with the stored refresh token and remembers
    the refresh token from the response.

    Returns:
        The string ``"ok"`` on success.

    Raises:
        HTTPException: 401 when there is no session for this request or the
            session has no refresh token.
    """
    session: Session | None = sessions.get(request.session.get("id", ""))
    # These must be raised, not returned: a returned HTTPException is treated
    # as an ordinary response value instead of producing a 401.
    if session is None:
        raise HTTPException(status_code=401, detail="session is none")
    if session.refresh_token is None:
        raise HTTPException(status_code=401, detail="refresh token is none")

    # Load the metadata explicitly instead of assuming it is already cached.
    metadata = await microsoft.load_server_metadata()
    async with AsyncOAuth2Client(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
    ) as client:
        new_token = await client.refresh_token(
            metadata["token_endpoint"],
            refresh_token=session.refresh_token,
        )

    # Keep the current refresh token when the response carries no new one.
    # The session object is updated in place, so it is not re-inserted into
    # `sessions` (which could resurrect a session removed by a logout).
    session.refresh_token = dict(new_token).get("refresh_token") or session.refresh_token
    return "ok"
logout
@router.get("/logout")
async def logout(request: Request):
    """Drop the caller's server-side session and clear the request session.

    Returns:
        A fixed confirmation string, whether or not a session existed.
    """
    session_id = request.session.get("id", "")
    current: Session | None = sessions.get(session_id)
    if current is not None:
        sessions.pop(current.id)
    request.session.clear()
    return "logout successfull"