1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【AWS CDK】API Gateway 29秒タイムアウト × Bedrock Agent + SageMaker LLM 推論の非同期化パターン完全解説

1
Posted at

はじめに

AWS で生成 AI バックエンドを CDK で構築していると、必ずぶつかる壁があります。

API Gateway の統合タイムアウト 29 秒問題です。

Bedrock Agent(Claude 3.5 Sonnet)が RAG 検索 + SageMaker LLM 推論(Tsuzumi2 等)を連鎖的に実行する場合、処理時間は平気で 30〜90 秒を超えます。しかし API Gateway には 最大 29 秒という厳格な制限があり、504 Gateway Timeout が多発します。

本記事では、この問題を CDK TypeScript で解決する 3 つのパターンを、アーキテクチャ図・実装コード付きで解説します。


問題の全体像

なぜ Lambda タイムアウトを伸ばしても解決しないのか

Lambda の timeout: 120s を設定していても、API Gateway の制限が 29 秒であるため、Lambda が処理を続けている間に API GW が先にタイムアウトします。

API GW (29s) → Lambda (120s) → Bedrock Agent → SageMaker (30-90s)
     ↑
     ここが先に死ぬ

解決パターン 3 選

パターン 適合ユースケース 実装コスト UX
A: 非同期 + ポーリング(SQS + DynamoDB) バッチ処理、管理画面 ポーリングが必要
B: WebSocket API チャット UI、リアルタイム ストリーミング感◎
C: Lambda Response Streaming API レスポンス、シンプル構成 ストリーミング受信◎

パターン A: 非同期 + ポーリング(SQS + DynamoDB)

最もシンプルで堅牢なパターンです。

CDK 実装

// lib/stacks/async-api-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as apigw from 'aws-cdk-lib/aws-apigateway';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambdaNodejs from 'aws-cdk-lib/aws-lambda-nodejs';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import { Construct } from 'constructs';

export class AsyncApiStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    // ── DynamoDB: ジョブ管理テーブル ──────────────────────────
    const jobsTable = new dynamodb.Table(this, 'JobsTable', {
      tableName: 'ai-agent-jobs',
      partitionKey: { name: 'jobId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      timeToLiveAttribute: 'ttl',       // 24h で自動削除
      encryption: dynamodb.TableEncryption.AWS_MANAGED,
      pointInTimeRecovery: true,
      removalPolicy: cdk.RemovalPolicy.DESTROY,
    });

    // ── SQS: ジョブキュー ──────────────────────────────────────
    const dlq = new sqs.Queue(this, 'JobDlq', {
      queueName: 'ai-agent-job-dlq',
      retentionPeriod: cdk.Duration.days(14),
    });

    const jobQueue = new sqs.Queue(this, 'JobQueue', {
      queueName: 'ai-agent-job-queue',
      visibilityTimeout: cdk.Duration.seconds(180), // Worker Lambda タイムアウトの 3 倍
      deadLetterQueue: { queue: dlq, maxReceiveCount: 3 },
    });

    // ── Lambda: Submit(即時 202 を返す) ───────────────────────
    const submitLambda = new lambdaNodejs.NodejsFunction(this, 'SubmitLambda', {
      functionName: 'ai-agent-submit',
      entry: 'lambda/submit/index.ts',
      runtime: lambda.Runtime.NODEJS_22_X,
      timeout: cdk.Duration.seconds(10),  // 即時応答なので短くてOK
      environment: {
        JOB_TABLE_NAME: jobsTable.tableName,
        JOB_QUEUE_URL: jobQueue.queueUrl,
      },
    });
    jobsTable.grantWriteData(submitLambda);
    jobQueue.grantSendMessages(submitLambda);

    // ── Lambda: Worker(Bedrock Agent 実行・タイムアウト 15 分) ─
    const workerLambda = new lambdaNodejs.NodejsFunction(this, 'WorkerLambda', {
      functionName: 'ai-agent-worker',
      entry: 'lambda/worker/index.ts',
      runtime: lambda.Runtime.NODEJS_22_X,
      timeout: cdk.Duration.minutes(15),  // 十分な時間を確保
      memorySize: 2048,
      environment: {
        JOB_TABLE_NAME: jobsTable.tableName,
        BEDROCK_AGENT_ID: 'YOUR_AGENT_ID',
        BEDROCK_AGENT_ALIAS_ID: 'YOUR_ALIAS_ID',
      },
    });
    jobsTable.grantWriteData(workerLambda);
    // SQS イベントソース
    workerLambda.addEventSource(
      new lambdaEventSources.SqsEventSource(jobQueue, {
        batchSize: 1,   // 1件ずつ処理(LLM は重いため)
        maxConcurrency: 5,
      }),
    );
    // Bedrock 呼び出し権限
    workerLambda.addToRolePolicy(new cdk.aws_iam.PolicyStatement({
      actions: ['bedrock:InvokeAgent'],
      resources: [`arn:aws:bedrock:${this.region}:${this.account}:agent/*`],
    }));

    // ── Lambda: Result(結果取得) ─────────────────────────────
    const resultLambda = new lambdaNodejs.NodejsFunction(this, 'ResultLambda', {
      functionName: 'ai-agent-result',
      entry: 'lambda/result/index.ts',
      runtime: lambda.Runtime.NODEJS_22_X,
      timeout: cdk.Duration.seconds(10),
      environment: { JOB_TABLE_NAME: jobsTable.tableName },
    });
    jobsTable.grantReadData(resultLambda);

    // ── API Gateway ────────────────────────────────────────────
    const api = new apigw.RestApi(this, 'AsyncAgentApi', {
      restApiName: 'ai-async-agent-api',
      deployOptions: { stageName: 'v1' },
    });

    const agentResource = api.root.addResource('agent');

    // POST /agent → ジョブ投入(即時 202)
    agentResource.addMethod('POST', new apigw.LambdaIntegration(submitLambda));

    // GET /agent/result/{jobId} → 結果取得
    const resultResource = agentResource
      .addResource('result')
      .addResource('{jobId}');
    resultResource.addMethod('GET', new apigw.LambdaIntegration(resultLambda));

    new cdk.CfnOutput(this, 'ApiUrl', { value: api.url });
  }
}

Lambda 実装(Submit)

// lambda/submit/index.ts
import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { randomUUID } from 'crypto';
import type { APIGatewayProxyHandler } from 'aws-lambda';

const dynamo = new DynamoDBClient({});
const sqs = new SQSClient({});

export const handler: APIGatewayProxyHandler = async (event) => {
  const body = JSON.parse(event.body ?? '{}');
  const jobId = randomUUID();
  const now = Math.floor(Date.now() / 1000);

  // DynamoDB にジョブを PENDING で登録
  await dynamo.send(new PutItemCommand({
    TableName: process.env.JOB_TABLE_NAME!,
    Item: {
      jobId:     { S: jobId },
      status:    { S: 'PENDING' },
      message:   { S: body.message ?? '' },
      createdAt: { N: String(now) },
      ttl:       { N: String(now + 86400) }, // 24h
    },
  }));

  // SQS にキュー投入
  await sqs.send(new SendMessageCommand({
    QueueUrl: process.env.JOB_QUEUE_URL!,
    MessageBody: JSON.stringify({ jobId, message: body.message }),
  }));

  return {
    statusCode: 202,  // Accepted(即時返却)
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ jobId, status: 'PENDING' }),
  };
};

Lambda 実装(Worker)

// lambda/worker/index.ts
import { DynamoDBClient, UpdateItemCommand } from '@aws-sdk/client-dynamodb';
import {
  BedrockAgentRuntimeClient,
  InvokeAgentCommand,
} from '@aws-sdk/client-bedrock-agent-runtime';
import type { SQSHandler } from 'aws-lambda';

const dynamo = new DynamoDBClient({});
const bedrock = new BedrockAgentRuntimeClient({});

export const handler: SQSHandler = async (event) => {
  for (const record of event.Records) {
    const { jobId, message } = JSON.parse(record.body);

    try {
      // PROCESSING に更新
      await updateJobStatus(jobId, 'PROCESSING');

      // Bedrock Agent 呼び出し(時間制限なし!)
      const command = new InvokeAgentCommand({
        agentId:       process.env.BEDROCK_AGENT_ID!,
        agentAliasId:  process.env.BEDROCK_AGENT_ALIAS_ID!,
        sessionId:     jobId,  // jobId をセッションIDに使用
        inputText:     message,
      });

      const response = await bedrock.send(command);
      let result = '';
      for await (const chunk of response.completion) {
        if (chunk.chunk?.bytes) {
          result += new TextDecoder().decode(chunk.chunk.bytes);
        }
      }

      // 完了
      await updateJobStatus(jobId, 'COMPLETED', result);

    } catch (err) {
      await updateJobStatus(jobId, 'FAILED', undefined, String(err));
      throw err; // SQS に失敗を通知(DLQ へ)
    }
  }
};

async function updateJobStatus(
  jobId: string,
  status: string,
  result?: string,
  error?: string,
) {
  await dynamo.send(new UpdateItemCommand({
    TableName: process.env.JOB_TABLE_NAME!,
    Key: { jobId: { S: jobId } },
    UpdateExpression: 'SET #s = :s, updatedAt = :t' +
      (result ? ', #r = :r' : '') +
      (error  ? ', #e = :e' : ''),
    ExpressionAttributeNames: {
      '#s': 'status',
      ...(result ? { '#r': 'result' } : {}),
      ...(error  ? { '#e': 'error'  } : {}),
    },
    ExpressionAttributeValues: {
      ':s': { S: status },
      ':t': { N: String(Math.floor(Date.now() / 1000)) },
      ...(result ? { ':r': { S: result } } : {}),
      ...(error  ? { ':e': { S: error  } } : {}),
    },
  }));
}

パターン B: WebSocket API Gateway

チャット UI などリアルタイム性が求められる場合に最適です。接続を維持したままストリーミング応答できます。

// lib/stacks/websocket-api-stack.ts
import * as apigwv2 from 'aws-cdk-lib/aws-apigatewayv2';
import * as apigwv2integrations from 'aws-cdk-lib/aws-apigatewayv2-integrations';

export class WebSocketApiStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: cdk.StackProps) {
    super(scope, id, props);

    const connectionsTable = new dynamodb.Table(this, 'ConnectionsTable', {
      partitionKey: { name: 'connectionId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      timeToLiveAttribute: 'ttl',
    });

    // Lambda: メッセージハンドラー(Bedrock Agent 呼び出し)
    const messageHandler = new lambdaNodejs.NodejsFunction(this, 'MessageHandler', {
      entry: 'lambda/ws-message/index.ts',
      timeout: cdk.Duration.minutes(15),   // WebSocket は長時間 OK
      memorySize: 2048,
    });

    // WebSocket API
    const webSocketApi = new apigwv2.WebSocketApi(this, 'AiWebSocketApi', {
      apiName: 'ai-agent-websocket',
      connectRouteOptions: {
        integration: new apigwv2integrations.WebSocketLambdaIntegration(
          'ConnectIntegration', connectHandler,
        ),
      },
      disconnectRouteOptions: {
        integration: new apigwv2integrations.WebSocketLambdaIntegration(
          'DisconnectIntegration', disconnectHandler,
        ),
      },
      defaultRouteOptions: {
        integration: new apigwv2integrations.WebSocketLambdaIntegration(
          'MessageIntegration', messageHandler,
        ),
      },
    });

    const stage = new apigwv2.WebSocketStage(this, 'ProdStage', {
      webSocketApi,
      stageName: 'prod',
      autoDeploy: true,
    });

    // Lambda に @connections API への権限付与
    messageHandler.addToRolePolicy(new cdk.aws_iam.PolicyStatement({
      actions: ['execute-api:ManageConnections'],
      resources: [`arn:aws:execute-api:${this.region}:${this.account}:${webSocketApi.apiId}/*`],
    }));

    new cdk.CfnOutput(this, 'WebSocketUrl', { value: stage.url });
  }
}

クライアントからの利用例

// クライアント(ブラウザ)
const ws = new WebSocket('wss://xxxxxxx.execute-api.ap-northeast-1.amazonaws.com/prod');

ws.onmessage = (event) => {
  const data = JSON.parse(event.data);
  if (data.type === 'chunk') {
    // ストリーミング受信(テキストを少しずつ表示)
    appendToChat(data.text);
  } else if (data.type === 'done') {
    ws.close();
  }
};

ws.onopen = () => {
  ws.send(JSON.stringify({ message: '日本語で教えてください' }));
};

パターン C: Lambda Response Streaming(最小コスト)

既存の REST API 構成をほぼそのまま維持できます。Lambda Function URL または Lambda Streaming を使います。

// lambda/streaming/index.ts
import {
  BedrockAgentRuntimeClient,
  InvokeAgentCommand,
} from '@aws-sdk/client-bedrock-agent-runtime';
import { Handler } from 'aws-lambda';
// @ts-ignore — Lambda streaming types
import awslambda from 'aws-lambda';

const bedrock = new BedrockAgentRuntimeClient({});

// ストリーミング対応ハンドラー
export const handler = awslambda.streamifyResponse(
  async (event: any, responseStream: any) => {
    const body = JSON.parse(event.body ?? '{}');

    responseStream.setContentType('text/event-stream');

    const command = new InvokeAgentCommand({
      agentId:      process.env.BEDROCK_AGENT_ID!,
      agentAliasId: process.env.BEDROCK_AGENT_ALIAS_ID!,
      sessionId:    event.requestContext?.requestId ?? Date.now().toString(),
      inputText:    body.message,
    });

    const response = await bedrock.send(command);

    for await (const chunk of response.completion) {
      if (chunk.chunk?.bytes) {
        const text = new TextDecoder().decode(chunk.chunk.bytes);
        // SSE 形式でチャンクを送信
        responseStream.write(`data: ${JSON.stringify({ text })}\n\n`);
      }
    }

    responseStream.write('data: [DONE]\n\n');
    responseStream.end();
  },
);
// CDK: Lambda Function URL でストリーミング有効化
const streamingLambda = new lambda.Function(this, 'StreamingLambda', {
  runtime: lambda.Runtime.NODEJS_22_X,
  handler: 'index.handler',
  code: lambda.Code.fromAsset('lambda/streaming'),
  timeout: cdk.Duration.minutes(15),
  memorySize: 2048,
});

// Function URL(API GW 不要)
const fnUrl = streamingLambda.addFunctionUrl({
  authType: lambda.FunctionUrlAuthType.AWS_IAM,
  invokeMode: lambda.InvokeMode.RESPONSE_STREAM,  // ← これが重要
  cors: {
    allowedOrigins: ['https://your-frontend.com'],
    allowedMethods: [lambda.HttpMethod.POST],
  },
});

new cdk.CfnOutput(this, 'StreamingUrl', { value: fnUrl.url });

パターン選択フローチャート


各パターンのコスト比較

コスト要素 パターン A パターン B パターン C
SQS $0.40/100 万メッセージ なし なし
DynamoDB $1.25/100 万 Write 接続管理のみ なし
API GW REST API 課金 WebSocket 課金 なし(Function URL 無料)
Lambda Submit + Worker + Result Connect + Handler Streaming のみ
月額(1万リクエスト想定) 約 $5〜10 約 $3〜8 約 $2〜5

CDK での完全な非同期パターン A(本番推奨)

実際のプロダクションで使えるように、監視・セキュリティ・エラーハンドリングを含めた完全実装です。

// lib/stacks/production-async-stack.ts
export class ProductionAsyncStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: cdk.StackProps & {
    kmsKey: kms.Key;
    vpc: ec2.Vpc;
    lambdaSg: ec2.SecurityGroup;
  }) {
    super(scope, id, props);
    const { kmsKey, vpc, lambdaSg } = props;

    // DynamoDB(暗号化・PITR・TTL)
    const jobsTable = new dynamodb.Table(this, 'JobsTable', {
      partitionKey: { name: 'jobId', type: dynamodb.AttributeType.STRING },
      billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
      encryption: dynamodb.TableEncryption.CUSTOMER_MANAGED,
      encryptionKey: kmsKey,
      pointInTimeRecovery: true,
      timeToLiveAttribute: 'ttl',
    });

    // SQS(KMS 暗号化・DLQ・可視性タイムアウト)
    const dlq = new sqs.Queue(this, 'JobDlq', {
      encryptionMasterKey: kmsKey,
      retentionPeriod: cdk.Duration.days(14),
    });
    const jobQueue = new sqs.Queue(this, 'JobQueue', {
      encryptionMasterKey: kmsKey,
      visibilityTimeout: cdk.Duration.seconds(900), // 15 分
      deadLetterQueue: { queue: dlq, maxReceiveCount: 3 },
    });

    // CloudWatch アラーム(DLQ メッセージ急増で通知)
    const dlqAlarm = new cloudwatch.Alarm(this, 'DlqAlarm', {
      metric: dlq.metricNumberOfMessagesSent(),
      threshold: 1,
      evaluationPeriods: 1,
      alarmDescription: '非同期ジョブが失敗し DLQ に積まれています',
    });

    // Worker Lambda(VPC 内・長タイムアウト)
    const workerLambda = new lambdaNodejs.NodejsFunction(this, 'WorkerLambda', {
      entry: 'lambda/worker/index.ts',
      timeout: cdk.Duration.minutes(15),
      memorySize: 2048,
      vpc,
      vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS },
      securityGroups: [lambdaSg],
      tracing: lambda.Tracing.ACTIVE,
      deadLetterQueue: dlq,
      retryAttempts: 2,
      reservedConcurrentExecutions: 10,
      environment: {
        JOB_TABLE_NAME: jobsTable.tableName,
        POWERTOOLS_SERVICE_NAME: 'ai-agent-worker',
      },
    });

    jobsTable.grantWriteData(workerLambda);
    workerLambda.addEventSource(
      new lambdaEventSources.SqsEventSource(jobQueue, {
        batchSize: 1,
        maxConcurrency: 5,
      }),
    );
  }
}

まとめ

API Gateway × Lambda × Bedrock Agent + SageMaker LLM の 29 秒タイムアウト問題は、非同期化によって確実に解決できます。

要件 推奨パターン
管理画面・API 連携 A: SQS + DynamoDB ポーリング
チャット UI・リアルタイム B: WebSocket API
シンプルに始めたい C: Lambda Streaming + Function URL

重要なのは「同期処理の限界を認識し、非同期に設計を変える」という発想の転換です。

CDK で実装する場合は、本記事のコードをそのまま流用できます。ぜひお試しください。


参考リンク

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?