0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

API Gatewayレスポンスストリーミングを試してみた

Posted at

はじめに

先日API GatewayのREST APIがレスポンスストリーミングに対応しました。

上記ブログで、生成AIアプリケーションの回答をストリーミングでユーザーに返す実装例が紹介されています。
ブログではNode.jsが利用されていますが、下記リポジトリにPythonを使った実装例も載っています。

上記のPython実装例を参考にしつつ、API Gateway - Lambda (Python / Strands Agents) - Bedrock の構成をCDKで実装し、動作を見てみました。

これまでどうだったか

先述のAWSブログから図を引用します。

image.png

この構成要素におけるレスポンスストリーミングの対応状況は下記のとおりです。

  • API Gateway(REST API)は今回のアップデートでレスポンスストリーミングに初めて対応
  • Lambdaは 2023/4 からレスポンスストリーミングに対応
    • Node.jsのみネイティブに対応
    • その他の言語はカスタムランタイムまたはLambda Web Adapterを利用することで対応

Lambda は Node.js マネージドランタイムでのレスポンスストリーミングをサポートしています。その他の言語の場合は、カスタムランタイム API 統合を備えたカスタムランタイムを使用してレスポンスをストリーミングするか、Lambda Web Adapter を使用することができます。

Lambdaの関数URLを利用することで前段にAPI Gatewayが無くてもレスポンスストリーミングを実現することはできていました。関数URLを使うと構成がシンプルになりますが、API Gatewayがレスポンスストリーミングに対応したことで、例えばCognitoやWAFをネイティブに組み合わせて利用することができます。

下記ブログでは、サーバーレスなストリーミング対応生成AIアプリの実装パターンが紹介されています。今回のREST APIに関するアップデートについての記述はありませんが、関連情報としてリンクを載せておきます。

試してみた

11/24時点ではCDKのL2コンストラクトがストリーミングレスポンスに未対応であったため、L1コンストラクトで実装しました。(※AWS DevTools Hero後藤さんが PR を上げてくださっているので間も無くL2で書けるようになりそうです)

主要なコードを抜粋して記載します。

lambda\src\Dockerfile
lambda\src\Dockerfile
FROM public.ecr.aws/lambda/python:3.13

WORKDIR /var/task

COPY --from=public.ecr.aws/awsguru/aws-lambda-adapter:0.9.1 /lambda-adapter /opt/extensions/lambda-adapter

COPY requirements.txt ./

RUN python -m pip install -r requirements.txt

COPY *.py ./

ENTRYPOINT ["python", "-m", "uvicorn"]

CMD ["main:app", "--host", "0.0.0.0", "--port", "8080"]
lambda\src\main.py
lambda\src\main.py
import json
import os

import uvicorn
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from strands import Agent

app = FastAPI()

strands_agent = Agent(
    model="jp.anthropic.claude-haiku-4-5-20251001-v1:0",
    system_prompt="You are a helpful AI assistant.",
)


# Based on https://aws.amazon.com/blogs/opensource/introducing-strands-agents-1-0-production-ready-multi-agent-orchestration-made-simple/
async def streamer(request: str):
    async for event in strands_agent.stream_async(request):
        if "data" in event:
            yield event["data"]


@app.get("/{request_path:path}")
async def catch_all(request: Request, request_path: str):
    # Catch-all route to handle all GET requests
    return


@app.post("/{request_path:path}")
async def index(request: Request):
    # Get the JSON payload from the POST body
    body = await request.body()
    payload = json.loads(body.decode("utf-8"))
    request_param = payload.get("request")

    return StreamingResponse(streamer(request_param))


if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=int(os.environ.get("PORT", "8080")))
lambda\src\requirements.txt
lambda\src\requirements.txt
strands-agents
fastapi
uvicorn
lib/test-streaming-api-stack.ts
lib/test-streaming-api-stack.ts
import * as apigateway from 'aws-cdk-lib/aws-apigateway';
import * as iam from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as cdk from 'aws-cdk-lib/core';
import { Construct } from 'constructs';
import * as path from 'path';

export class TestStreamingApiStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props?: cdk.StackProps) {
    super(scope, id, props);

    // Lambda関数
    const fn = new lambda.DockerImageFunction(this, 'Function', {
      code: lambda.DockerImageCode.fromImageAsset(
        path.join(__dirname, '../lambda/src')
      ),
      memorySize: 512,
      timeout: cdk.Duration.minutes(15),
      environment: {
        AWS_LWA_INVOKE_MODE: 'response_stream',
      },
    });

    // Lambda実行ロールにBedrock権限を付与
    fn.role?.addToPrincipalPolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: [
          'bedrock:InvokeModelWithResponseStream',
          'bedrock:InvokeModel',
        ],
        resources: ['*'],
      })
    );

    // REST API
    const api = new apigateway.CfnRestApi(this, 'Api', {
      name: 'test-streaming-api',
      endpointConfiguration: {
        types: ['REGIONAL'],
      },
    });

    // ルートリソースIDを取得
    const rootResourceId = api.attrRootResourceId;

    // /streaming リソース
    const streamingResource = new apigateway.CfnResource(
      this,
      'StreamingResource',
      {
        restApiId: api.ref,
        parentId: rootResourceId,
        pathPart: 'streaming',
      }
    );

    // /streaming リソースのメソッド
    const streamingMethod = new apigateway.CfnMethod(this, 'StreamingMethod', {
      restApiId: api.ref,
      resourceId: streamingResource.ref,
      httpMethod: 'ANY',
      authorizationType: 'NONE',
      integration: {
        type: 'AWS_PROXY',
        integrationHttpMethod: 'POST',
        uri: `arn:aws:apigateway:${this.region}:lambda:path/2021-11-15/functions/${fn.functionArn}/response-streaming-invocations`,
        responseTransferMode: 'STREAM',
      },
    });

    // Lambda権限
    fn.addPermission('ApiGatewayInvoke', {
      principal: new iam.ServicePrincipal('apigateway.amazonaws.com'),
      sourceArn: `arn:aws:execute-api:${this.region}:${this.account}:${api.ref}/*/*`,
    });

    // デプロイメント
    const deployment = new apigateway.CfnDeployment(this, 'Deployment', {
      restApiId: api.ref,
    });
    deployment.addDependency(streamingMethod);

    const stageName = 'dev';

    // ステージ
    new apigateway.CfnStage(this, 'ApiStage', {
      restApiId: api.ref,
      deploymentId: deployment.ref,
      stageName,
    });
  }
}

CURLコマンドでアクセスしてみました。

cmd
curl -X POST %URL% -H "Content-Type: application/json" -d "{\"request\": \"Bedrockについてどう思う?\"}" --no-buffer

結果はこちらです。

sample_compressed.gif

ちゃんとストリーミングされました!
文字が小さいですが、雰囲気は伝わるでしょうか?

料金

ドキュメントに下記の記述があります。

Streaming enabled REST APIs are metered in 10MB response payload increments.

レスポンスサイズが10MBまでのものは1リクエスト、10MB以上の大きなサイズである場合は2以上のリクエストとしてカウントされるようです。大きなサイズのファイルを頻繁にやり取りするような環境では、コストに気を付けた方が良さそうです。

さいごに

今回は、Strands Agentsを使いたかったためPythonを利用してみましたが、Node.jsであれば引用ブログ内に書かれていたとおりもっと簡易に実装ができそうです。

弊社では一緒に働く仲間を募集中です!

現在、様々な職種を募集しております。
カジュアル面談も可能ですので、ご連絡お待ちしております!

募集内容等詳細は、是非採用サイトをご確認ください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?