Doing OIDC with Python + Authlib

Posted at 2025-11-06

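Introduction

Do you use OIDC? I don't, because I have never had the occasion to. In this article, we will build a simple OIDC application with Python and Authlib.

Many other articles do not make use of the refresh token, so this article implements that part as well.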

Libraries

  • uvicorn
  • fastapi
  • authlib

How to

This time we will use a Microsoft account as the identity provider.

setup

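import os

from authlib.integrations.starlette_client import OAuth, StarletteOAuth2App

# Assumption: the app registration values are supplied via environment variables.
CLIENT_ID = os.environ["CLIENT_ID"]
CLIENT_SECRET = os.environ["CLIENT_SECRET"]
TENANT_ID = os.environ["TENANT_ID"]

oauth = OAuth()
microsoft: StarletteOAuth2App = oauth.register(
    name="microsoft",
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    server_metadata_url=f"https://login.microsoftonline.com/{TENANT_ID}/v2.0/.well-known/openid-configuration",
    # offline_access asks Microsoft to issue a refresh token as well.
    client_kwargs={"scope": "openid profile offline_access"},
)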

redirect

from fastapi import APIRouter, Request

router = APIRouter()

@router.get("/auth/oidc")
async def authenticate_oidc_mc(request: Request):
    redirect_uri = request.url_for("auth_callback")  # An absolute URL such as https://...../callback also works
    return await microsoft.authorize_redirect(request, redirect_uri)
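
For orientation, `authorize_redirect` ends up sending the browser to the provider's authorization endpoint with a query string roughly like the one built below. This is a stdlib-only sketch of a generic OIDC authorization code request, not Authlib's actual code: the endpoint `https://idp.example.com/authorize`, the helper name `build_authorization_url`, and the sample client values are all assumptions made for illustration. In the real flow the endpoint comes from the provider metadata, and the library takes care of generating `state` and `nonce` and checking them in the callback.

```python
import secrets
from urllib.parse import parse_qs, urlencode, urlsplit


def build_authorization_url(
    authorization_endpoint: str,
    client_id: str,
    redirect_uri: str,
    scope: str,
) -> tuple[str, str, str]:
    """Return (url, state, nonce) for an authorization code request."""
    state = secrets.token_urlsafe(16)  # echoed back to the callback; guards against CSRF
    nonce = secrets.token_urlsafe(16)  # echoed back inside the ID token; guards against replay
    query = urlencode(
        {
            "response_type": "code",
            "client_id": client_id,
            "redirect_uri": redirect_uri,
            "scope": scope,
            "state": state,
            "nonce": nonce,
        }
    )
    return f"{authorization_endpoint}?{query}", state, nonce


url, state, nonce = build_authorization_url(
    "https://idp.example.com/authorize",  # hypothetical endpoint
    client_id="my-client-id",  # hypothetical value
    redirect_uri="https://app.example.com/auth/oidc/callback",  # hypothetical value
    scope="openid profile offline_access",
)
params = parse_qs(urlsplit(url).query)
print(params["scope"])
```

The `offline_access` scope travels in this request, which is why it has to be registered up front in `client_kwargs` if a refresh token is wanted later.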

callback

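import secrets

from fastapi import Request
from fastapi.responses import RedirectResponse

class Session:
    def __init__(self, key: str, refresh_token: str | None = None):
        # Use a cryptographically secure random value as the session ID.
        self.id = secrets.token_urlsafe(32)
        self.key = key
        self.refresh_token = refresh_token

# In a real application this would be stored in a database or similar; here we keep it in an in-memory cache.
sessions: dict[str, Session] = {}

# Note: request.session requires Starlette's SessionMiddleware to be added to the app.
@app.get("/auth/oidc/callback")
async def auth_callback(request: Request):
    # Exchange the authorization code for tokens.
    token = await microsoft.authorize_access_token(request)

    # "sub" is the identifier of the user at the provider.
    session = Session(
        key=token["userinfo"]["sub"],
        refresh_token=token.get("refresh_token"),
    )
    sessions[session.id] = session
    request.session["id"] = session.id
    return RedirectResponse("/")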
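
The in-memory `sessions` dict is lost on restart and is not shared between worker processes. As one possible direction for the "store it in a database" case, here is a minimal sketch backed by the standard library's `sqlite3`. The class name `SqliteSessionStore`, the table layout, and the method names are assumptions made for this example; they are not part of Authlib or FastAPI.

```python
import secrets
import sqlite3
from typing import Optional


class SqliteSessionStore:
    """Keeps (session id, user key, refresh token) rows in SQLite."""

    def __init__(self, path: str = ":memory:"):
        self.conn = sqlite3.connect(path)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS sessions ("
            "id TEXT PRIMARY KEY, user_key TEXT NOT NULL, refresh_token TEXT)"
        )

    def create(self, user_key: str, refresh_token: Optional[str] = None) -> str:
        session_id = secrets.token_urlsafe(32)
        with self.conn:  # commits on success, rolls back on error
            self.conn.execute(
                "INSERT INTO sessions VALUES (?, ?, ?)",
                (session_id, user_key, refresh_token),
            )
        return session_id

    def get(self, session_id: str) -> Optional[tuple[str, Optional[str]]]:
        # Returns (user_key, refresh_token), or None for an unknown session.
        return self.conn.execute(
            "SELECT user_key, refresh_token FROM sessions WHERE id = ?",
            (session_id,),
        ).fetchone()

    def update_refresh_token(self, session_id: str, refresh_token: str) -> None:
        with self.conn:
            self.conn.execute(
                "UPDATE sessions SET refresh_token = ? WHERE id = ?",
                (refresh_token, session_id),
            )

    def delete(self, session_id: str) -> None:
        with self.conn:
            self.conn.execute("DELETE FROM sessions WHERE id = ?", (session_id,))


store = SqliteSessionStore()
sid = store.create("user-sub-123", "refresh-token-1")
print(store.get(sid))
```

Only the opaque session ID goes into the browser cookie; the refresh token stays on the server side, just as with the in-memory version.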

Checking the ID token

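from datetime import datetime, timezone

from authlib.jose import JWTClaims, jwt
from fastapi import HTTPException

async def verify_token(self, id_token: str) -> JWTClaims:
    # Verify the signature against the keys published by the provider (JWKS).
    jwks = await self.provider.fetch_jwk_set()
    try:
        decoded_jwt = jwt.decode(s=id_token, key=jwks)
    except Exception as ex:
        print(ex)
        raise HTTPException(status_code=401) from ex

    # The issuer must match the one advertised in the provider metadata.
    metadata = await self.provider.load_server_metadata()
    if decoded_jwt["iss"] != metadata["issuer"]:
        raise HTTPException(status_code=401)

    # "aud" may be a single string or a list of strings.
    aud = decoded_jwt["aud"]
    audiences = [aud] if isinstance(aud, str) else aud
    if self.provider.client_id not in audiences:
        raise HTTPException(status_code=401)

    # "exp" is a UTC epoch timestamp, so compare in UTC.
    exp = datetime.fromtimestamp(decoded_jwt["exp"], tz=timezone.utc)
    if exp < datetime.now(timezone.utc):
        raise HTTPException(status_code=401)

    return decoded_jwt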
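
Two details of these claim checks are easy to get wrong: `aud` may be either a single string or a list of strings, and `exp` is an epoch timestamp in UTC. The stdlib-only sketch below shows the three checks in isolation on an already-decoded claims dict. The function name `check_id_token_claims`, the `leeway` parameter, and the sample values are assumptions for illustration. Signature verification is deliberately left out here and still has to happen first, as in `verify_token`.

```python
import time
from typing import Any, Mapping, Optional


def check_id_token_claims(
    claims: Mapping[str, Any],
    issuer: str,
    client_id: str,
    now: Optional[float] = None,
    leeway: int = 0,
) -> None:
    """Raise ValueError if iss, aud, or exp does not match expectations."""
    now = time.time() if now is None else now

    if claims.get("iss") != issuer:
        raise ValueError("unexpected issuer")

    # Normalize aud to a list so both shapes are handled the same way.
    aud = claims.get("aud")
    audiences = [aud] if isinstance(aud, str) else list(aud or [])
    if client_id not in audiences:
        raise ValueError("client_id is not in aud")

    # exp is seconds since the epoch (UTC); leeway tolerates small clock skew.
    exp = claims.get("exp")
    if not isinstance(exp, (int, float)) or exp + leeway <= now:
        raise ValueError("token is expired")


# Hypothetical sample claims for demonstration only.
sample = {"iss": "https://idp.example.com", "aud": ["my-client-id"], "exp": 2000}
check_id_token_claims(sample, "https://idp.example.com", "my-client-id", now=1000)
```

Passing `now` explicitly keeps the check deterministic in tests; in the application it can simply be omitted.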

refresh

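With Django and the like, the token apparently gets refreshed automatically. With Starlette (FastAPI) that does not seem to be possible, so here we refresh it ourselves.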

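from authlib.integrations.httpx_client import AsyncOAuth2Client
from fastapi import HTTPException

@router.get("/auth/oidc/refresh")
async def refresh_oidc_mc(request: Request):
    session: Session | None = sessions.get(request.session.get("id", ""))
    if session is None:
        # Raise (rather than return) so that FastAPI actually responds with 401.
        raise HTTPException(status_code=401, detail="session is none")

    if session.refresh_token is None:
        raise HTTPException(status_code=401, detail="refresh token is none")

    # Make sure the provider metadata is loaded before reading the token endpoint.
    metadata = await microsoft.load_server_metadata()
    async with AsyncOAuth2Client(
        client_id=CLIENT_ID,
        client_secret=CLIENT_SECRET,
    ) as client:
        new_token = await client.refresh_token(
            metadata["token_endpoint"],
            refresh_token=session.refresh_token,
        )
    # Keep the current refresh token if the provider did not issue a new one.
    session.refresh_token = new_token.get("refresh_token") or session.refresh_token
    sessions[session.id] = session
    return "ok"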
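
Since nothing refreshes the token automatically here, the application has to decide when to call the refresh endpoint. One common approach is to refresh shortly before the access token expires. The sketch below assumes the token dict carries an `expires_at` epoch timestamp, which Authlib's token objects normally include when the provider returns `expires_in`. The helper name `needs_refresh` and the 60-second margin are assumptions for illustration.

```python
import time
from typing import Any, Mapping, Optional


def needs_refresh(
    token: Mapping[str, Any],
    now: Optional[float] = None,
    margin: int = 60,
) -> bool:
    """Return True when the access token is expired or about to expire."""
    expires_at = token.get("expires_at")
    if expires_at is None:
        # Without expiry information we cannot tell, so do not force a refresh.
        return False
    now = time.time() if now is None else now
    return expires_at - margin <= now


# Hypothetical token values for demonstration only.
print(needs_refresh({"expires_at": 1_000}, now=500))
print(needs_refresh({"expires_at": 1_000}, now=950))
```

To use a check like this, the session would also need to keep `expires_at` from the token, which the `Session` class in this article does not store.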

logout

@router.get("/logout")
async def logout(request: Request):
    session: Session | None = sessions.get(request.session.get("id", ""))
    if session is not None:
        del sessions[session.id]
    request.session.clear()
    
    return "logout successful"