概要
今日、JWTのデコードに関する処理とそれに関するデータベーストランザクション仕様書をmarkdownで書いたので、ざっくり共有する。
多分、どこの会社もこんな感じになっていると思う。気になる点は反映したい。
なお、この仕様はデータベース仕様書の「使用例」の一部として記載するのが望ましいと判断し、追記した。
データベース仕様書に埋め込むにはちょっと重いだろう。
感想
パッと見、難しいかもしれないが、整理すれば理解できる、という感じ。
表現の問題や改良点などをブラッシュアップしていきたい。
copilotやclaudeを使いながらも修正している。
前提知識 / 前提データ
以下のデータは既に仕様書に書かれているものとする。
ダミーデータ
データベーススキーマ, E-R図
データベースに関しては色々考えがあるだろうが、とりあえず上記で固定。
実際はJwtの中身はカラムとして、独立してIssuerやAudienceを埋め込む方が素直かもしれない。
仕様書
How to decode Bearer token by AuthN
Bearerトークンとマスターデータベースを使用した
認証フローに関して説明する。
認証フロー例
Bearerトークンを検証する場合、以下のフローで検証されなくてはいけない。
-
署名検証前デコード(Unverified Decode)
- Bearerトークンを検証するために、事前処理が必要。
- Bearerトークンに含まれる
ペイロードの中から、issを取り出し、その内容を検証する。
-
完全検証 (Full JWT VerificationまたはVerified Decode)
- Idpから公開された公開鍵を使用して、署名検証や認証メタ情報を検証する。
処理例
想定される結果
仕様ベース
| 条件 | 検証段階 | 期待される動作 |
|---|---|---|
| Bearerトークンのフォーマット異常 例: ピリオドが 1 個 / 3 個 / ..
|
PreReadCheck | Exception を返却 理由: トークンデコードエラー |
| Issuer が DB に存在しない | PreReadCheck | Exception を返却 理由: 対応する OidcProvider が存在しない |
| 公開鍵の取得 URL が想定と異なる | PreReadCheck~FullDecodeCheck の途中 ※ 公開鍵取得は PreReadCheck と FullDecodeCheck の間に位置する独立したステップ |
Exception を返却 理由: 公開鍵の検出失敗 |
| Audience が要求先と一致しない | FullDecodeCheck | Exception を返却 理由: Audience 不一致 |
| トークンの有効期限切れ | FullDecodeCheck | Exception を返却 理由: トークン期限切れ |
| 署名検証に失敗 | FullDecodeCheck | Exception を返却 理由: 署名検証失敗 |
| User テーブルに該当ユーザーが存在しない | AuthCheck | Exception を返却 理由: 認証対象ユーザーが存在しない |
ダミーデータ
| 条件 | 期待される動作 |
|---|---|
博麗 霊夢が、認証フローを実行 |
認証成功 |
結月 ゆかりが、認証フローを実行 (別テナントからの実行) |
認証成功 |
ZUN が認証フローを実行 (未登録ユーザー) |
上記いずれかの失敗条件に該当し、認証失敗 |
詳細
API Requestによって、以下のHeader Formatでデータが付与されているものとする。
例として、ユーザー「博麗 霊夢」がログインした場合の例を示す。
Headers:
Authorization: Bearer eyxxxxxx.xxxxx.xxxxx
"署名検証前デコード"で以下のフォーマットでデコードされる。
{
"aud" : "<Audienceの値>",
"iss": "<Issuerの値>",
"sub" : "<Subの値>; 博麗 霊夢に対応するauth0のユーザーsub"
}
OidcProviderテーブルを使用して、以下のSQL文を実行する。
SELECT OidcProviderId, Jwt, AuthEntity, LoginFlow
FROM OidcProvider
WHERE Jwt->>'Issuer' = :issuer;
※ JSON フィールドの参照方法は利用する DBMS に依存するため、
実際の SQL では各 DB の JSON 機能に合わせて実装する。
OidcProviderId : 1
UniqueKey : api-user
OidcSource : auth0
Credential: {
"UserAuth" : {
"ClientId" : "...."
},
"ManagementApi" : {
"ClientId" : "....",
"ClientSecret" : "....",
"ExpiredAt" : "2026-04-27T17:05:50.6386846+00:00",
"AccessToken": "ey..."
}
}
Jwt: {
"Domain" : "...jp.auth0.com",
"Issuer" : "https://.....jp.auth0.com",
"Audience" : "https://..../"
}
AuthAttribute: {}
AuthEntity: user
LoginFlow: oauth
OidcProviderId : 2
UniqueKey : api-device
OidcSource : cognito
Credential: {
"UserAuth" : {
"ClientId" : "...."
}
}
Jwt: {
"Domain" : "...apnortheast-1.amazonaws.com/ap-northeast-1_.......",
"Issuer" : "...apnortheast-1.amazonaws.com/ap-northeast-1_......."
}
AuthAttribute: {}
AuthEntity: device
LoginFlow: oauth
OidcProviderId : 3
UniqueKey : api-consumer
OidcSource : cognito
Credential: {
"UserAuth" : {
"ClientId" : "...."
}
}
Jwt: {
"Domain" : "...apnortheast-1.amazonaws.com/ap-northeast-1_.......",
"Issuer" : "...apnortheast-1.amazonaws.com/ap-northeast-1_......."
}
AuthAttribute: {}
AuthEntity: consumer
LoginFlow: oauth
Jwtの中に含まれるIssuerが含まれるかを検証する。
import jwt
def preread_jwt(oauth_providers: list[OAuthProvider], token: str):
unverified_header = jwt.get_unverified_header(token)
kid = unverified_header.get("kid")
decoded_payload = jwt.decode(token, options={"verify_signature": False})
known_providers = [provider for provider in oauth_providers if provider.jwt.issuer == "<Issuerの値>"]
if len(known_providers) == 0:
raise Exception("No matching OAuth provider found.")
return kid, decoded_payload, known_providers[0]
## oauth_providers: list[OAuthProvider]; プロバイダ一覧
## token: str; Bearerトークン
kid, decoded_payload, known_provider = preread_jwt(oauth_providers, token)
OpenID Configurationを使用して、公開鍵を取得する。
※なお、下記の処理の公開鍵の保持は実設計においてはキャッシュを使用すること。
import jwt
from urllib.request import urlopen
def find_rsa_key(kid: str, issuer: str):
with urlopen(issuer) as response:
jwks = response.read()
rsa_key = None
for key in jwks.get("keys", []):
if key["kid"] == kid:
rsa_key = {
"kty" : key["kty"],
"kid" : key["kid"],
"use" : key["use"],
"n" : key["n"],
"e" : key["e"]
}
if rsa_key is None:
raise Exception("No found matched kid")
return jwt.PyJWT(rsa_key).key
rsa = find_rsa_key(kid, decoded_payload.get("iss"))
公開鍵、audience, issuerを使用して完全検証を行う。
import jwt
def decode_jwt_fully(token: str, oidc_provider: OAuthProvider, public_key: Any):
return jwt.decode(
token,
public_key,
algorithms=["RS256"],
audience=oidc_provider.jwt.audience,
issuer=oidc_provider.jwt.issuer,
options={
"verify_signature" : True,
"verify_exp" : True,
"verify_aud" : True,
"verify_iss" : True
}
)
full_verified_payload = decode_jwt_fully(token, known_provider, public_key)
ここで取得されるPayloadは以下のようになる。
{
"aud" : "<Audienceの値>",
"iss": "<Issuerの値>",
"sub" : "<Subの値>; 博麗 霊夢に対応するauth0のユーザーsub"
}
最後に、取得したPayloadに従って、UserテーブルのCountを
フィルタ条件{sub, known_provider.oidc_provider_id}を加えて
クエリ実行を行なう。
SELECT COUNT(*) FROM User
WHERE OidcProviderId = "known_provider.oidc_provider_id"
AND Sub="full_verified_payload.sub";
件数を評価して、1件であれば認証を完了とし、
それ以外の条件であれば、認証失敗とする。
Cognito利用時の注意点
Cognitoは完全検証の条件が異なる。
CognitoはAudienceの検証値がClientIdとなるため、OidcSourceがcognitoの場合はaudience=oidc_provider.credential.user_auth.client_idを使用する。