はじめに
先日API GatewayのREST APIがレスポンスストリーミングに対応しました。
上記ブログで、生成AIアプリケーションの回答をストリーミングでユーザーに返す実装例が紹介されています。
ブログではNode.jsが利用されていますが、下記リポジトリにPythonを使った実装例も載っています。
上記のPython実装例を参考にしつつ、API Gateway - Lambda (Python / Strands Agents) - Bedrock の構成をCDKで実装し、動作を見てみました。
これまでどうだったか
先述のAWSブログから図を引用します。
この構成要素におけるレスポンスストリーミングの対応状況は下記のとおりです。
- API Gateway(REST API)は今回のアップデートでレスポンスストリーミングに初めて対応
- Lambdaは 2023/4 からレスポンスストリーミングに対応
- Node.jsのみネイティブに対応
- その他の言語はカスタムランタイムまたはLambda Web Adapterを利用することで対応
Lambda は Node.js マネージドランタイムでのレスポンスストリーミングをサポートしています。その他の言語の場合は、カスタムランタイム API 統合を備えたカスタムランタイムを使用してレスポンスをストリーミングするか、Lambda Web Adapter を使用することができます。
Lambdaの関数URLを利用することで前段にAPI Gatewayが無くてもレスポンスストリーミングを実現することはできていました。関数URLを使うと構成がシンプルになりますが、API Gatewayがレスポンスストリーミングに対応したことで、例えばCognitoやWAFをネイティブに組み合わせて利用することができます。
下記ブログでは、サーバーレスなストリーミング対応生成AIアプリの実装パターンが紹介されています。今回のREST APIに関するアップデートについての記述はありませんが、関連情報としてリンクを載せておきます。
試してみた
11/24時点ではCDKのL2コンストラクトがストリーミングレスポンスに未対応であったため、L1コンストラクトで実装しました。(※AWS DevTools Hero後藤さんが PR を上げてくださっているので間も無くL2で書けるようになりそうです)
主要なコードを抜粋して記載します。
lambda\src\Dockerfile
FROM public.ecr.aws/lambda/python:3.13
WORKDIR /var/task
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.9.1 /lambda-adapter /opt/extensions/lambda-adapter
COPY requirements.txt ./
RUN python -m pip install -r requirements.txt
COPY *.py ./
ENTRYPOINT ["python", "-m", "uvicorn"]
CMD ["main:app", "--host", "0.0.0.0", "--port", "8080"]
lambda\src\main.py
import json
import os
import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from strands import Agent
app = FastAPI()
strands_agent = Agent(
model="jp.anthropic.claude-haiku-4-5-20251001-v1:0",
system_prompt="You are a helpful AI assistant.",
)
# Based on https://aws.amazon.com/blogs/opensource/introducing-strands-agents-1-0-production-ready-multi-agent-orchestration-made-simple/
async def streamer(request: str):
async for event in strands_agent.stream_async(request):
if "data" in event:
yield event["data"]
@app.get("/{request_path:path}")
async def catch_all(request: Request, request_path: str):
# Catch-all route to handle all GET requests
return
@app.post("/{request_path:path}")
async def index(request: Request):
# Get the JSON payload from the POST body
body = await request.body()
payload = json.loads(body.decode("utf-8"))
request_param = payload.get("request")
return StreamingResponse(streamer(request_param))
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "8080")))
lambda\src\requirements.txt
strands-agents
fastapi
uvicorn
lib/test-streaming-api-stack.ts
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as cdk from 'aws-cdk-lib/core';
import { Construct } from 'constructs';
import * as path from 'path';
export class TestStreamingApiStack extends cdk.Stack {
constructor(scope: Construct, id: string, props?: cdk.StackProps) {
super(scope, id, props);
// Lambda関数
const fn = new lambda.DockerImageFunction(this, 'Function', {
code: lambda.DockerImageCode.fromImageAsset(
path.join(__dirname, '../lambda/src')
),
memorySize: 512,
timeout: cdk.Duration.minutes(15),
environment: {
AWS_LWA_INVOKE_MODE: 'response_stream',
},
});
// Lambda実行ロールにBedrock権限を付与
fn.role?.addToPrincipalPolicy(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: [
'bedrock:InvokeModelWithResponseStream',
'bedrock:InvokeModel',
],
resources: ['*'],
})
);
// REST API
const api = new apigateway.CfnRestApi(this, 'Api', {
name: 'test-streaming-api',
endpointConfiguration: {
types: ['REGIONAL'],
},
});
// ルートリソースIDを取得
const rootResourceId = api.attrRootResourceId;
// /streaming リソース
const streamingResource = new apigateway.CfnResource(
this,
'StreamingResource',
{
restApiId: api.ref,
parentId: rootResourceId,
pathPart: 'streaming',
}
);
// /streaming リソースのメソッド
const streamingMethod = new apigateway.CfnMethod(this, 'StreamingMethod', {
restApiId: api.ref,
resourceId: streamingResource.ref,
httpMethod: 'ANY',
authorizationType: 'NONE',
integration: {
type: 'AWS_PROXY',
integrationHttpMethod: 'POST',
uri: `arn:aws:apigateway:${this.region}:lambda:path/2021-11-15/functions/${fn.functionArn}/response-streaming-invocations`,
responseTransferMode: 'STREAM',
},
});
// Lambda権限
fn.addPermission('ApiGatewayInvoke', {
principal: new iam.ServicePrincipal('apigateway.amazonaws.com'),
sourceArn: `arn:aws:execute-api:${this.region}:${this.account}:${api.ref}/*/*`,
});
// デプロイメント
const deployment = new apigateway.CfnDeployment(this, 'Deployment', {
restApiId: api.ref,
});
deployment.addDependency(streamingMethod);
const stageName = 'dev';
// ステージ
new apigateway.CfnStage(this, 'ApiStage', {
restApiId: api.ref,
deploymentId: deployment.ref,
stageName,
});
}
}
CURLコマンドでアクセスしてみました。
curl -X POST %URL% -H "Content-Type: application/json" -d "{\"request\": \"Bedrockについてどう思う?\"}" --no-buffer
結果はこちらです。
ちゃんとストリーミングされました!
文字が小さいですが、雰囲気は伝わるでしょうか?
料金
ドキュメントに下記の記述があります。
Streaming enabled REST APIs are metered in 10MB response payload increments.
レスポンスサイズが10MBまでのものは1リクエスト、10MB以上の大きなサイズである場合は2以上のリクエストとしてカウントされるようです。大きなサイズのファイルを頻繁にやり取りするような環境では、コストに気を付けた方が良さそうです。
さいごに
今回は、Strands Agentsを使いたかったためPythonを利用してみましたが、Node.jsであれば引用ブログ内に書かれていたとおりもっと簡易に実装ができそうです。
弊社では一緒に働く仲間を募集中です!
現在、様々な職種を募集しております。
カジュアル面談も可能ですので、ご連絡お待ちしております!
募集内容等詳細は、是非採用サイトをご確認ください。

