はじめに
AWS で生成 AI バックエンドを CDK で構築していると、必ずぶつかる壁があります。
API Gateway の統合タイムアウト 29 秒問題です。
Bedrock Agent(Claude 3.5 Sonnet)が RAG 検索 + SageMaker LLM 推論(Tsuzumi2 等)を連鎖的に実行する場合、処理時間は平気で 30〜90 秒を超えます。しかし API Gateway には 最大 29 秒という厳格な制限があり、504 Gateway Timeout が多発します。
本記事では、この問題を CDK TypeScript で解決する 3 つのパターンを、アーキテクチャ図・実装コード付きで解説します。
問題の全体像
なぜ Lambda タイムアウトを伸ばしても解決しないのか
Lambda の timeout: 120s を設定していても、API Gateway の制限が 29 秒であるため、Lambda が処理を続けている間に API GW が先にタイムアウトします。
API GW (29s) → Lambda (120s) → Bedrock Agent → SageMaker (30-90s)
↑
ここが先に死ぬ
解決パターン 3 選
| パターン | 適合ユースケース | 実装コスト | UX |
|---|---|---|---|
| A: 非同期 + ポーリング(SQS + DynamoDB) | バッチ処理、管理画面 | 中 | ポーリングが必要 |
| B: WebSocket API | チャット UI、リアルタイム | 中 | ストリーミング感◎ |
| C: Lambda Response Streaming | API レスポンス、シンプル構成 | 低 | ストリーミング受信◎ |
パターン A: 非同期 + ポーリング(SQS + DynamoDB)
最もシンプルで堅牢なパターンです。
CDK 実装
// lib/stacks/async-api-stack.ts
import * as cdk from 'aws-cdk-lib';
import * as apigw from 'aws-cdk-lib/aws-apigateway';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as lambdaNodejs from 'aws-cdk-lib/aws-lambda-nodejs';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as dynamodb from 'aws-cdk-lib/aws-dynamodb';
import * as lambdaEventSources from 'aws-cdk-lib/aws-lambda-event-sources';
import { Construct } from 'constructs';
export class AsyncApiStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
// ── DynamoDB: ジョブ管理テーブル ──────────────────────────
const jobsTable = new dynamodb.Table(this, 'JobsTable', {
tableName: 'ai-agent-jobs',
partitionKey: { name: 'jobId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
timeToLiveAttribute: 'ttl', // 24h で自動削除
encryption: dynamodb.TableEncryption.AWS_MANAGED,
pointInTimeRecovery: true,
removalPolicy: cdk.RemovalPolicy.DESTROY,
});
// ── SQS: ジョブキュー ──────────────────────────────────────
const dlq = new sqs.Queue(this, 'JobDlq', {
queueName: 'ai-agent-job-dlq',
retentionPeriod: cdk.Duration.days(14),
});
const jobQueue = new sqs.Queue(this, 'JobQueue', {
queueName: 'ai-agent-job-queue',
visibilityTimeout: cdk.Duration.seconds(180), // Worker Lambda タイムアウトの 3 倍
deadLetterQueue: { queue: dlq, maxReceiveCount: 3 },
});
// ── Lambda: Submit(即時 202 を返す) ───────────────────────
const submitLambda = new lambdaNodejs.NodejsFunction(this, 'SubmitLambda', {
functionName: 'ai-agent-submit',
entry: 'lambda/submit/index.ts',
runtime: lambda.Runtime.NODEJS_22_X,
timeout: cdk.Duration.seconds(10), // 即時応答なので短くてOK
environment: {
JOB_TABLE_NAME: jobsTable.tableName,
JOB_QUEUE_URL: jobQueue.queueUrl,
},
});
jobsTable.grantWriteData(submitLambda);
jobQueue.grantSendMessages(submitLambda);
// ── Lambda: Worker(Bedrock Agent 実行・タイムアウト 15 分) ─
const workerLambda = new lambdaNodejs.NodejsFunction(this, 'WorkerLambda', {
functionName: 'ai-agent-worker',
entry: 'lambda/worker/index.ts',
runtime: lambda.Runtime.NODEJS_22_X,
timeout: cdk.Duration.minutes(15), // 十分な時間を確保
memorySize: 2048,
environment: {
JOB_TABLE_NAME: jobsTable.tableName,
BEDROCK_AGENT_ID: 'YOUR_AGENT_ID',
BEDROCK_AGENT_ALIAS_ID: 'YOUR_ALIAS_ID',
},
});
jobsTable.grantWriteData(workerLambda);
// SQS イベントソース
workerLambda.addEventSource(
new lambdaEventSources.SqsEventSource(jobQueue, {
batchSize: 1, // 1件ずつ処理(LLM は重いため)
maxConcurrency: 5,
}),
);
// Bedrock 呼び出し権限
workerLambda.addToRolePolicy(new cdk.aws_iam.PolicyStatement({
actions: ['bedrock:InvokeAgent'],
resources: [`arn:aws:bedrock:${this.region}:${this.account}:agent/*`],
}));
// ── Lambda: Result(結果取得) ─────────────────────────────
const resultLambda = new lambdaNodejs.NodejsFunction(this, 'ResultLambda', {
functionName: 'ai-agent-result',
entry: 'lambda/result/index.ts',
runtime: lambda.Runtime.NODEJS_22_X,
timeout: cdk.Duration.seconds(10),
environment: { JOB_TABLE_NAME: jobsTable.tableName },
});
jobsTable.grantReadData(resultLambda);
// ── API Gateway ────────────────────────────────────────────
const api = new apigw.RestApi(this, 'AsyncAgentApi', {
restApiName: 'ai-async-agent-api',
deployOptions: { stageName: 'v1' },
});
const agentResource = api.root.addResource('agent');
// POST /agent → ジョブ投入(即時 202)
agentResource.addMethod('POST', new apigw.LambdaIntegration(submitLambda));
// GET /agent/result/{jobId} → 結果取得
const resultResource = agentResource
.addResource('result')
.addResource('{jobId}');
resultResource.addMethod('GET', new apigw.LambdaIntegration(resultLambda));
new cdk.CfnOutput(this, 'ApiUrl', { value: api.url });
}
}
Lambda 実装(Submit)
// lambda/submit/index.ts
import { DynamoDBClient, PutItemCommand } from '@aws-sdk/client-dynamodb';
import { SQSClient, SendMessageCommand } from '@aws-sdk/client-sqs';
import { randomUUID } from 'crypto';
import type { APIGatewayProxyHandler } from 'aws-lambda';
const dynamo = new DynamoDBClient({});
const sqs = new SQSClient({});
export const handler: APIGatewayProxyHandler = async (event) => {
const body = JSON.parse(event.body ?? '{}');
const jobId = randomUUID();
const now = Math.floor(Date.now() / 1000);
// DynamoDB にジョブを PENDING で登録
await dynamo.send(new PutItemCommand({
TableName: process.env.JOB_TABLE_NAME!,
Item: {
jobId: { S: jobId },
status: { S: 'PENDING' },
message: { S: body.message ?? '' },
createdAt: { N: String(now) },
ttl: { N: String(now + 86400) }, // 24h
},
}));
// SQS にキュー投入
await sqs.send(new SendMessageCommand({
QueueUrl: process.env.JOB_QUEUE_URL!,
MessageBody: JSON.stringify({ jobId, message: body.message }),
}));
return {
statusCode: 202, // Accepted(即時返却)
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ jobId, status: 'PENDING' }),
};
};
Lambda 実装(Worker)
// lambda/worker/index.ts
import { DynamoDBClient, UpdateItemCommand } from '@aws-sdk/client-dynamodb';
import {
BedrockAgentRuntimeClient,
InvokeAgentCommand,
} from '@aws-sdk/client-bedrock-agent-runtime';
import type { SQSHandler } from 'aws-lambda';
const dynamo = new DynamoDBClient({});
const bedrock = new BedrockAgentRuntimeClient({});
export const handler: SQSHandler = async (event) => {
for (const record of event.Records) {
const { jobId, message } = JSON.parse(record.body);
try {
// PROCESSING に更新
await updateJobStatus(jobId, 'PROCESSING');
// Bedrock Agent 呼び出し(時間制限なし!)
const command = new InvokeAgentCommand({
agentId: process.env.BEDROCK_AGENT_ID!,
agentAliasId: process.env.BEDROCK_AGENT_ALIAS_ID!,
sessionId: jobId, // jobId をセッションIDに使用
inputText: message,
});
const response = await bedrock.send(command);
let result = '';
for await (const chunk of response.completion) {
if (chunk.chunk?.bytes) {
result += new TextDecoder().decode(chunk.chunk.bytes);
}
}
// 完了
await updateJobStatus(jobId, 'COMPLETED', result);
} catch (err) {
await updateJobStatus(jobId, 'FAILED', undefined, String(err));
throw err; // SQS に失敗を通知(DLQ へ)
}
}
};
async function updateJobStatus(
jobId: string,
status: string,
result?: string,
error?: string,
) {
await dynamo.send(new UpdateItemCommand({
TableName: process.env.JOB_TABLE_NAME!,
Key: { jobId: { S: jobId } },
UpdateExpression: 'SET #s = :s, updatedAt = :t' +
(result ? ', #r = :r' : '') +
(error ? ', #e = :e' : ''),
ExpressionAttributeNames: {
'#s': 'status',
...(result ? { '#r': 'result' } : {}),
...(error ? { '#e': 'error' } : {}),
},
ExpressionAttributeValues: {
':s': { S: status },
':t': { N: String(Math.floor(Date.now() / 1000)) },
...(result ? { ':r': { S: result } } : {}),
...(error ? { ':e': { S: error } } : {}),
},
}));
}
パターン B: WebSocket API Gateway
チャット UI などリアルタイム性が求められる場合に最適です。接続を維持したままストリーミング応答できます。
// lib/stacks/websocket-api-stack.ts
import * as apigwv2 from 'aws-cdk-lib/aws-apigatewayv2';
import * as apigwv2integrations from 'aws-cdk-lib/aws-apigatewayv2-integrations';
export class WebSocketApiStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: cdk.StackProps) {
super(scope, id, props);
const connectionsTable = new dynamodb.Table(this, 'ConnectionsTable', {
partitionKey: { name: 'connectionId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
timeToLiveAttribute: 'ttl',
});
// Lambda: メッセージハンドラー(Bedrock Agent 呼び出し)
const messageHandler = new lambdaNodejs.NodejsFunction(this, 'MessageHandler', {
entry: 'lambda/ws-message/index.ts',
timeout: cdk.Duration.minutes(15), // WebSocket は長時間 OK
memorySize: 2048,
});
// WebSocket API
const webSocketApi = new apigwv2.WebSocketApi(this, 'AiWebSocketApi', {
apiName: 'ai-agent-websocket',
connectRouteOptions: {
integration: new apigwv2integrations.WebSocketLambdaIntegration(
'ConnectIntegration', connectHandler,
),
},
disconnectRouteOptions: {
integration: new apigwv2integrations.WebSocketLambdaIntegration(
'DisconnectIntegration', disconnectHandler,
),
},
defaultRouteOptions: {
integration: new apigwv2integrations.WebSocketLambdaIntegration(
'MessageIntegration', messageHandler,
),
},
});
const stage = new apigwv2.WebSocketStage(this, 'ProdStage', {
webSocketApi,
stageName: 'prod',
autoDeploy: true,
});
// Lambda に @connections API への権限付与
messageHandler.addToRolePolicy(new cdk.aws_iam.PolicyStatement({
actions: ['execute-api:ManageConnections'],
resources: [`arn:aws:execute-api:${this.region}:${this.account}:${webSocketApi.apiId}/*`],
}));
new cdk.CfnOutput(this, 'WebSocketUrl', { value: stage.url });
}
}
クライアントからの利用例
// クライアント(ブラウザ)
const ws = new WebSocket('wss://xxxxxxx.execute-api.ap-northeast-1.amazonaws.com/prod');
ws.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'chunk') {
// ストリーミング受信(テキストを少しずつ表示)
appendToChat(data.text);
} else if (data.type === 'done') {
ws.close();
}
};
ws.onopen = () => {
ws.send(JSON.stringify({ message: '日本語で教えてください' }));
};
パターン C: Lambda Response Streaming(最小コスト)
既存の REST API 構成をほぼそのまま維持できます。Lambda Function URL または Lambda Streaming を使います。
// lambda/streaming/index.ts
import {
BedrockAgentRuntimeClient,
InvokeAgentCommand,
} from '@aws-sdk/client-bedrock-agent-runtime';
import { Handler } from 'aws-lambda';
// @ts-ignore — Lambda streaming types
import awslambda from 'aws-lambda';
const bedrock = new BedrockAgentRuntimeClient({});
// ストリーミング対応ハンドラー
export const handler = awslambda.streamifyResponse(
async (event: any, responseStream: any) => {
const body = JSON.parse(event.body ?? '{}');
responseStream.setContentType('text/event-stream');
const command = new InvokeAgentCommand({
agentId: process.env.BEDROCK_AGENT_ID!,
agentAliasId: process.env.BEDROCK_AGENT_ALIAS_ID!,
sessionId: event.requestContext?.requestId ?? Date.now().toString(),
inputText: body.message,
});
const response = await bedrock.send(command);
for await (const chunk of response.completion) {
if (chunk.chunk?.bytes) {
const text = new TextDecoder().decode(chunk.chunk.bytes);
// SSE 形式でチャンクを送信
responseStream.write(`data: ${JSON.stringify({ text })}\n\n`);
}
}
responseStream.write('data: [DONE]\n\n');
responseStream.end();
},
);
// CDK: Lambda Function URL でストリーミング有効化
const streamingLambda = new lambda.Function(this, 'StreamingLambda', {
runtime: lambda.Runtime.NODEJS_22_X,
handler: 'index.handler',
code: lambda.Code.fromAsset('lambda/streaming'),
timeout: cdk.Duration.minutes(15),
memorySize: 2048,
});
// Function URL(API GW 不要)
const fnUrl = streamingLambda.addFunctionUrl({
authType: lambda.FunctionUrlAuthType.AWS_IAM,
invokeMode: lambda.InvokeMode.RESPONSE_STREAM, // ← これが重要
cors: {
allowedOrigins: ['https://your-frontend.com'],
allowedMethods: [lambda.HttpMethod.POST],
},
});
new cdk.CfnOutput(this, 'StreamingUrl', { value: fnUrl.url });
パターン選択フローチャート
各パターンのコスト比較
| コスト要素 | パターン A | パターン B | パターン C |
|---|---|---|---|
| SQS | $0.40/100 万メッセージ | なし | なし |
| DynamoDB | $1.25/100 万 Write | 接続管理のみ | なし |
| API GW | REST API 課金 | WebSocket 課金 | なし(Function URL 無料) |
| Lambda | Submit + Worker + Result | Connect + Handler | Streaming のみ |
| 月額(1万リクエスト想定) | 約 $5〜10 | 約 $3〜8 | 約 $2〜5 |
CDK での完全な非同期パターン A(本番推奨)
実際のプロダクションで使えるように、監視・セキュリティ・エラーハンドリングを含めた完全実装です。
// lib/stacks/production-async-stack.ts
export class ProductionAsyncStack extends cdk.Stack {
constructor(scope: Construct, id: string, props: cdk.StackProps & {
kmsKey: kms.Key;
vpc: ec2.Vpc;
lambdaSg: ec2.SecurityGroup;
}) {
super(scope, id, props);
const { kmsKey, vpc, lambdaSg } = props;
// DynamoDB(暗号化・PITR・TTL)
const jobsTable = new dynamodb.Table(this, 'JobsTable', {
partitionKey: { name: 'jobId', type: dynamodb.AttributeType.STRING },
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
encryption: dynamodb.TableEncryption.CUSTOMER_MANAGED,
encryptionKey: kmsKey,
pointInTimeRecovery: true,
timeToLiveAttribute: 'ttl',
});
// SQS(KMS 暗号化・DLQ・可視性タイムアウト)
const dlq = new sqs.Queue(this, 'JobDlq', {
encryptionMasterKey: kmsKey,
retentionPeriod: cdk.Duration.days(14),
});
const jobQueue = new sqs.Queue(this, 'JobQueue', {
encryptionMasterKey: kmsKey,
visibilityTimeout: cdk.Duration.seconds(900), // 15 分
deadLetterQueue: { queue: dlq, maxReceiveCount: 3 },
});
// CloudWatch アラーム(DLQ メッセージ急増で通知)
const dlqAlarm = new cloudwatch.Alarm(this, 'DlqAlarm', {
metric: dlq.metricNumberOfMessagesSent(),
threshold: 1,
evaluationPeriods: 1,
alarmDescription: '非同期ジョブが失敗し DLQ に積まれています',
});
// Worker Lambda(VPC 内・長タイムアウト)
const workerLambda = new lambdaNodejs.NodejsFunction(this, 'WorkerLambda', {
entry: 'lambda/worker/index.ts',
timeout: cdk.Duration.minutes(15),
memorySize: 2048,
vpc,
vpcSubnets: { subnetType: ec2.SubnetType.PRIVATE_WITH_EGRESS },
securityGroups: [lambdaSg],
tracing: lambda.Tracing.ACTIVE,
deadLetterQueue: dlq,
retryAttempts: 2,
reservedConcurrentExecutions: 10,
environment: {
JOB_TABLE_NAME: jobsTable.tableName,
POWERTOOLS_SERVICE_NAME: 'ai-agent-worker',
},
});
jobsTable.grantWriteData(workerLambda);
workerLambda.addEventSource(
new lambdaEventSources.SqsEventSource(jobQueue, {
batchSize: 1,
maxConcurrency: 5,
}),
);
}
}
まとめ
API Gateway × Lambda × Bedrock Agent + SageMaker LLM の 29 秒タイムアウト問題は、非同期化によって確実に解決できます。
| 要件 | 推奨パターン |
|---|---|
| 管理画面・API 連携 | A: SQS + DynamoDB ポーリング |
| チャット UI・リアルタイム | B: WebSocket API |
| シンプルに始めたい | C: Lambda Streaming + Function URL |
重要なのは「同期処理の限界を認識し、非同期に設計を変える」という発想の転換です。
CDK で実装する場合は、本記事のコードをそのまま流用できます。ぜひお試しください。