3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

APIをセキュアにするためのベストプラクティス

Posted at

はじめに

この記事では Flask を軸に、OWASP API Security Top 10 (2023) の全リスクをカバーしながら、攻撃シナリオ・改善アプローチ・本番運用で使えるコード例まで一気通貫で整理します。

API1:2023 壊れたオブジェクトレベル認可 (BOLA)

落とし穴

  • URL で指定された user_id を信頼し、そのまま他者のデータを返す
  • GraphQL や検索 API でフィルタ未設定のまま全件を返してしまう
# ❌ 脆弱な例: 所有権チェックなし
@app.get("/api/users/<int:user_id>")
@jwt_required()
def get_user(user_id):
    return User.query.get_or_404(user_id).to_dict()

セキュア実装

# app/security/ownership.py
from flask import g, abort
from functools import wraps

def require_owner(param_name: str):
    """URL パラメータと JWT の sub を比較して所有権を保証"""

    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            target_id = kwargs.get(param_name)
            if str(target_id) != str(g.current_user.id):
                abort(403, description="owner mismatch")
            return f(*args, **kwargs)

        return wrapper

    return decorator


@app.get("/api/users/<int:user_id>")
@jwt_required()
@require_owner("user_id")
def get_user(user_id):
    return g.current_user.to_dict(mask_sensitive=True)

API2:2023 壊れた認証 (Broken Authentication)

落とし穴

  • 平文パスワード保存
  • 有効期限なしのトークン
  • HTTPS 強制の欠如
# ❌ 平文パスワードと長寿命トークン
user.password = request.json["password"]
token = jwt.encode({"sub": user.id}, "weak-secret", algorithm="HS256")

セキュア実装

import secrets
from datetime import datetime, timedelta, timezone

PASSWORD_POLICY = {
    "min_length": 12,
    "requires_symbol": True,
}


def hash_password(password: str) -> bytes:
    if len(password) < PASSWORD_POLICY["min_length"]:
        raise ValueError("password too short")
    return bcrypt.hashpw(password.encode(), bcrypt.gensalt())


def generate_access_token(user_id: int, *, expires_in: int = 900) -> str:
    payload = {
        "sub": user_id,
        "iat": datetime.now(timezone.utc),
        "exp": datetime.now(timezone.utc) + timedelta(seconds=expires_in),
        "jti": secrets.token_urlsafe(16),
    }
    return jwt.encode(payload, current_app.config["SECRET_KEY"], algorithm="HS256")

チェックポイント

  • TLS 経由でのみ発行
  • リフレッシュトークンは回転制御し、DB で失効管理
  • MFA を flask-talisman 等で統合 (本稿では割愛)

API3:2023 壊れたオブジェクトプロパティレベル認可

落とし穴

  • User.to_dict()is_adminsalary など内部プロパティを丸ごと返却
  • 更新 API が想定外のフィールドまで書き換え可能
# ❌ フィールドがダダ漏れ
def to_dict(self):
    return self.__dict__

セキュア実装

from marshmallow import Schema, fields


class UserPublicSchema(Schema):
    id = fields.Int()
    username = fields.Str()
    display_name = fields.Str()


class UserUpdateSchema(Schema):
    display_name = fields.Str(required=True)
    # role や salary などは定義しない


@app.patch("/api/users/me")
@jwt_required()
def update_profile():
    payload = UserUpdateSchema().load(request.json)
    g.current_user.display_name = payload["display_name"]
    db.session.commit()
    return UserPublicSchema().dump(g.current_user)

API4:2023 無制限のリソース消費

落とし穴

  • 検索 API にレート制限なし
  • 巨大ファイルアップロードを制限せず OOM
# ❌ 無制限アクセス
@app.get("/api/search")
def search():
    return expensive_query(request.args.get("q", ""))

セキュア実装

from flask_limiter import Limiter
from flask_limiter.util import get_remote_address

limiter = Limiter(key_func=get_remote_address, default_limits=["200/minute"])
limiter.init_app(app)


@app.get("/api/search")
@limiter.limit("30/minute;5/second")
def search():
    q = request.args.get("q", "")[:128]
    return search_service.search(q, page_size=min(int(request.args.get("limit", 20)), 50))

API5:2023 壊れた機能レベル認可

落とし穴

  • 管理者 UI と一般ユーザー API が同じ Blueprint でごちゃ混ぜ
  • RBAC を忘れて主要操作を全公開
# ❌ 認可チェック無しで機能を公開
@app.post("/api/admin/users")
def create_user():
    ...

セキュア実装

# app/security/roles.py
def role_required(*roles: str):
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not g.current_user or not g.current_user.has_any_role(roles):
                abort(403)
            return f(*args, **kwargs)

        return wrapper

    return decorator


admin_bp = Blueprint("admin", __name__, url_prefix="/api/admin")


@admin_bp.post("/users")
@jwt_required()
@role_required("admin")
def create_user():
    ...

API6:2023 機密ビジネスフローへの無制限アクセス

落とし穴

  • マルチステップの購入フローで状態検証を行わず、ステップをスキップできる
  • 内部 API を直接叩いて注文処理を繰り返しトリガ
# ❌ CSRF や状態検証なしの決済実行
@app.post("/api/orders/<int:order_id>/execute")
def execute_order(order_id):
    return order_service.execute(order_id)

セキュア実装

from enum import Enum, auto


class OrderState(Enum):
    DRAFT = auto()
    VALIDATED = auto()
    EXECUTED = auto()


@app.post("/api/orders/<int:order_id>/execute")
@jwt_required()
@require_owner("order_id")
def execute_order(order_id):
    order = order_service.get(order_id)
    if order.state != OrderState.VALIDATED:
        abort(409, description="order not validated")
    order_service.execute(order)
    return {"status": "ok"}, 202

実務 Tips

  • イベントソーシングで状態遷移を監査
  • ビジネスルールを policy 層に集約

API7:2023 SSRF (Server Side Request Forgery)

落とし穴

  • ユーザー入力 URL をそのまま requests.get に流す
  • メタデータサービス http://169.254.169.254 へのアクセス防止策なし
# ❌ SSRF の温床
@app.post("/api/fetch-metadata")
def fetch_metadata():
    url = request.json["url"]
    return requests.get(url).json()

セキュア実装

import ipaddress
from urllib.parse import urlparse

ALLOWED_SCHEMES = {"https"}
ALLOWLIST_HOSTS = {"api.partner.com"}


def validate_url(url: str) -> str:
    parsed = urlparse(url)
    if parsed.scheme not in ALLOWED_SCHEMES:
        raise ValueError("invalid scheme")
    host_ip = socket.gethostbyname(parsed.hostname)
    ip = ipaddress.ip_address(host_ip)
    if ip.is_private or parsed.hostname not in ALLOWLIST_HOSTS:
        raise ValueError("host not allowed")
    return url


@app.post("/api/fetch-metadata")
def fetch_metadata():
    url = validate_url(request.json.get("url", ""))
    res = requests.get(url, timeout=3)
    res.raise_for_status()
    return res.json()

API8:2023 セキュリティ設定ミス

落とし穴

  • Flask の DEBUG=True で本番運用
  • TLS 強制やセキュリティヘッダを未設定
# ❌ デフォルト設定のまま
app = Flask(__name__)

セキュア実装

from flask_talisman import Talisman

app = Flask(__name__)
app.config.from_object("app.config.ProductionConfig")
Talisman(
    app,
    content_security_policy={
        "default-src": "'self'",
        "img-src": "'self' data:"
    },
    force_https=True,
    session_cookie_secure=True,
)


@app.after_request
def apply_additional_headers(resp):
    resp.headers["X-Content-Type-Options"] = "nosniff"
    resp.headers["X-Frame-Options"] = "DENY"
    return resp

DevSecOps

  • pip-audit を CI に組み込み
  • IaC でセキュリティグループや ALB の TLS 設定をコード化

API9:2023 不適切なインベントリ管理

落とし穴

  • 影の API (Shadow API) がドキュメント化されず放置
  • バージョン管理されていないエンドポイント
# ❌ 青天井で Blueprint を追加
app.register_blueprint(experimental_bp)

セキュア実装

# app/catalog/registry.py
from dataclasses import dataclass


@dataclass(frozen=True)
class ApiSpec:
    path: str
    method: str
    owner: str
    lifecycle: str  # production / deprecated


API_REGISTRY: list[ApiSpec] = [
    ApiSpec(path="/api/users", method="GET", owner="platform", lifecycle="production"),
    ApiSpec(path="/api/admin/users", method="POST", owner="platform", lifecycle="production"),
]


def require_registered(path: str, method: str):
    def middleware(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            if not any(spec.path == path and spec.method == method for spec in API_REGISTRY):
                abort(410, description="Endpoint not registered")
            return f(*args, **kwargs)

        return wrapper

    return middleware

CI では API カタログと OpenAPI スキーマを突き合わせ、未登録のルートがないか検査します。


API10:2023 API の安全でない利用

落とし穴

  • 外部 API のレスポンスを信頼し検証せず保存
  • Webhook を署名検証せずに処理
# ❌ Webhook 署名未検証
@app.post("/api/webhooks/payment")
def webhook():
    payload = request.json
    process(payload)
    return "ok"

セキュア実装

import hmac
import hashlib

def verify_signature(raw_body: bytes, signature: str) -> bool:
    secret = current_app.config["PAYMENT_WEBHOOK_SECRET"].encode()
    expected = hmac.new(secret, raw_body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature)


@app.post("/api/webhooks/payment")
def webhook():
    signature = request.headers.get("X-Signature", "")
    if not verify_signature(request.data, signature):
        abort(401, description="invalid signature")
    payload = PaymentWebhookSchema().load(request.json)
    payment_service.handle(payload)
    return {"status": "accepted"}, 202

追加対策

  • レスポンススキーマ検証 (pydantic/Marshmallow)
  • 外部 API の SLA を監視し、タイムアウトとサーキットブレーカを設定

参考文献

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?