はじめに
この記事では Flask を軸に、OWASP API Security Top 10 (2023) の全リスクをカバーしながら、攻撃シナリオ・改善アプローチ・本番運用で使えるコード例まで一気通貫で整理します。
API1:2023 壊れたオブジェクトレベル認可 (BOLA)
落とし穴
- URL で指定された
user_id
を信頼し、そのまま他者のデータを返す - GraphQL や検索 API でフィルタ未設定のまま全件を返してしまう
# ❌ 脆弱な例: 所有権チェックなし
@app.get("/api/users/<int:user_id>")
@jwt_required()
def get_user(user_id):
return User.query.get_or_404(user_id).to_dict()
セキュア実装
# app/security/ownership.py
from flask import g, abort
from functools import wraps
def require_owner(param_name: str):
"""URL パラメータと JWT の sub を比較して所有権を保証"""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
target_id = kwargs.get(param_name)
if str(target_id) != str(g.current_user.id):
abort(403, description="owner mismatch")
return f(*args, **kwargs)
return wrapper
return decorator
@app.get("/api/users/<int:user_id>")
@jwt_required()
@require_owner("user_id")
def get_user(user_id):
return g.current_user.to_dict(mask_sensitive=True)
API2:2023 壊れた認証 (Broken Authentication)
落とし穴
- 平文パスワード保存
- 有効期限なしのトークン
- HTTPS 強制の欠如
# ❌ 平文パスワードと長寿命トークン
user.password = request.json["password"]
token = jwt.encode({"sub": user.id}, "weak-secret", algorithm="HS256")
セキュア実装
import secrets
from datetime import datetime, timedelta, timezone
PASSWORD_POLICY = {
"min_length": 12,
"requires_symbol": True,
}
def hash_password(password: str) -> bytes:
if len(password) < PASSWORD_POLICY["min_length"]:
raise ValueError("password too short")
return bcrypt.hashpw(password.encode(), bcrypt.gensalt())
def generate_access_token(user_id: int, *, expires_in: int = 900) -> str:
payload = {
"sub": user_id,
"iat": datetime.now(timezone.utc),
"exp": datetime.now(timezone.utc) + timedelta(seconds=expires_in),
"jti": secrets.token_urlsafe(16),
}
return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm="HS256")
チェックポイント
- TLS 経由でのみ発行
- リフレッシュトークンは回転制御し、DB で失効管理
- MFA を
flask-talisman
等で統合 (本稿では割愛)
API3:2023 壊れたオブジェクトプロパティレベル認可
落とし穴
-
User.to_dict()
がis_admin
やsalary
など内部プロパティを丸ごと返却 - 更新 API が想定外のフィールドまで書き換え可能
# ❌ フィールドがダダ漏れ
def to_dict(self):
return self.__dict__
セキュア実装
from marshmallow import Schema, fields
class UserPublicSchema(Schema):
id = fields.Int()
username = fields.Str()
display_name = fields.Str()
class UserUpdateSchema(Schema):
display_name = fields.Str(required=True)
# role や salary などは定義しない
@app.patch("/api/users/me")
@jwt_required()
def update_profile():
payload = UserUpdateSchema().load(request.json)
g.current_user.display_name = payload["display_name"]
db.session.commit()
return UserPublicSchema().dump(g.current_user)
API4:2023 無制限のリソース消費
落とし穴
- 検索 API にレート制限なし
- 巨大ファイルアップロードを制限せず OOM
# ❌ 無制限アクセス
@app.get("/api/search")
def search():
return expensive_query(request.args.get("q", ""))
セキュア実装
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(key_func=get_remote_address, default_limits=["200/minute"])
limiter.init_app(app)
@app.get("/api/search")
@limiter.limit("30/minute;5/second")
def search():
q = request.args.get("q", "")[:128]
return search_service.search(q, page_size=min(int(request.args.get("limit", 20)), 50))
API5:2023 壊れた機能レベル認可
落とし穴
- 管理者 UI と一般ユーザー API が同じ Blueprint でごちゃ混ぜ
- RBAC を忘れて主要操作を全公開
# ❌ 認可チェック無しで機能を公開
@app.post("/api/admin/users")
def create_user():
...
セキュア実装
# app/security/roles.py
def role_required(*roles: str):
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not g.current_user or not g.current_user.has_any_role(roles):
abort(403)
return f(*args, **kwargs)
return wrapper
return decorator
admin_bp = Blueprint("admin", __name__, url_prefix="/api/admin")
@admin_bp.post("/users")
@jwt_required()
@role_required("admin")
def create_user():
...
API6:2023 機密ビジネスフローへの無制限アクセス
落とし穴
- マルチステップの購入フローで状態検証を行わず、ステップをスキップできる
- 内部 API を直接叩いて注文処理を繰り返しトリガ
# ❌ CSRF や状態検証なしの決済実行
@app.post("/api/orders/<int:order_id>/execute")
def execute_order(order_id):
return order_service.execute(order_id)
セキュア実装
from enum import Enum, auto
class OrderState(Enum):
DRAFT = auto()
VALIDATED = auto()
EXECUTED = auto()
@app.post("/api/orders/<int:order_id>/execute")
@jwt_required()
@require_owner("order_id")
def execute_order(order_id):
order = order_service.get(order_id)
if order.state != OrderState.VALIDATED:
abort(409, description="order not validated")
order_service.execute(order)
return {"status": "ok"}, 202
実務 Tips
- イベントソーシングで状態遷移を監査
- ビジネスルールを
policy
層に集約
API7:2023 SSRF (Server Side Request Forgery)
落とし穴
- ユーザー入力 URL をそのまま
requests.get
に流す - メタデータサービス
http://169.254.169.254
へのアクセス防止策なし
# ❌ SSRF の温床
@app.post("/api/fetch-metadata")
def fetch_metadata():
url = request.json["url"]
return requests.get(url).json()
セキュア実装
import ipaddress
from urllib.parse import urlparse
ALLOWED_SCHEMES = {"https"}
ALLOWLIST_HOSTS = {"api.partner.com"}
def validate_url(url: str) -> str:
parsed = urlparse(url)
if parsed.scheme not in ALLOWED_SCHEMES:
raise ValueError("invalid scheme")
host_ip = socket.gethostbyname(parsed.hostname)
ip = ipaddress.ip_address(host_ip)
if ip.is_private or parsed.hostname not in ALLOWLIST_HOSTS:
raise ValueError("host not allowed")
return url
@app.post("/api/fetch-metadata")
def fetch_metadata():
url = validate_url(request.json.get("url", ""))
res = requests.get(url, timeout=3)
res.raise_for_status()
return res.json()
API8:2023 セキュリティ設定ミス
落とし穴
- Flask の
DEBUG=True
で本番運用 - TLS 強制やセキュリティヘッダを未設定
# ❌ デフォルト設定のまま
app = Flask(__name__)
セキュア実装
from flask_talisman import Talisman
app = Flask(__name__)
app.config.from_object("app.config.ProductionConfig")
Talisman(
app,
content_security_policy={
"default-src": "'self'",
"img-src": "'self' data:"
},
force_https=True,
session_cookie_secure=True,
)
@app.after_request
def apply_additional_headers(resp):
resp.headers["X-Content-Type-Options"] = "nosniff"
resp.headers["X-Frame-Options"] = "DENY"
return resp
DevSecOps
-
pip-audit
を CI に組み込み - IaC でセキュリティグループや ALB の TLS 設定をコード化
API9:2023 不適切なインベントリ管理
落とし穴
- 影の API (Shadow API) がドキュメント化されず放置
- バージョン管理されていないエンドポイント
# ❌ 青天井で Blueprint を追加
app.register_blueprint(experimental_bp)
セキュア実装
# app/catalog/registry.py
from dataclasses import dataclass
@dataclass(frozen=True)
class ApiSpec:
path: str
method: str
owner: str
lifecycle: str # production / deprecated
API_REGISTRY: list[ApiSpec] = [
ApiSpec(path="/api/users", method="GET", owner="platform", lifecycle="production"),
ApiSpec(path="/api/admin/users", method="POST", owner="platform", lifecycle="production"),
]
def require_registered(path: str, method: str):
def middleware(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not any(spec.path == path and spec.method == method for spec in API_REGISTRY):
abort(410, description="Endpoint not registered")
return f(*args, **kwargs)
return wrapper
return middleware
CI では API カタログと OpenAPI スキーマを突き合わせ、未登録のルートがないか検査します。
API10:2023 API の安全でない利用
落とし穴
- 外部 API のレスポンスを信頼し検証せず保存
- Webhook を署名検証せずに処理
# ❌ Webhook 署名未検証
@app.post("/api/webhooks/payment")
def webhook():
payload = request.json
process(payload)
return "ok"
セキュア実装
import hmac
import hashlib
def verify_signature(raw_body: bytes, signature: str) -> bool:
secret = current_app.config["PAYMENT_WEBHOOK_SECRET"].encode()
expected = hmac.new(secret, raw_body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature)
@app.post("/api/webhooks/payment")
def webhook():
signature = request.headers.get("X-Signature", "")
if not verify_signature(request.data, signature):
abort(401, description="invalid signature")
payload = PaymentWebhookSchema().load(request.json)
payment_service.handle(payload)
return {"status": "accepted"}, 202
追加対策
- レスポンススキーマ検証 (pydantic/Marshmallow)
- 外部 API の SLA を監視し、タイムアウトとサーキットブレーカを設定