1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【AWS】BedrockのストリーミングレスポンスをAPI Gatewayで試してみる

Posted at

【AWS】BedrockのストリーミングレスポンスをAPI Gatewayで試してみる

ChatGPTなど多くのLLMチャットアプリではモデルからの応答がチャンクごとに区切られて次々出てきます。

Amazon BedrockでもInvokeModelWithResponseStreamというAPIが用意されており、
同様のことを実現することができます。

今回はAPI Gataway + Lambda + Bedrockの構成で
API GatewayのエンドポイントからストリーミングレスポンスのAPIを呼び出せるようにしてみました。
(ググってもあまり情報が出てこなかったので備忘です)

構成図

今回検証を行った環境の構成は以下の通りです。

ws-bedrock.drawio.png

Lambda関数からの複数の応答(チャンク毎に区切られたモデルからの応答)を連続的に返すことができるようにするため、API GatewayではWebSocket APIを作成しています。

SendTextHandlerがBedrockに対してInvokeModelWithResponseStreamを投げるためのLambda関数で、
API Gatewayはsendtextというルートに対してリクエストが来た際に、SendTextHandlerを呼び出すように設定します。

その他のHandler(ConnectHandler, DisconnectHandler, DefaultHandler)はAPI Gatewayの事前定義されたルート($connect, $disconnect, $default)に対応しています。

API GatewayでWebSocket APIを作成する際はAWS公式の以下のチュートリアルが参考になります。

実装

以下のCloudformationのテンプレートをデプロイします。

iam.ymlがLambda関数に適用する各種IAM RoleおよびPolicyを定義したテンプレート、
main.ymlがLambda関数やAPI Gatewayをデプロイするためのテンプレートです。
(Bedrockについては使いたいモデルを事前に有効化しておきます。)

SendConnectHandlerに記述するpythonのコードは以下の通りです。

invoke_model_with_response_streamの応答(response)を
チャンク毎に取り出して、WebSocketAPIのコネクションに毎回ポストするようにfor文を記述しています。

import boto3
import json
import os

API_ENDPOINT = os.environ["API_ENDPOINT"]
STAGE = os.environ["STAGE"]
MODEL_ID = os.environ["MODEL_ID"]

def lambda_handler(event,context):
    brt = boto3.client(service_name='bedrock-runtime')
    apigw_management = boto3.client('apigatewaymanagementapi', endpoint_url=f"{API_ENDPOINT}/{STAGE}")

    connectionId = event.get('requestContext', {}).get('connectionId')

    body_content = json.loads(event.get('body', {}))
    text = body_content.get('text')
    body = json.dumps({
        "anthropic_version": "bedrock-2023-05-31", 
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": f"{text}"
                    }
                ]
            }
        ],
        "max_tokens": 1024
    })
    
    modelId = MODEL_ID
    accept = 'application/json'
    contentType = 'application/json'

    try:
        response = brt.invoke_model_with_response_stream(body=body, modelId=modelId, accept=accept, contentType=contentType)
    except Exception as e:
        return {
            "statusCode": 500,
            "body": f"{e}"
        }

    for event in response.get("body"):
        chunk = json.loads(event["chunk"]["bytes"])
        if chunk['type'] == 'content_block_delta' and chunk['delta']['type'] == 'text_delta':
            print(chunk['delta']['text'])
            try:
                apigw_management.post_to_connection(ConnectionId=connectionId, Data=json.dumps(chunk['delta']['text']))
            except Exception as e:
                return {
                    "statusCode": 500,
                    "body": f"{e}"
                }

    return {
        "statusCode": 200
    }

使ってみる

main.ymlからスタックを作成すると、以下の様にWebSocketAPIのエンドポイントが出力されます。

スクリーンショット 2024-11-20 22.16.01.png

wscatを使ってWebSocket APIに接続してみましょう。

% wscat -c wss://xxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/
Connected (press CTRL+C to quit)
>

以下の形式でメッセージを送ります。

{"action": "sendtext", "text": "<入力テキスト>"}

実行例

> {"action": "sendtext", "text": "Hello, AI! What you can do?"}
< "Hello"
< "! I'm"
< " an AI"
< " assistant create"
< "d by Anthropic"
< " to"
< " be helpful, harm"
< "less, and honest"
< ". I can help"
< " with all sorts of"
< " tasks like:"
< "\n\n\u2022"
< " Answering questions on"
< " a wide range of"
< " topics\n\u2022"
< " Helping with analysis"
< " and research "
< "\n\u2022"
< " Pr"
< "oofreading an"
< "d editing text\n\u2022"
< " Math"
< " and coding help"
< "\n\u2022 Creative writing"
< " and bra"
< "instorming"
< "\n\u2022 Task"
< " planning"
< " and organization\n\nAn"
< "d much more! Let"
< " me know if there"
< "'s anything specific"
< " I can assist 

テキストではわかりにくいですが、応答が細切れになって次々と返ってきていることが確認できます。

まとめ

今回はAPI Gataway + Lambda + Bedrockの構成で
API GatewayのエンドポイントからストリーミングレスポンスのAPIを呼び出せるようにしてみました。

実装する上での躓きポイントは色々あったのですが、今回はやってみた程度で
ある程度整理できたら追記しようと思います。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?