1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Lambda・Step Functionsで実装するサーバーレス設計パターン実践ガイド

1
Last updated at Posted at 2026-03-10

AWS Lambda・Step Functionsで実装するサーバーレス設計パターン実践ガイド

この記事でわかること

  • サーバーレスアーキテクチャで頻出する6つの設計パターンとその使い分け
  • オーケストレーション(Step Functions)とコレオグラフィ(EventBridge)の選定基準
  • Sagaパターン・CQRS・Fan-Outの具体的な実装方法とAWSサービス構成
  • コールドスタート対策やコスト最適化など、本番運用で必要なチューニング手法
  • 陥りやすい4つのアンチパターンとその回避策

対象読者

  • 想定読者: サーバーレスアーキテクチャを本番環境で採用しようとしているMLエンジニア・バックエンドエンジニア
  • 必要な前提知識:
    • AWS Lambda、API Gatewayの基本的な使い方
    • Python 3.12+ またはNode.js 20.xの基礎文法
    • REST APIとイベント駆動アーキテクチャの基本概念
    • 推論パイプラインやバッチ処理の設計経験があるとより理解が深まります

結論・成果

サーバーレス設計パターンを適切に選択・組み合わせることで、AWS公式ドキュメントの報告によると、同期的なLambda連鎖呼び出しをStep Functionsによるオーケストレーションに置き換えた場合、Lambda関数の実行時間に対する課金が待ち時間分削減されます。また、SQSによる非同期デカップリングを導入した構成では、負荷のピーク時にもバックプレッシャー制御が効くため、DynamoDBのスロットリング発生率を抑えつつスループットを維持できます。

本記事では、MLエンジニアの日常業務(推論API、バッチ処理、データパイプライン)を題材に、6つの設計パターンを具体的なAWSサービス構成とコード例で解説します。

イベント駆動パターンを実装する

サーバーレスアーキテクチャの中核はイベント駆動(Event-Driven)設計です。MLエンジニアにとっては、学習データのアップロードをトリガーに前処理パイプラインが起動する仕組みや、推論リクエストの非同期キューイングなどが身近な例でしょう。ここでは2つの主要パターンを見ていきます。

Fan-Out/Fan-Inパターン

大量のデータを並列処理する場面で威力を発揮するパターンです。1つのイベントが複数のLambda関数を同時に起動し、それぞれが独立して処理を行います。MLの文脈では、大規模な推論バッチを分割して並列実行するケースが典型的です。

以下は、S3に画像ファイルがアップロードされたら、SNS経由で複数のLambda Workerが並列に推論処理を行う実装例です。

# fan_out_handler.py - Fan-Out: S3アップロードをトリガーに並列処理を起動
import json
import boto3

s3 = boto3.client("s3")
sns = boto3.client("sns")

TOPIC_ARN = "arn:aws:sns:ap-northeast-1:123456789012:inference-fan-out"

def handler(event, context):
    """S3イベントを受け取り、ファイルをチャンク分割してSNSに配信する"""
    for record in event["Records"]:
        bucket = record["s3"]["bucket"]["name"]
        key = record["s3"]["object"]["key"]

        # メタデータからチャンク数を決定(デフォルト: 10分割)
        head = s3.head_object(Bucket=bucket, Key=key)
        file_size = head["ContentLength"]
        chunk_count = max(1, file_size // (50 * 1024 * 1024))  # 50MB単位

        for i in range(chunk_count):
            message = {
                "bucket": bucket,
                "key": key,
                "chunk_index": i,
                "total_chunks": chunk_count,
            }
            sns.publish(
                TopicArn=TOPIC_ARN,
                Message=json.dumps(message),
                MessageAttributes={
                    "chunk_index": {
                        "DataType": "Number",
                        "StringValue": str(i),
                    }
                },
            )

    return {"statusCode": 200, "body": f"Dispatched {chunk_count} chunks"}
# fan_in_worker.py - Fan-In: 各Workerの処理結果をDynamoDBに集約
import json
import boto3

dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table("inference-results")

def handler(event, context):
    """SNSメッセージを受け取り、チャンク単位で推論処理を実行する"""
    for record in event["Records"]:
        message = json.loads(record["Sns"]["Message"])

        # ここに実際の推論ロジックを実装
        # 例: SageMaker Endpointの呼び出し、画像分類など
        result = run_inference(
            bucket=message["bucket"],
            key=message["key"],
            chunk_index=message["chunk_index"],
        )

        # 結果をDynamoDBに書き込み(冪等性のためchunk_indexをキーに使用)
        table.put_item(
            Item={
                "job_id": f"{message['key']}",
                "chunk_index": message["chunk_index"],
                "total_chunks": message["total_chunks"],
                "result": json.dumps(result),
                "status": "completed",
            }
        )

    return {"statusCode": 200}


def run_inference(bucket, key, chunk_index):
    """推論ロジックのスタブ(実際にはSageMaker Endpoint呼び出し等)"""
    return {"prediction": "cat", "confidence": 0.95}

なぜこの実装を選んだか:

  • SNSを挟むことで、Producer(S3トリガー)とConsumer(Worker Lambda)が完全に分離される
  • Worker数はSNSサブスクリプションの追加だけでスケールできる
  • DynamoDBのput_itemは冪等性があるため、リトライ時にも安全

注意点:

Fan-Outパターンでは、並列度がLambdaの同時実行数上限(デフォルト1,000)に制約されます。大規模バッチ処理では、事前にAWSサポートへ上限緩和を申請するか、SQSキューで流量制御を挟む設計を検討してください。

イベントソーシングパターン

状態の変更をイベントとして記録し、任意の時点の状態を再構築できるパターンです。MLパイプラインにおけるデータリネージ(どのデータがどの前処理を経たか)の追跡に有効です。

# event_store.py - DynamoDB Streamsを活用したイベントソーシング
import json
import time
import uuid
import boto3

dynamodb = boto3.resource("dynamodb")
event_table = dynamodb.Table("ml-pipeline-events")

def append_event(aggregate_id: str, event_type: str, payload: dict) -> dict:
    """イベントをイベントストアに追記する(追記のみ、更新・削除なし)"""
    event = {
        "aggregate_id": aggregate_id,
        "event_id": str(uuid.uuid4()),
        "event_type": event_type,
        "payload": json.dumps(payload),
        "timestamp": int(time.time() * 1000),
        "version": get_next_version(aggregate_id),
    }
    event_table.put_item(Item=event)
    return event


def get_next_version(aggregate_id: str) -> int:
    """楽観的ロックのためにバージョン番号を取得する"""
    response = event_table.query(
        KeyConditionExpression="aggregate_id = :aid",
        ExpressionAttributeValues={":aid": aggregate_id},
        ScanIndexForward=False,
        Limit=1,
    )
    if response["Items"]:
        return response["Items"][0]["version"] + 1
    return 1


def rebuild_state(aggregate_id: str) -> dict:
    """イベントを時系列で再生して現在の状態を構築する"""
    response = event_table.query(
        KeyConditionExpression="aggregate_id = :aid",
        ExpressionAttributeValues={":aid": aggregate_id},
        ScanIndexForward=True,
    )

    state = {"status": "initialized", "history": []}
    for item in response["Items"]:
        event_type = item["event_type"]
        payload = json.loads(item["payload"])

        if event_type == "DataUploaded":
            state["status"] = "data_ready"
            state["data_location"] = payload["s3_path"]
        elif event_type == "PreprocessingStarted":
            state["status"] = "preprocessing"
        elif event_type == "PreprocessingCompleted":
            state["status"] = "preprocessed"
            state["processed_location"] = payload["output_path"]
        elif event_type == "InferenceCompleted":
            state["status"] = "completed"
            state["result"] = payload["result"]

        state["history"].append(event_type)

    return state

注意点:

イベントソーシングはイベント数が増加するとrebuild_stateのレイテンシが増大します。本番環境では定期的にスナップショットを作成し、スナップショット以降のイベントのみ再生する設計が必要です。DynamoDB Streamsの保持期間は24時間のため、長期保存にはS3やKinesisへの転送パイプラインを別途構築してください。

ワークフローパターンを設計する

複数のLambda関数を連携させる場面では、オーケストレーションコレオグラフィの2つのアプローチがあります。この選択は、サーバーレスアーキテクチャの設計で最も重要な意思決定の1つです。

オーケストレーション vs コレオグラフィの選定基準

観点 オーケストレーション(Step Functions) コレオグラフィ(EventBridge)
制御方式 中央コントローラーが実行順序を管理 各サービスが独立してイベントに反応
可視性 ワークフロー全体をビジュアルに把握可能 分散したイベントフローの追跡が必要
エラー処理 組み込みのリトライ・キャッチ・ロールバック 各サービスが個別にエラー処理を実装
結合度 ワークフロー定義に依存(中程度) イベントスキーマのみに依存(低い)
適用場面 順序保証・補償トランザクションが必要な処理 独立した複数サービスへの通知・連携
コスト 状態遷移ごとに課金(Standard: 約$0.025/1000遷移) イベント発行ごとに課金(約$1/100万イベント)
実行時間上限 Standard: 1年、Express: 5分 制限なし(各Lambdaは15分)

使い分けの指針:

  • Step Functionsを選ぶ場面: 注文処理→決済→在庫更新のように、処理の順序保証失敗時のロールバックが必要なケース
  • EventBridgeを選ぶ場面: ユーザー登録後に「メール送信」「ログ記録」「分析イベント送信」を並行して行うように、各処理が独立しているケース

AWS公式ブログのStreamlining AWS Serverless workflowsでも、Lambda間の同期呼び出しをStep Functionsに移行する手法が推奨されています。

Sagaパターンの実装

分散トランザクションを管理するSagaパターンは、Step Functionsとの相性が非常に良いパターンです。MLエンジニアにとっては、モデルのデプロイパイプライン(モデル検証→エンドポイント作成→トラフィック切り替え→モニタリング設定)のように、各ステップの失敗時に補償アクション(ロールバック)が必要なワークフローで活用できます。

以下は、Step FunctionsのASL(Amazon States Language)でSagaパターンを定義した例です。

{
  "Comment": "MLモデルデプロイのSagaパターン",
  "StartAt": "ValidateModel",
  "States": {
    "ValidateModel": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:validate-model",
      "ResultPath": "$.validation",
      "Next": "CreateEndpoint",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "DeployFailed",
          "ResultPath": "$.error"
        }
      ]
    },
    "CreateEndpoint": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:create-endpoint",
      "ResultPath": "$.endpoint",
      "Next": "ShiftTraffic",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "RollbackEndpoint",
          "ResultPath": "$.error"
        }
      ]
    },
    "ShiftTraffic": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:shift-traffic",
      "ResultPath": "$.traffic",
      "Next": "SetupMonitoring",
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "RollbackTraffic",
          "ResultPath": "$.error"
        }
      ]
    },
    "SetupMonitoring": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:setup-monitoring",
      "End": true,
      "Catch": [
        {
          "ErrorEquals": ["States.ALL"],
          "Next": "RollbackTraffic",
          "ResultPath": "$.error"
        }
      ]
    },
    "RollbackTraffic": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:rollback-traffic",
      "Next": "RollbackEndpoint"
    },
    "RollbackEndpoint": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:ap-northeast-1:123456789012:function:rollback-endpoint",
      "Next": "DeployFailed"
    },
    "DeployFailed": {
      "Type": "Fail",
      "Error": "DeploymentRollback",
      "Cause": "Saga compensation completed - deployment rolled back"
    }
  }
}

なぜStep Functionsを選んだか:

  • ASLのCatch句で各ステップの失敗を捕捉し、補償アクションへ遷移できる
  • ワークフローの実行状態がStep Functionsコンソールでビジュアルに確認できる
  • Lambda関数のタイムアウト(15分)を超える長時間ワークフロー(Standard Workflowは最大1年)に対応

注意点:

Sagaパターンの補償アクション(ロールバック)は、冪等に設計することが必須です。ネットワーク障害等でリトライが発生した場合、同じ補償アクションが複数回実行されても整合性が保たれる必要があります。例えば、エンドポイント削除のロールバック関数は「存在しない場合はスキップする」ロジックを含めてください。

CQRSパターンでデータアクセスを最適化する

CQRS(Command Query Responsibility Segregation)は、書き込み(Command)と読み取り(Query)の責務を分離するパターンです。MLシステムでは、モデルの学習結果の書き込み(高い整合性が必要)と、ダッシュボードからのメトリクス参照(高いスループットが必要)で異なる要件を持つことが多く、CQRSが有効に機能します。

# command_handler.py - Command側: モデルメトリクスの書き込み
import json
import time
import uuid
import boto3

dynamodb = boto3.resource("dynamodb")
write_table = dynamodb.Table("model-metrics-write")

def handler(event, context):
    """モデルの評価メトリクスを書き込む(Command)"""
    body = json.loads(event["body"])

    item = {
        "metric_id": str(uuid.uuid4()),
        "model_id": body["model_id"],
        "model_version": body["version"],
        "accuracy": body["accuracy"],
        "latency_p99_ms": body["latency_p99_ms"],
        "timestamp": int(time.time() * 1000),
        "event_type": "MetricRecorded",
    }

    write_table.put_item(Item=item)

    return {
        "statusCode": 201,
        "body": json.dumps({"metric_id": item["metric_id"]}),
    }
# stream_projector.py - DynamoDB Streamsで読み取り用テーブルに射影する
import json
import boto3
from decimal import Decimal

dynamodb = boto3.resource("dynamodb")
read_table = dynamodb.Table("model-metrics-read")

def handler(event, context):
    """DynamoDB Streamsイベントを受けて読み取り用テーブルを更新する"""
    for record in event["Records"]:
        if record["eventName"] not in ("INSERT", "MODIFY"):
            continue

        new_image = record["dynamodb"]["NewImage"]
        model_id = new_image["model_id"]["S"]
        model_version = new_image["model_version"]["S"]

        # 読み取り用テーブルにモデル単位で集約した最新メトリクスを射影
        read_table.update_item(
            Key={"model_id": model_id},
            UpdateExpression=(
                "SET latest_version = :ver, "
                "latest_accuracy = :acc, "
                "latest_latency_p99_ms = :lat, "
                "updated_at = :ts"
            ),
            ExpressionAttributeValues={
                ":ver": model_version,
                ":acc": new_image["accuracy"]["N"],
                ":lat": new_image["latency_p99_ms"]["N"],
                ":ts": new_image["timestamp"]["N"],
            },
        )

    return {"statusCode": 200}
# query_handler.py - Query側: モデルメトリクスの読み取り
import json
import boto3

dynamodb = boto3.resource("dynamodb")
read_table = dynamodb.Table("model-metrics-read")

def handler(event, context):
    """モデルの最新メトリクスを返す(Query)"""
    model_id = event["pathParameters"]["model_id"]

    response = read_table.get_item(Key={"model_id": model_id})

    if "Item" not in response:
        return {"statusCode": 404, "body": json.dumps({"error": "Model not found"})}

    return {
        "statusCode": 200,
        "body": json.dumps(response["Item"], default=str),
    }

なぜCQRSを選んだか:

  • 書き込みテーブルはイベント単位(append-only)で、読み取りテーブルはモデル単位の集約ビューという異なるスキーマを採用できる
  • 読み取り側のDynamoDBテーブルにはDAX(DynamoDB Accelerator)を追加してキャッシュ層を挿入でき、参照クエリのレイテンシをマイクロ秒レベルに短縮可能
  • 書き込みと読み取りのスケーリングを独立して制御できる

注意点:

CQRSを採用すると、書き込みテーブルと読み取りテーブルの間に結果整合性(Eventual Consistency)が発生します。DynamoDB Streamsの伝搬遅延は通常数百ミリ秒ですが、高負荷時は秒単位に延びる可能性があります。「書き込み直後に最新データを読みたい」要件がある場合は、書き込みテーブルへの強い整合性読み取り(Strongly Consistent Read)をフォールバックとして実装してください。

コールドスタートとコスト最適化を実践する

サーバーレスの本番運用で避けて通れないのがコールドスタート対策とコスト最適化です。2025年8月以降、AWSはLambdaのINITフェーズ(コールドスタート時の初期化処理)にも通常の実行時間と同様の課金を適用するようになりました。この変更により、コールドスタートは単なるレイテンシ問題からコスト問題にもなっています。

コールドスタート対策の選定フローチャート

対策 コスト影響 レイテンシ改善 適用シナリオ
Provisioned Concurrency 高(常時課金) コールドスタート完全排除 推論APIなどレイテンシ要件が厳しいエンドポイント
SnapStart(Java/C#) 低(追加課金なし) INIT時間を約80%短縮(AWS公式発表値) JVMベースの関数
軽量ランタイム選択 なし INIT時間を短縮 新規開発時のランタイム選定
依存関係の最小化 なし パッケージサイズ縮小で改善 すべてのLambda関数
Lambda Layers活用 なし 共通ライブラリの再利用 複数関数で同じSDKを使う場合
# cold_start_optimized.py - コールドスタートを最小化する実装パターン
import json
import os

# グローバルスコープでの初期化: INITフェーズで1回だけ実行される
# Lambda実行環境が再利用される間、この接続は維持される
import boto3

# 環境変数から設定を読み込み(INITフェーズ)
TABLE_NAME = os.environ.get("TABLE_NAME", "default-table")
REGION = os.environ.get("AWS_REGION", "ap-northeast-1")

# クライアントの初期化はグローバルスコープで行う
# これにより、Warm Start時にクライアントの再作成が不要になる
dynamodb = boto3.resource("dynamodb", region_name=REGION)
table = dynamodb.Table(TABLE_NAME)


def handler(event, context):
    """
    ハンドラー関数はビジネスロジックのみに集中する。
    boto3クライアントの初期化はグローバルスコープで完了済み。
    """
    model_id = event.get("pathParameters", {}).get("model_id")
    if not model_id:
        return {"statusCode": 400, "body": json.dumps({"error": "model_id required"})}

    response = table.get_item(Key={"model_id": model_id})

    if "Item" not in response:
        return {"statusCode": 404, "body": json.dumps({"error": "Not found"})}

    return {
        "statusCode": 200,
        "body": json.dumps(response["Item"], default=str),
        "headers": {"Content-Type": "application/json"},
    }

なぜグローバルスコープ初期化を選んだか:

  • Lambda実行環境の再利用(Warm Start)時に、boto3クライアントの初期化コストを回避できる
  • AWS公式ドキュメントでも推奨されているパターン
  • Python 3.12ランタイムでのboto3初期化は約200-300ms程度かかるため、Warm Start時にこの時間を節約できる効果は大きい

コスト最適化の実践テクニック

コスト最適化は、メモリ設定の最適化から始めるのが効果的です。Lambdaではメモリを増やすとCPU割り当ても比例して増加するため、メモリを増やすことで実行時間が短縮され、結果的にコストが下がるケースがあります。

# cost_analyzer.py - Lambda関数のコスト試算ユーティリティ
def estimate_lambda_cost(
    memory_mb: int,
    duration_ms: int,
    invocations_per_month: int,
    region: str = "ap-northeast-1",
) -> dict:
    """
    Lambda関数の月額コストを試算する。
    料金体系: リクエスト数 + 実行時間(GB-秒)
    """
    # 2026年3月時点のap-northeast-1料金
    price_per_request = 0.0000002  # $0.20 per 1M requests
    price_per_gb_second = 0.0000166667  # per GB-second

    # GB-秒の計算
    gb_seconds = (memory_mb / 1024) * (duration_ms / 1000) * invocations_per_month

    # 無料枠を差し引き(月間100万リクエスト + 40万GB-秒)
    billable_requests = max(0, invocations_per_month - 1_000_000)
    billable_gb_seconds = max(0, gb_seconds - 400_000)

    request_cost = billable_requests * price_per_request
    compute_cost = billable_gb_seconds * price_per_gb_second
    total_cost = request_cost + compute_cost

    return {
        "memory_mb": memory_mb,
        "duration_ms": duration_ms,
        "invocations_per_month": invocations_per_month,
        "gb_seconds": round(gb_seconds, 2),
        "request_cost_usd": round(request_cost, 4),
        "compute_cost_usd": round(compute_cost, 4),
        "total_monthly_cost_usd": round(total_cost, 4),
    }


# 使用例: メモリ設定による実行時間とコストのトレードオフを比較
if __name__ == "__main__":
    configs = [
        {"memory_mb": 128, "duration_ms": 3000},   # 低メモリ: 遅いが単価安
        {"memory_mb": 512, "duration_ms": 800},     # 中メモリ: バランス型
        {"memory_mb": 1024, "duration_ms": 400},    # 高メモリ: 速いが単価高
        {"memory_mb": 2048, "duration_ms": 250},    # 最大メモリ: 最速
    ]

    print("| Memory (MB) | Duration (ms) | Monthly Cost (USD) |")
    print("|-------------|---------------|---------------------|")
    for cfg in configs:
        result = estimate_lambda_cost(
            memory_mb=cfg["memory_mb"],
            duration_ms=cfg["duration_ms"],
            invocations_per_month=5_000_000,
        )
        print(
            f"| {cfg['memory_mb']:>11} | {cfg['duration_ms']:>13} "
            f"| ${result['total_monthly_cost_usd']:>18.4f} |"
        )

ポイント: AWS Lambda Power Tuningツール(awslabs/aws-lambda-power-tuning)を使うと、Step Functionsベースで自動的に最適なメモリ設定を特定できます。実際の関数に対して複数のメモリ設定でベンチマークを実行し、コストとパフォーマンスのバランスを可視化してくれます。

サーバーレスのアンチパターンを回避する

サーバーレスは従来のサーバーベースアーキテクチャとは異なる設計原則が必要です。ここでは、AWS公式ドキュメントやre:Invent 2025の発表で指摘されている主要なアンチパターンを紹介します。

アンチパターン1: Lambda Monolith

1つのLambda関数に全てのビジネスロジックを詰め込むパターンです。最初は開発が簡単に見えますが、関数のパッケージサイズが肥大化してコールドスタートが悪化し、IAMポリシーが過剰に広くなります。

# bad_example.py - アンチパターン: Lambda Monolith
def handler(event, context):
    """全てのAPIルートを1つの関数で処理するアンチパターン"""
    path = event["path"]
    method = event["httpMethod"]

    if path == "/users" and method == "GET":
        return get_users()
    elif path == "/users" and method == "POST":
        return create_user(event)
    elif path == "/orders" and method == "GET":
        return get_orders()
    elif path == "/orders" and method == "POST":
        return create_order(event)
    elif path == "/inference" and method == "POST":
        return run_inference(event)
    # ... さらに多くのルートが追加されていく

問題点:

  • パッケージサイズが大きくなり、コールドスタートが遅延する
  • /usersのGETに必要なIAMポリシーが/inferenceにも適用され、最小権限の原則に違反する
  • 1つのバグ修正が全ルートに影響するリスクがある

改善策: API Gatewayのルーティングで各パスを個別のLambda関数にマッピングしてください。

アンチパターン2: 同期的なLambda連鎖呼び出し

Lambda関数が別のLambda関数を同期的に呼び出すパターンです。呼び出し元は呼び出し先の処理完了まで待機するため、待機時間にも課金が発生します。

# bad_chain.py - アンチパターン: 同期的Lambda連鎖
import boto3
import json

lambda_client = boto3.client("lambda")

def order_handler(event, context):
    """注文処理 → 決済 → 在庫更新を同期的に呼び出すアンチパターン"""
    # 決済Lambdaを同期的に呼び出し(待ち時間に課金が発生)
    payment_response = lambda_client.invoke(
        FunctionName="process-payment",
        InvocationType="RequestResponse",  # 同期呼び出し
        Payload=json.dumps(event),
    )
    payment_result = json.loads(payment_response["Payload"].read())

    # 在庫更新Lambdaを同期的に呼び出し(さらに待ち時間)
    inventory_response = lambda_client.invoke(
        FunctionName="update-inventory",
        InvocationType="RequestResponse",
        Payload=json.dumps(payment_result),
    )

    return json.loads(inventory_response["Payload"].read())

問題点:

  • order_handlerはprocess-paymentとupdate-inventoryの待ち時間にも課金される
  • エラーハンドリングが複雑になる(ネストしたtry-except)
  • 同時実行数がチェーン全体で必要になる(3関数×同時実行数)

改善策: SQSキューによるデカップリング、またはStep Functionsによるオーケストレーションに置き換えてください。本記事のSagaパターンの実装例を参照してください。

アンチパターン3: 再帰的なLambda呼び出し

Lambda関数がS3にファイルを書き込み、そのS3イベントが同じLambda関数をトリガーする無限ループパターンです。

リスク: Lambda関数が自動スケールで大量に起動し、コストが急増する可能性があります。AWSは再帰ループ検出機能を提供していますが、入出力のS3バケットやプレフィックスを分離する設計で根本的に防止するのが安全です。

アンチパターン4: 不要なProxy Lambda

API GatewayとDynamoDBの間にLambda関数を配置しているが、Lambda内部ではDynamoDBへの単純な読み書きしか行っていないパターンです。ビジネスロジックが不要な場合は、API Gateway VTLテンプレートによるDynamoDB直接統合で、Lambda関数自体を不要にできます。

よくある問題と解決方法

問題 原因 解決方法
DynamoDB呼び出しがタイムアウトする DynamoDBクライアントのデフォルトリトライ(最大10回、合計50秒超) config=Config(retries={"max_attempts": 3}, connect_timeout=5, read_timeout=5) でタイムアウトを短縮
API Gatewayが504を返す Lambda実行時間が29秒(API Gateway上限)を超過 Lambda timeout を29秒未満に設定。長時間処理は非同期呼び出し+ポーリングに変更
Step Functionsの状態遷移コストが高い ステップ数が多いワークフローをStandard Workflowで実行 短時間・高頻度ワークフローはExpress Workflowを検討(約10分の1の料金)
コールドスタートが頻発する トラフィックが少ない時間帯に実行環境が回収される Provisioned Concurrencyの設定、またはApplication Auto Scalingで時間帯ごとに調整
Lambda関数のパッケージサイズが大きい 不要な依存関係がデプロイパッケージに含まれている Lambda Layersで共通ライブラリを分離、pip installの --no-deps オプション活用

まとめと次のステップ

まとめ:

  • イベント駆動パターン(Fan-Out/Fan-In、イベントソーシング)は、サーバーレスの基本であり、ProducerとConsumerの分離がスケーラビリティの鍵
  • ワークフローパターン(Saga、オーケストレーション vs コレオグラフィ)は、処理の順序保証と独立性の要件で使い分ける
  • CQRSパターンは、書き込みと読み取りの要件が異なるシステム(MLメトリクス管理など)で特に有効
  • コールドスタート対策は2025年8月以降コスト問題にもなっており、グローバルスコープ初期化やProvisioned Concurrencyを戦略的に活用する
  • アンチパターン(Lambda Monolith、同期連鎖、再帰ループ、不要なProxy)を認識し、設計段階で回避することが本番運用の安定性に直結する

次にやるべきこと:

  • AWSサーバーレスパターンカタログで自分のユースケースに近いパターンを探す
  • 本記事のコード例をベースに、自プロジェクトの小さな機能から段階的にサーバーレス化を試す
  • AWS Lambda Power Tuningでメモリ設定を最適化し、コストとパフォーマンスのバランスを把握する

参考


注意: この記事はAI(Claude Code)により自動生成されました。内容の正確性については複数の情報源で検証していますが、実際の利用時は公式ドキュメントもご確認ください。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?