はじめに
生成AIは日々進化を続けていますが、簡単なタスクであってもレスポンスに10秒程度はかかることが多く、そのレスポンスの遅さをユーザー体験の中でどう丸め込むかが生成AIサービス開発では重要です。このデメリットを解消する方法として、回答を少しずつ生成しながら返す「ストリーミング形式」が注目されています。最近リリースされた Bedrock AgentCore でも、この形式に対応しています。
そこで、クライアントアプリと Bedrock AgentCore を接続する方法の一つとして、API Gateway のレスポンスストリーミング(response streaming)機能を活用する実装方法について調査します。
レスポンス ストリーミングとは
先に動作デモをお見せします!!
十数秒後に1つ結果を受け取るより、回答が少しずつ返ってくるほうが安心感ありますね。(個人的な感想)

対象読者
- 既存プロジェクトのバックエンドはPythonで書かれており、AIエージェント関連のビジネスロジックもTypeScriptではなくPythonを用いて開発・保守したいユーザー
- 会社のセキュリティポリシーの都合上、WAF等による自社固有のアクセス制限設定を設けたままAIエージェント機能をデプロイしたいユーザー
- Bedrock AgentCoreをさっそく業務で利用したいが、会社のセキュリティポリシー的にクライアントアプリー>Lambdaの公開URL->Bedrock Agentcoreの構成にするのが難しくて悩んでいたユーザー
- 2025年11月19日にリリースされたAmazon API Gatewayのresponse streaming機能を使ってみたいユーザー
- AWS CDKを用いたAWSリソース管理のIaC(infrastructure as code)に興味がある/すでに実践しているユーザー
前置き
-
Bedrock AgentCore自体はruntimeがすでにデプロイされている仮定で進めます。AgentCoreへのデプロイは様々な解説記事が出ております。ご参照ください。
-
API Gatewayのresponse streaming機能は公式サンプルが公開されています。公式サンプルはSAM(Serverless Application Model)で記載されていますが、個人的には最近CDKのほうが好みなので、今回もCDKに変換して進めます。
- 「最小権限付与を実現するgrantメソッドがCDKには搭載されていて、商用開発時にヒューマンエラーが起きにくい」点をとても推しています!!
全体構成
- APIGateway (streaming設定が必要)
- Lambda (Pythonで書かれたFastAPI)
ユーザーの窓口がAPIGWの提供するエンドポイントになり、そのバックエンドとしてAWS Lambdaを起動します。
Lambda関数(Python)でstreaming形式を実現するためには、Lambda Web Adapterを利用したFastAPIを構成する必要があります。
Lambda Web AdapterはECS等のコンテナ向けアプリケーションをLambdaで動かすための拡張機能です。
参考文献欄に詳しく解説されている資料載せていますので、気になる方はコチラもご参照ください!
ディレクトリ構成/ソースコード紹介
cdk init {任意のプロジェクト名} --language pythonを用いて、CDKプロジェクトを初期構成してください。
下記ディレクトリ構成になるように、いくつかのファイルを更新していきます。
cdk-python-app/
├── app.py # 作成するリソースの設定、定義ファイル
├── src/
│ ├── Dockerfile # Lambdaを構成するコンテナイメージ設定
│ ├── main.py # コンテナ内で実行するPythonコード
│ └── requirements.txt
├── cdk.json
├── .gitignore
├── test/
│ └── {任意のテストファイル}
└── README.md
app.py
今回作成するAWSリソースを定義するファイルになります。
上から順に
- 利用するIAMロール
- Lambda関数
- APIGateway
をそれぞれリソース定義しています。APIGatewayのストリーミングレスポンス設定に絞って閲覧したい方は、下の方からご確認いただけますと幸いです。
import aws_cdk as cdk
from aws_cdk import (
Stack,
Duration,
CfnOutput,
aws_lambda as lambda_,
aws_apigateway as apigw,
aws_logs as logs,
aws_iam as iam,
)
from constructs import Construct
class BedrockStreamingStack(Stack):
def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
super().__init__(scope, construct_id, **kwargs)
# Context変数からAGENT_ARNを取得
agent_arn = self.node.try_get_context("agent_arn") or ""
# Lambda実行ロールの作成
lambda_role = iam.Role(
self,
"StreamingFunctionRole",
assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
managed_policies=[
iam.ManagedPolicy.from_aws_managed_policy_name(
"service-role/AWSLambdaBasicExecutionRole"
)
],
)
# DockerイメージベースのLambda関数の作成
streaming_function = lambda_.DockerImageFunction(
self,
"StreamingResponseFunction",
code=lambda_.DockerImageCode.from_image_asset("src"),
timeout=Duration.seconds(300),
memory_size=512,
role=lambda_role,
environment={
"AGENT_ARN": agent_arn,
},
log_retention=logs.RetentionDays.ONE_MONTH,
)
# BedrockAgentCorePolicyの追加
lambda_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["bedrock-agentcore:InvokeAgentRuntime"],
resources=["*"], # アプリケーション内でエージェントARNによって制限されます
)
)
# Bedrockモデル呼び出しポリシーの追加
lambda_role.add_to_policy(
iam.PolicyStatement(
effect=iam.Effect.ALLOW,
actions=["bedrock:InvokeModelWithResponseStream"],
resources=[
f"arn:aws:bedrock:*:{self.account}:inference-profile/*",
"arn:aws:bedrock:*::foundation-model/*",
],
)
)
# API Gatewayのアクセスログ用ロググループ
access_log_group = logs.LogGroup(
self,
"ApiGatewayAccessLogs",
retention=logs.RetentionDays.ONE_MONTH,
log_group_name=f"/aws/apigateway/{self.stack_name}/access-logs",
)
# API Gateway用のIAMロールを作成(Lambda呼び出し用)
api_gateway_role = iam.Role(
self,
"ApiGatewayLambdaInvokeRole",
assumed_by=iam.ServicePrincipal("apigateway.amazonaws.com"),
description="Role for API Gateway to invoke Lambda function",
)
# Lambda関数の呼び出し権限を付与
streaming_function.grant_invoke(api_gateway_role)
# OpenAPI定義を使用してREST APIを作成(ストリーミングレスポンス対応)
openapi_definition = {
"openapi": "3.0.1",
"info": {
"title": "Bedrock Streaming API",
"version": "1.0.0",
},
"paths": {
"/ask": {
"post": {
"requestBody": {
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"request": {
"type": "string",
"description": "The request to send to the Strands agent",
}
},
"required": ["request"],
}
}
}
},
"responses": {
"200": {
"description": "Streaming response from Strands agent",
"content": {
"text/html": {
"schema": {"type": "string"}
}
},
}
},
"x-amazon-apigateway-integration": {
"uri": f"arn:aws:apigateway:{self.region}:lambda:path/2025-11-15/functions/{streaming_function.function_arn}/response-streaming-invocations",
"httpMethod": "POST",
"responseTransferMode": "STREAM",
"type": "aws_proxy",
"credentials": api_gateway_role.role_arn,
},
}
}
},
}
api = apigw.SpecRestApi(
self,
"BedrockStreamingApi",
api_definition=apigw.ApiDefinition.from_inline(openapi_definition),
endpoint_types=[apigw.EndpointType.REGIONAL],
deploy_options=apigw.StageOptions(
stage_name="Prod",
logging_level=apigw.MethodLoggingLevel.INFO,
data_trace_enabled=True,
metrics_enabled=True,
access_log_destination=apigw.LogGroupLogDestination(access_log_group),
access_log_format=apigw.AccessLogFormat.json_with_standard_fields(
caller=False,
http_method=True,
ip=True,
protocol=True,
request_time=True,
resource_path=True,
response_length=True,
status=True,
user=True,
),
),
)
# Lambda Permissionを明示的に追加(API Gatewayからの呼び出しを許可)
streaming_function.add_permission(
"ApiGatewayInvokePermission",
principal=iam.ServicePrincipal("apigateway.amazonaws.com"),
action="lambda:InvokeFunction",
source_arn=f"arn:aws:execute-api:{self.region}:{self.account}:{api.rest_api_id}/*/*",
)
# 出力
CfnOutput(
self,
"BedrockStreamingApiUrl",
description="API Gateway endpoint URL for Bedrock streaming",
value=f"{api.url}ask",
)
CfnOutput(
self,
"AgentArn",
description="Bedrock Agent ARN configured for this deployment",
value=agent_arn if agent_arn else "Not configured",
)
app = cdk.App()
BedrockStreamingStack(app, "BedrockStreamingStack")
app.synth()
特に注意するべきはopenapi_definitionで定義した設定情報内の
x-amazon-apigateway-integration.responseTransferMode をSTREAMに設定する部分です。
マネジメントコンソール上でAPIGatewayを設定する場合は添付の画像のように選択できます。

src/Dockerfile
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
FROM public.ecr.aws/lambda/python:3.13
# Lambda Web Adapterのインストール
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.8.4 /lambda-adapter /opt/extensions/lambda-adapter
# 作業ディレクトリの設定
WORKDIR ${LAMBDA_TASK_ROOT}
# 依存関係のインストール
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# アプリケーションコードのコピー
COPY main.py .
# 環境変数の設定
ENV AWS_LWA_INVOKE_MODE=response_stream
ENV AWS_LWA_PORT=8000
ENV AWS_LWA_READINESS_CHECK_PORT=8000
ENV AWS_LWA_READINESS_CHECK_PATH=/docs
ENV AWS_LWA_ASYNC_INIT=true
# FastAPIアプリケーションの起動
# Lambda Web Adapterがこのコマンドを実行します
ENTRYPOINT []
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--log-level", "info"]
requirements.txt
fastapi
uvicorn
boto3
src/main.py
FastAPIを用いたストリーミング方式を実現するために、yieldを用いて回答の生成ごとにレスポンスします。
またBedrock AgentCoreからのレスポンス内容を任意の形に変換したい場合は、extract_text_from_event_line関数の処理を変更してください。
ServerSentEvents(SSE)形式に対応させたい場合は、data: {"token": text}への変換が求められると思います。
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import boto3
import json,ast
import os
import logging
import string
import random
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
app = FastAPI()
AGENT_ARN = os.environ.get("AGENT_ARN")
def generate_runtime_session_id(length: int = 34) -> str:
"""Generate a random runtime session ID with specified length (minimum 34 characters)"""
if length < 34:
length = 34
characters = string.ascii_lowercase + string.ascii_uppercase + string.digits
return ''.join(random.choice(characters) for _ in range(length))
def extract_text_from_event_line(line: str) -> str:
"""
イベントストリームの1行からテキストデータを抽出する
Args:
line: イベントストリームの1行("data: "プレフィックスは除去済み)
Returns:
抽出されたテキスト。抽出できない場合は空文字列
"""
try:
# まずJSONとしてパースを試みる
event_data = json.loads(line)
# 辞書型の場合のみ処理
if isinstance(event_data, dict):
# パターン1: {"event": {"contentBlockDelta": ...}}
if "event" in event_data:
event_obj = event_data["event"]
if "contentBlockDelta" in event_obj:
delta = event_obj["contentBlockDelta"].get("delta", {})
text = delta.get("text", "")
if text:
logger.debug(f"Extracted text from contentBlockDelta: {text}")
return text
# messageStartやその他のイベントはログのみ
elif "messageStart" in event_obj:
logger.debug(f"Message started: {event_obj['messageStart']}")
# パターン2: {"data": "text", ...} (直接dataキーがある場合)
elif "data" in event_data:
text = event_data["data"]
if text and isinstance(text, str):
logger.debug(f"Extracted text from data: {text}")
return text
# 文字列の場合(Pythonオブジェクトの文字列表現)
elif isinstance(event_data, str):
# 文字列表現されたdictをパースしようとする
try:
parsed_data = ast.literal_eval(event_data)
if isinstance(parsed_data, dict) and "data" in parsed_data:
text = parsed_data["data"]
if text and isinstance(text, str):
logger.debug(f"Extracted text from parsed data: {text}")
return text
except (ValueError, SyntaxError):
logger.debug(f"Could not parse string as dict: {event_data[:100]}")
except json.JSONDecodeError:
# JSONでない場合、Pythonオブジェクトの文字列表現として処理
try:
parsed_data = ast.literal_eval(line)
if isinstance(parsed_data, dict) and "data" in parsed_data:
text = parsed_data["data"]
if text and isinstance(text, str):
logger.debug(f"Extracted text from literal_eval: {text}")
return text
except (ValueError, SyntaxError):
logger.warning(f"Could not parse line: {line[:100]}")
return ""
def streamer(request: str):
"""
Invoke Bedrock Agent and stream the response
"""
bedrock_agent_client = boto3.client('bedrock-agentcore', region_name="ap-northeast-1")
# Generate new random session ID for each request
session_id = generate_runtime_session_id()
# Prepare the payload
payload = json.dumps({"prompt": request}).encode()
logger.info("Invoking Agent: agentId=%s, sessionId=%s, inputText=%s", AGENT_ARN, session_id, request)
try:
response = bedrock_agent_client.invoke_agent_runtime(
agentRuntimeArn=AGENT_ARN,
runtimeSessionId=session_id,
payload=payload,
qualifier="DEFAULT"
)
event_stream = response.get('contentType', [])
# Handle streaming response
if "text/event-stream" in response.get("contentType", ""):
for line in response["response"].iter_lines(chunk_size=10):
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
line = line[6:]
# イベント行からテキストを抽出
text = extract_text_from_event_line(line)
if text:
yield text
else:
# Get event stream from completion field
event_stream = response.get('completion', [])
logger.info("Event stream type: %s", type(event_stream))
except Exception as e:
error_msg = "Error invoking agent: " + str(e)
logger.error(error_msg, exc_info=True)
yield f"\n\nError: {error_msg}"
@app.get("/{request_path:path}")
async def catch_all(request: Request, request_path: str):
# Catch-all route to handle all GET requests
return
@app.post("/{request_path:path}")
async def index(request: Request):
# Get the JSON payload from the POST body
body = await request.body()
payload = json.loads(body.decode("utf-8"))
request_param = payload.get("message") #クライアント側がbodyに含めるkeyに合わせること
return StreamingResponse(streamer(request_param))
デプロイ手順
- 下記コマンドを実行して、AWS上にデプロイします。
cdk bootstrap
cdk deploy -c agent_arn={すでにデプロイ済みのAgentCore runtimeのARN}
実際のレスポンスの様子【再掲】
- デプロイ先の
/askあてにリクエストすること - ボディ内のmessageキーに生成AIへのinput内容を含ませること
に注意してください。
curl -X POST {CDKでデプロイされたAPIGatewayのエンドポイントURL}/{stage名}/ask -H "Content-Type: application/json" -d "{\"message\": \"AWS認定試験について詳しく教えてください。\"}"
感想
- 当初は「ALB + ECSでストリーミングAPIを作る」ネタ仕込みをしていたのですが、APIGatewayにstreaming機能が追加されたということで、急遽APIGW + Lambdaに切り替えました。
- Pythonで動くLambdaだと
Lambda Web Adapterの利用が必須ということで、現時点ではマネコンポチポチのみで本構成を実現することは難しそうです。(TypeScriptはこのへん恵まれていてちょっとうらやましい。。。) - 実務利用の際にはAPIGatewayへのWAFやレート制限,Cognitoオーソライザーの設定等も加えたうえで、ご活用のほどよろしくお願いします。
