9
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Python & AWSユーザー向け】AIエージェント用のStreamingAPIを作ってみよう!

9
Last updated at Posted at 2025-11-26

はじめに

生成AIは日々進化を続けていますが、簡単なタスクであってもレスポンスに10秒程度はかかることが多く、そのレスポンスの遅さをユーザー体験の中でどう丸め込むかが生成AIサービス開発では重要です。このデメリットを解消する方法として、回答を少しずつ生成しながら返す「ストリーミング形式」が注目されています。最近リリースされた Bedrock AgentCore でも、この形式に対応しています。
そこで、クライアントアプリと Bedrock AgentCore を接続する方法の一つとして、API Gateway のレスポンスストリーミング(response streaming)機能を活用する実装方法について調査します。

レスポンス ストリーミングとは

先に動作デモをお見せします!!
十数秒後に1つ結果を受け取るより、回答が少しずつ返ってくるほうが安心感ありますね。(個人的な感想)
streaming_demo_20251123_004.gif

対象読者

  • 既存プロジェクトのバックエンドはPythonで書かれており、AIエージェント関連のビジネスロジックもTypeScriptではなくPythonを用いて開発・保守したいユーザー
  • 会社のセキュリティポリシーの都合上、WAF等による自社固有のアクセス制限設定を設けたままAIエージェント機能をデプロイしたいユーザー
  • Bedrock AgentCoreをさっそく業務で利用したいが、会社のセキュリティポリシー的にクライアントアプリー>Lambdaの公開URL->Bedrock Agentcoreの構成にするのが難しくて悩んでいたユーザー
  • 2025年11月19日にリリースされたAmazon API Gatewayのresponse streaming機能を使ってみたいユーザー
  • AWS CDKを用いたAWSリソース管理のIaC(infrastructure as code)に興味がある/すでに実践しているユーザー

前置き

全体構成

20251123_構成図_001.drawio.png

  • APIGateway (streaming設定が必要)
  • Lambda (Pythonで書かれたFastAPI)

ユーザーの窓口がAPIGWの提供するエンドポイントになり、そのバックエンドとしてAWS Lambdaを起動します。  
Lambda関数(Python)でstreaming形式を実現するためには、Lambda Web Adapterを利用したFastAPIを構成する必要があります。

Lambda Web AdapterはECS等のコンテナ向けアプリケーションをLambdaで動かすための拡張機能です。
参考文献欄に詳しく解説されている資料載せていますので、気になる方はコチラもご参照ください!

ディレクトリ構成/ソースコード紹介

cdk init {任意のプロジェクト名} --language pythonを用いて、CDKプロジェクトを初期構成してください。
下記ディレクトリ構成になるように、いくつかのファイルを更新していきます。

cdk-python-app/
├── app.py                   # 作成するリソースの設定、定義ファイル
├── src/
│   ├── Dockerfile    # Lambdaを構成するコンテナイメージ設定
│   ├── main.py       # コンテナ内で実行するPythonコード
│   └── requirements.txt
├── cdk.json
├── .gitignore
├── test/
│   └── {任意のテストファイル}
└── README.md

app.py

今回作成するAWSリソースを定義するファイルになります。
上から順に

  • 利用するIAMロール
  • Lambda関数
  • APIGateway

をそれぞれリソース定義しています。APIGatewayのストリーミングレスポンス設定に絞って閲覧したい方は、下の方からご確認いただけますと幸いです。

import aws_cdk as cdk
from aws_cdk import (
    Stack,
    Duration,
    CfnOutput,
    aws_lambda as lambda_,
    aws_apigateway as apigw,
    aws_logs as logs,
    aws_iam as iam,
)
from constructs import Construct


class BedrockStreamingStack(Stack):
    def __init__(self, scope: Construct, construct_id: str, **kwargs) -> None:
        super().__init__(scope, construct_id, **kwargs)

        # Context変数からAGENT_ARNを取得
        agent_arn = self.node.try_get_context("agent_arn") or ""

        # Lambda実行ロールの作成
        lambda_role = iam.Role(
            self,
            "StreamingFunctionRole",
            assumed_by=iam.ServicePrincipal("lambda.amazonaws.com"),
            managed_policies=[
                iam.ManagedPolicy.from_aws_managed_policy_name(
                    "service-role/AWSLambdaBasicExecutionRole"
                )
            ],
        )

        # DockerイメージベースのLambda関数の作成
        streaming_function = lambda_.DockerImageFunction(
            self,
            "StreamingResponseFunction",
            code=lambda_.DockerImageCode.from_image_asset("src"),
            timeout=Duration.seconds(300),
            memory_size=512,
            role=lambda_role,
            environment={
                "AGENT_ARN": agent_arn,
            },
            log_retention=logs.RetentionDays.ONE_MONTH,
        )

        # BedrockAgentCorePolicyの追加
        lambda_role.add_to_policy(
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=["bedrock-agentcore:InvokeAgentRuntime"],
                resources=["*"],  # アプリケーション内でエージェントARNによって制限されます
            )
        )

        # Bedrockモデル呼び出しポリシーの追加
        lambda_role.add_to_policy(
            iam.PolicyStatement(
                effect=iam.Effect.ALLOW,
                actions=["bedrock:InvokeModelWithResponseStream"],
                resources=[
                    f"arn:aws:bedrock:*:{self.account}:inference-profile/*",
                    "arn:aws:bedrock:*::foundation-model/*",
                ],
            )
        )

        # API Gatewayのアクセスログ用ロググループ
        access_log_group = logs.LogGroup(
            self,
            "ApiGatewayAccessLogs",
            retention=logs.RetentionDays.ONE_MONTH,
            log_group_name=f"/aws/apigateway/{self.stack_name}/access-logs",
        )

        # API Gateway用のIAMロールを作成(Lambda呼び出し用)
        api_gateway_role = iam.Role(
            self,
            "ApiGatewayLambdaInvokeRole",
            assumed_by=iam.ServicePrincipal("apigateway.amazonaws.com"),
            description="Role for API Gateway to invoke Lambda function",
        )
        
        # Lambda関数の呼び出し権限を付与
        streaming_function.grant_invoke(api_gateway_role)

        # OpenAPI定義を使用してREST APIを作成(ストリーミングレスポンス対応)
        openapi_definition = {
            "openapi": "3.0.1",
            "info": {
                "title": "Bedrock Streaming API",
                "version": "1.0.0",
            },
            "paths": {
                "/ask": {
                    "post": {
                        "requestBody": {
                            "content": {
                                "application/json": {
                                    "schema": {
                                        "type": "object",
                                        "properties": {
                                            "request": {
                                                "type": "string",
                                                "description": "The request to send to the Strands agent",
                                            }
                                        },
                                        "required": ["request"],
                                    }
                                }
                            }
                        },
                        "responses": {
                            "200": {
                                "description": "Streaming response from Strands agent",
                                "content": {
                                    "text/html": {
                                        "schema": {"type": "string"}
                                    }
                                },
                            }
                        },
                        "x-amazon-apigateway-integration": {
                            "uri": f"arn:aws:apigateway:{self.region}:lambda:path/2025-11-15/functions/{streaming_function.function_arn}/response-streaming-invocations",
                            "httpMethod": "POST",
                            "responseTransferMode": "STREAM",
                            "type": "aws_proxy",
                            "credentials": api_gateway_role.role_arn,
                        },
                    }
                }
            },
        }

        api = apigw.SpecRestApi(
            self,
            "BedrockStreamingApi",
            api_definition=apigw.ApiDefinition.from_inline(openapi_definition),
            endpoint_types=[apigw.EndpointType.REGIONAL],
            deploy_options=apigw.StageOptions(
                stage_name="Prod",
                logging_level=apigw.MethodLoggingLevel.INFO,
                data_trace_enabled=True,
                metrics_enabled=True,
                access_log_destination=apigw.LogGroupLogDestination(access_log_group),
                access_log_format=apigw.AccessLogFormat.json_with_standard_fields(
                    caller=False,
                    http_method=True,
                    ip=True,
                    protocol=True,
                    request_time=True,
                    resource_path=True,
                    response_length=True,
                    status=True,
                    user=True,
                ),
            ),
        )

        # Lambda Permissionを明示的に追加(API Gatewayからの呼び出しを許可)
        streaming_function.add_permission(
            "ApiGatewayInvokePermission",
            principal=iam.ServicePrincipal("apigateway.amazonaws.com"),
            action="lambda:InvokeFunction",
            source_arn=f"arn:aws:execute-api:{self.region}:{self.account}:{api.rest_api_id}/*/*",
        )

        # 出力
        CfnOutput(
            self,
            "BedrockStreamingApiUrl",
            description="API Gateway endpoint URL for Bedrock streaming",
            value=f"{api.url}ask",
        )
        
        CfnOutput(
            self,
            "AgentArn",
            description="Bedrock Agent ARN configured for this deployment",
            value=agent_arn if agent_arn else "Not configured",
        )


app = cdk.App()
BedrockStreamingStack(app, "BedrockStreamingStack")
app.synth()

特に注意するべきはopenapi_definitionで定義した設定情報内の
x-amazon-apigateway-integration.responseTransferModeSTREAMに設定する部分です。

マネジメントコンソール上でAPIGatewayを設定する場合は添付の画像のように選択できます。
Stream_マネコン画面.png

src/Dockerfile

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0

FROM public.ecr.aws/lambda/python:3.13

# Lambda Web Adapterのインストール
COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.8.4 /lambda-adapter /opt/extensions/lambda-adapter

# 作業ディレクトリの設定
WORKDIR ${LAMBDA_TASK_ROOT}

# 依存関係のインストール
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# アプリケーションコードのコピー
COPY main.py .

# 環境変数の設定
ENV AWS_LWA_INVOKE_MODE=response_stream
ENV AWS_LWA_PORT=8000
ENV AWS_LWA_READINESS_CHECK_PORT=8000
ENV AWS_LWA_READINESS_CHECK_PATH=/docs
ENV AWS_LWA_ASYNC_INIT=true

# FastAPIアプリケーションの起動
# Lambda Web Adapterがこのコマンドを実行します
ENTRYPOINT []
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--log-level", "info"]

requirements.txt

fastapi
uvicorn
boto3

src/main.py

FastAPIを用いたストリーミング方式を実現するために、yieldを用いて回答の生成ごとにレスポンスします。
またBedrock AgentCoreからのレスポンス内容を任意の形に変換したい場合は、extract_text_from_event_line関数の処理を変更してください。
ServerSentEvents(SSE)形式に対応させたい場合は、data: {"token": text}への変換が求められると思います。


from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
import asyncio
import boto3
import json,ast
import os
import  logging
import string
import random


# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)


app = FastAPI()
AGENT_ARN = os.environ.get("AGENT_ARN")


def generate_runtime_session_id(length: int = 34) -> str:
    """Generate a random runtime session ID with specified length (minimum 34 characters)"""
    if length < 34:
        length = 34
    characters = string.ascii_lowercase + string.ascii_uppercase + string.digits
    return ''.join(random.choice(characters) for _ in range(length))


def extract_text_from_event_line(line: str) -> str:
    """
    イベントストリームの1行からテキストデータを抽出する
    
    Args:
        line: イベントストリームの1行("data: "プレフィックスは除去済み)
    
    Returns:
        抽出されたテキスト。抽出できない場合は空文字列
    """
    try:
        # まずJSONとしてパースを試みる
        event_data = json.loads(line)
        
        # 辞書型の場合のみ処理
        if isinstance(event_data, dict):
            # パターン1: {"event": {"contentBlockDelta": ...}}
            if "event" in event_data:
                event_obj = event_data["event"]
                
                if "contentBlockDelta" in event_obj:
                    delta = event_obj["contentBlockDelta"].get("delta", {})
                    text = delta.get("text", "")
                    
                    if text:
                        logger.debug(f"Extracted text from contentBlockDelta: {text}")
                        return text
                        
                # messageStartやその他のイベントはログのみ
                elif "messageStart" in event_obj:
                    logger.debug(f"Message started: {event_obj['messageStart']}")
            
            # パターン2: {"data": "text", ...} (直接dataキーがある場合)
            elif "data" in event_data:
                text = event_data["data"]
                if text and isinstance(text, str):
                    logger.debug(f"Extracted text from data: {text}")
                    return text
        
        # 文字列の場合(Pythonオブジェクトの文字列表現)
        elif isinstance(event_data, str):
            # 文字列表現されたdictをパースしようとする
            try:
                parsed_data = ast.literal_eval(event_data)
                if isinstance(parsed_data, dict) and "data" in parsed_data:
                    text = parsed_data["data"]
                    if text and isinstance(text, str):
                        logger.debug(f"Extracted text from parsed data: {text}")
                        return text
            except (ValueError, SyntaxError):
                logger.debug(f"Could not parse string as dict: {event_data[:100]}")
                
    except json.JSONDecodeError:
        # JSONでない場合、Pythonオブジェクトの文字列表現として処理
        try:
            parsed_data = ast.literal_eval(line)
            if isinstance(parsed_data, dict) and "data" in parsed_data:
                text = parsed_data["data"]
                if text and isinstance(text, str):
                    logger.debug(f"Extracted text from literal_eval: {text}")
                    return text
        except (ValueError, SyntaxError):
            logger.warning(f"Could not parse line: {line[:100]}")
    
    return ""





def streamer(request: str):
    """
    Invoke Bedrock Agent and stream the response
    """
    bedrock_agent_client = boto3.client('bedrock-agentcore', region_name="ap-northeast-1")
    
    # Generate new random session ID for each request
    session_id = generate_runtime_session_id()

    # Prepare the payload
    payload = json.dumps({"prompt": request}).encode()

    
    logger.info("Invoking Agent: agentId=%s, sessionId=%s, inputText=%s", AGENT_ARN, session_id, request)
    

    try:
        response = bedrock_agent_client.invoke_agent_runtime(
            agentRuntimeArn=AGENT_ARN,
            runtimeSessionId=session_id,
            payload=payload,
            qualifier="DEFAULT"
        )
       
        event_stream = response.get('contentType', [])
        
        # Handle streaming response
        if "text/event-stream" in response.get("contentType", ""):
            for line in response["response"].iter_lines(chunk_size=10):
                if line:
                    line = line.decode("utf-8")
                    if line.startswith("data: "):
                        line = line[6:]
                        
                        # イベント行からテキストを抽出
                        text = extract_text_from_event_line(line)
                        if text:
                            yield text
                            
        else:
            # Get event stream from completion field
            event_stream = response.get('completion', [])
            logger.info("Event stream type: %s", type(event_stream))
        
        
                
    except Exception as e:
        error_msg = "Error invoking agent: " + str(e)
        logger.error(error_msg, exc_info=True)
        yield f"\n\nError: {error_msg}"


@app.get("/{request_path:path}")
async def catch_all(request: Request, request_path: str):
    # Catch-all route to handle all GET requests
    return


@app.post("/{request_path:path}")
async def index(request: Request):
    # Get the JSON payload from the POST body
    body = await request.body()
    payload = json.loads(body.decode("utf-8"))
    request_param = payload.get("message") #クライアント側がbodyに含めるkeyに合わせること

    return StreamingResponse(streamer(request_param))


デプロイ手順

  • 下記コマンドを実行して、AWS上にデプロイします。
cdk bootstrap
cdk deploy -c agent_arn={すでにデプロイ済みのAgentCore runtimeのARN}

実際のレスポンスの様子【再掲】

  • デプロイ先の/askあてにリクエストすること
  • ボディ内のmessageキーに生成AIへのinput内容を含ませること

に注意してください。

curl -X POST {CDKでデプロイされたAPIGatewayのエンドポイントURL}/{stage名}/ask -H "Content-Type: application/json" -d "{\"message\": \"AWS認定試験について詳しく教えてください。\"}"

streaming_demo_20251123_004.gif

感想

  • 当初は「ALB + ECSでストリーミングAPIを作る」ネタ仕込みをしていたのですが、APIGatewayにstreaming機能が追加されたということで、急遽APIGW + Lambdaに切り替えました。
  • Pythonで動くLambdaだとLambda Web Adapterの利用が必須ということで、現時点ではマネコンポチポチのみで本構成を実現することは難しそうです。(TypeScriptはこのへん恵まれていてちょっとうらやましい。。。)
  • 実務利用の際にはAPIGatewayへのWAFやレート制限,Cognitoオーソライザーの設定等も加えたうえで、ご活用のほどよろしくお願いします。

参考文献

9
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?