サブスクリプション課金で 「クライアントの言うことを信じてはいけない」 はもはや常識ですが、StoreKit 2 で具体的にどうサーバー側検証するかは情報が散らばっています。AWS Lambda(Python)で Apple JWS を App Store Server API を呼ばずに 暗号検証して、expiresDate まで含めて自前で判断する構成を、恋愛メッセージ分析アプリ Relora の実装をベースに解説します。
この記事で分かること
- StoreKit 2 の
jwsRepresentationを Apple Root CA まで遡って検証する方法 - PyJWT + cryptography での ES256 署名検証 / x5c 証明書チェーン検証
- DynamoDB に検証済みプランを保存して Bedrock 呼び出し時にサーバー単独で plan 判定 する設計
- Sonnet 上限到達時の Qwen3 自動フォールバック
- 「StoreKit 2 のクライアント検証だけ」の何が危険か
全体構成
[iOS]
StoreKit 2: Transaction.currentEntitlements
└─ jwsRepresentation (JWS 文字列)
│
│ POST /v1/update-subscription
│ Authorization: Bearer <Cognito AccessToken>
▼
[API Gateway → Lambda]
apple_jws.verify_and_decode(signed_data)
├─ x5c 証明書チェーン(リーフ → 中間 → ルート)の署名検証
├─ ルートが Apple Root CA G3 か(CN/Org検証)
├─ リーフ公開鍵で JWS 署名検証(ES256)
└─ ペイロード decode
▼
DynamoDB: USER#xxx / SUBSCRIPTION
{ plan, productId, expiresDate, rawJWS }
▼
/v1/analyze 時にこのレコードを参照して plan 判定
ポイントは 「クライアントが送ってくる tier パラメータは無視する」 こと。Lambda は DynamoDB を見て、現時点で課金が有効かどうかを自分で判断します。
クライアント側: jwsRepresentation を取り出す
StoreKit 2 では Transaction.currentEntitlements を回せば、有効なすべてのトランザクションが取れます。VerificationResult に対して .jwsRepresentation プロパティが生えていて、これがそのまま Apple 署名済みの JWS 文字列です。
private func refreshStatus(forceSync: Bool = false) async {
var detectedPlan: SubscriptionPlan = .free
var activeJWS: String?
for await result in Transaction.currentEntitlements {
guard let transaction = try? checkVerified(result),
transaction.revocationDate == nil else { continue }
switch transaction.productID {
case Self.premiumProductID:
detectedPlan = .premium
activeJWS = result.jwsRepresentation
case Self.standardProductID:
if detectedPlan != .premium {
detectedPlan = .standard
activeJWS = result.jwsRepresentation
}
default:
break
}
}
// プラン変更時に限ってサーバーに同期
if detectedPlan != lastSyncedPlan || forceSync {
if await syncSubscriptionToServer(jwsRepresentation: activeJWS) {
lastSyncedPlan = detectedPlan
}
}
}
注意: result.jwsRepresentation は VerificationResult<Transaction> のプロパティ で、検証成功/失敗にかかわらず取れます。クライアント側の checkVerified は通信障害判定のために使い、サーバー側で改めて暗号検証します。
Transaction.updates のリスナーも仕込んでおき、課金更新・取消・払い戻しが起きたら即サーバーに同期します。
private func listenForTransactions() -> Task<Void, Error> {
Task.detached { @MainActor in
for await result in Transaction.updates {
if let transaction = try? self.checkVerified(result) {
await transaction.finish()
await self.refreshStatus(forceSync: true)
}
}
}
}
サーバー側: Apple JWS の暗号検証
JWS は header.payload.signature の3パートで、header の x5c フィールドに 証明書チェーン(リーフ → 中間 → Apple Root CA G3)が base64 で並んでいます。検証手順は以下です。
- ヘッダーを base64url デコードして
x5cを取り出す - 各証明書を DER として読み込む
- 末尾(ルート)が Apple Root CA G3 であることを確認
- 各証明書が「親で署名されている」ことを順に検証
- ルートが自己署名であることを確認
- リーフ証明書の公開鍵で JWS 自体の署名を検証(ES256)
PyJWT は 6番だけは標準で対応していますが、1〜5は自前で書きます。
import base64
import json
import jwt # PyJWT
from cryptography import x509
from cryptography.hazmat.primitives.asymmetric import ec
_APPLE_ROOT_CA_CN = "Apple Root CA - G3"
_APPLE_ROOT_CA_ORG = "Apple Inc."
def verify_and_decode(signed_data: str) -> dict:
parts = signed_data.split(".")
if len(parts) != 3:
raise ValueError("Invalid JWS format: expected 3 parts")
header_data = json.loads(_decode_b64url(parts[0]))
if header_data.get("alg") != "ES256":
raise ValueError(f"Unsupported algorithm: {header_data.get('alg')}")
x5c = header_data.get("x5c")
if not x5c or len(x5c) < 2:
raise ValueError("Missing or insufficient x5c certificate chain")
certs = [x509.load_der_x509_certificate(base64.b64decode(c)) for c in x5c]
# ルート検証(Apple Root CA G3)
_verify_apple_root(certs[-1])
# チェーン署名検証(リーフ → 中間 → ルート)
for i in range(len(certs) - 1):
_verify_cert_signed_by(certs[i], certs[i + 1])
# ルート自己署名
_verify_cert_signed_by(certs[-1], certs[-1])
# リーフ公開鍵で JWS 署名検証
payload = jwt.decode(
signed_data,
certs[0].public_key(),
algorithms=["ES256"],
options={"verify_aud": False, "verify_iss": False, "verify_exp": False},
)
return payload
x5c チェーンの署名検証
ECDSA 証明書を cryptography ライブラリで検証するヘルパーです。
def _verify_cert_signed_by(child, parent):
parent_public_key = parent.public_key()
if isinstance(parent_public_key, ec.EllipticCurvePublicKey):
parent_public_key.verify(
child.signature,
child.tbs_certificate_bytes,
ec.ECDSA(child.signature_hash_algorithm),
)
else:
parent_public_key.verify(
child.signature,
child.tbs_certificate_bytes,
child.signature_hash_algorithm,
)
tbs_certificate_bytes(To-Be-Signed バイト列)と signature を分離して、親の公開鍵で検証します。
ルート CA の固定
「ルートが Apple のものであれば信頼する」では弱いです。ルートの Common Name と Organization を文字列で固定してエンドツーエンドの信頼を作ります。
def _verify_apple_root(cert):
cn = cert.subject.get_attributes_for_oid(x509.oid.NameOID.COMMON_NAME)
org = cert.subject.get_attributes_for_oid(x509.oid.NameOID.ORGANIZATION_NAME)
if _APPLE_ROOT_CA_CN not in cn[0].value:
raise ValueError("Root CN mismatch")
if _APPLE_ROOT_CA_ORG not in org[0].value:
raise ValueError("Root Org mismatch")
将来 Apple が G4 以降に切り替えたらここを更新します。本来は CRL/OCSP も見るべきですが、Apple の根本ルートが失効するシナリオは事実上無視できるとして簡略化しています。
ペイロード検証: bundleId と productId のホワイトリスト
検証済みペイロードに対して、他アプリの JWS を流用されていないか と 想定する商品IDか を確認します。
_KNOWN_PRODUCTS = {
"com.mnaoki.Relora.standard.monthly": "standard",
"com.mnaoki.Relora.premium.monthly": "premium",
}
_EXPECTED_BUNDLE_ID = "com.m-naoki-m.Relora"
def _handle_update_subscription(event, context):
user_id = _get_user_id(event)
body = json.loads(event.get("body", "{}"))
signed_data = body.get("signedTransactionData")
if not signed_data:
# サブスクなし → Free に戻す
usage_table.delete_item(
Key={"PK": f"USER#{user_id}", "SK": "SUBSCRIPTION"}
)
return _ok({"plan": "free"})
try:
payload = verify_and_decode(signed_data)
except ValueError as e:
print(f"[WARN] JWS verification failed for user {user_id}: {e}")
return _error(400, "Invalid subscription data")
if payload.get("bundleId") != _EXPECTED_BUNDLE_ID:
return _error(400, "Invalid subscription data")
plan = _KNOWN_PRODUCTS.get(payload.get("productId"), "free")
if plan == "free":
return _error(400, "Invalid subscription data")
expires_ms = payload.get("expiresDate")
if expires_ms and time.time() > (int(expires_ms) / 1000):
plan = "free"
item = {
"PK": f"USER#{user_id}",
"SK": "SUBSCRIPTION",
"plan": plan,
"productId": payload["productId"],
"updatedAt": int(time.time()),
"rawJWS": signed_data,
}
if expires_ms:
item["expiresDate"] = int(expires_ms)
item["ttl"] = int(expires_ms) // 1000 + 86400 * 7
usage_table.put_item(Item=item)
return _ok({"plan": plan})
rawJWS も保存しているのは、後から「このユーザーの最後の購入レシートをもう一度検証したい」というデバッグ用途です。
分析 API 側: クライアントの tier を信じない
/v1/analyze のハンドラ冒頭で、DynamoDB から SUBSCRIPTION レコードを引いて plan を確定させます。
sub_response = usage_table.get_item(
Key={"PK": f"USER#{user_id}", "SK": "SUBSCRIPTION"}
)
sub_item = sub_response.get("Item", {})
plan = sub_item.get("plan", "free")
expires_ms = sub_item.get("expiresDate")
if expires_ms and time.time() > (int(expires_ms) / 1000):
plan = "free" # 期限切れは即降格
if plan == "free" and tier == "premium":
return _error(403, "Sonnet requires Standard or Premium plan")
クライアントが tier=premium を送ってきても、サーバーが知っている plan が free なら 403。「課金してないのに Sonnet を呼ぼうとする」を Lambda 1ファイルで完全に塞げます。
DynamoDB アトミックでフォールバック制御
Sonnet(premium tier)の日次上限を超えたら自動で Qwen3(free tier)にフォールバックする設計です。これは Lambda の DynamoDB 条件付き更新で実装します。
DAILY_LIMITS = {
"free": {"free": 99999, "premium": 0},
"standard": {"free": 99999, "premium": 20},
"premium": {"free": 99999, "premium": 50},
}
count_attr = f"count_{tier}"
ttl_value = int(time.time()) + 86400 * 7
try:
update_result = usage_table.update_item(
Key={"PK": f"USER#{user_id}", "SK": f"DATE#{today}"},
UpdateExpression="SET #cnt = if_not_exists(#cnt, :zero) + :one, #ttl = :ttl",
ConditionExpression="attribute_not_exists(#cnt) OR #cnt < :limit",
ExpressionAttributeNames={"#cnt": count_attr, "#ttl": "ttl"},
ExpressionAttributeValues={
":one": 1, ":zero": 0,
":ttl": ttl_value,
":limit": daily_limit,
},
ReturnValues="ALL_NEW",
)
except usage_table.meta.client.exceptions.ConditionalCheckFailedException:
return _error(429, "Daily limit reached")
ConditionalCheckFailedException が起きたら、クライアント側で tier="free" を再指定して再度呼び出させます。これでサーバー側のロジックは「上限なら拒否」のシンプルな実装で済み、複雑なフォールバックを Lambda に持ち込まなくて済みます。
「クライアント検証だけ」の何が危険か
StoreKit 2 はクライアント側で VerificationResult を返してくれて、署名済みかどうか判定できます。これだけで十分に見えますが、攻撃者の視点ではざっくり次のことが可能です。
-
MITM プロキシでアプリのレスポンスを書き換える: ジェイルブレイクや mitmproxy を仕込めば、
StoreKit 2の戻り値そのものは触れなくても、Bedrock を呼ぶ通信に直接tier=premiumを差し込むのは容易 -
アプリのバイナリを再署名して
currentPlan = .premiumを強制: TestFlight 配布や Sideloading 時代の懸念 - 解約直後のキャッシュを送り続ける: クライアントが「期限切れ」を見落とすケースが普通に起きる
つまり 「課金状態を最終的に決める権利は常にサーバー」 にしないと、フリーミアム設計はいつか崩されます。本記事の構成は、サーバーが Apple Root CA まで遡った検証結果のみを信頼する形に揃えています。
制約と注意点
- App Store Server API(旧 verifyReceipt)を使っていない: API キーや収益データを取りに行く必要があれば併用が必要。Reloraは「いま課金しているか」のみを判定すれば足りるので JWS 検証だけ
-
オフライン購入の整合性: クライアントがオフライン中に購入完了した場合、サーバー同期は復帰時。その間
tier=premiumで叩いても 403 になる ─ 数秒〜数分の不整合は許容 - JWS 形式変更リスク: Apple が x5c 形式や ES256 以外を導入したら追従が必要
まとめ
- StoreKit 2 の
jwsRepresentationは Apple Root CA G3 まで遡って暗号検証できる - PyJWT + cryptography で App Store Server API なしの検証が可能
- 検証済み plan を DynamoDB に保存し、
/v1/analyzeではそれだけを信頼する - クライアントが送る
tierは決定権を持たない - アトミック条件付き更新で日次上限と Qwen3 フォールバックを両立
関連記事:
Relora(App Store): https://apps.apple.com/app/relora/id6762029713