【AWS】BedrockのストリーミングレスポンスをAPI Gatewayで試してみる
ChatGPTなど多くのLLMチャットアプリではモデルからの応答がチャンクごとに区切られて次々出てきます。
Amazon BedrockでもInvokeModelWithResponseStreamというAPIが用意されており、
同様のことを実現することができます。
今回はAPI Gataway + Lambda + Bedrockの構成で
API GatewayのエンドポイントからストリーミングレスポンスのAPIを呼び出せるようにしてみました。
(ググってもあまり情報が出てこなかったので備忘です)
構成図
今回検証を行った環境の構成は以下の通りです。
Lambda関数からの複数の応答(チャンク毎に区切られたモデルからの応答)を連続的に返すことができるようにするため、API GatewayではWebSocket APIを作成しています。
SendTextHandlerがBedrockに対してInvokeModelWithResponseStreamを投げるためのLambda関数で、
API Gatewayはsendtext
というルートに対してリクエストが来た際に、SendTextHandlerを呼び出すように設定します。
その他のHandler(ConnectHandler, DisconnectHandler, DefaultHandler)はAPI Gatewayの事前定義されたルート($connect
, $disconnect
, $default
)に対応しています。
API GatewayでWebSocket APIを作成する際はAWS公式の以下のチュートリアルが参考になります。
実装
以下のCloudformationのテンプレートをデプロイします。
iam.yml
がLambda関数に適用する各種IAM RoleおよびPolicyを定義したテンプレート、
main.yml
がLambda関数やAPI Gatewayをデプロイするためのテンプレートです。
(Bedrockについては使いたいモデルを事前に有効化しておきます。)
SendConnectHandlerに記述するpythonのコードは以下の通りです。
invoke_model_with_response_streamの応答(response
)を
チャンク毎に取り出して、WebSocketAPIのコネクションに毎回ポストするようにfor文を記述しています。
import boto3
import json
import os
API_ENDPOINT = os.environ["API_ENDPOINT"]
STAGE = os.environ["STAGE"]
MODEL_ID = os.environ["MODEL_ID"]
def lambda_handler(event,context):
brt = boto3.client(service_name='bedrock-runtime')
apigw_management = boto3.client('apigatewaymanagementapi', endpoint_url=f"{API_ENDPOINT}/{STAGE}")
connectionId = event.get('requestContext', {}).get('connectionId')
body_content = json.loads(event.get('body', {}))
text = body_content.get('text')
body = json.dumps({
"anthropic_version": "bedrock-2023-05-31",
"messages": [
{
"role": "user",
"content": [
{
"type": "text",
"text": f"{text}"
}
]
}
],
"max_tokens": 1024
})
modelId = MODEL_ID
accept = 'application/json'
contentType = 'application/json'
try:
response = brt.invoke_model_with_response_stream(body=body, modelId=modelId, accept=accept, contentType=contentType)
except Exception as e:
return {
"statusCode": 500,
"body": f"{e}"
}
for event in response.get("body"):
chunk = json.loads(event["chunk"]["bytes"])
if chunk['type'] == 'content_block_delta' and chunk['delta']['type'] == 'text_delta':
print(chunk['delta']['text'])
try:
apigw_management.post_to_connection(ConnectionId=connectionId, Data=json.dumps(chunk['delta']['text']))
except Exception as e:
return {
"statusCode": 500,
"body": f"{e}"
}
return {
"statusCode": 200
}
使ってみる
main.ymlからスタックを作成すると、以下の様にWebSocketAPIのエンドポイントが出力されます。
wscatを使ってWebSocket APIに接続してみましょう。
% wscat -c wss://xxxxxxxx.execute-api.ap-northeast-1.amazonaws.com/dev/
Connected (press CTRL+C to quit)
>
以下の形式でメッセージを送ります。
{"action": "sendtext", "text": "<入力テキスト>"}
実行例
> {"action": "sendtext", "text": "Hello, AI! What you can do?"}
< "Hello"
< "! I'm"
< " an AI"
< " assistant create"
< "d by Anthropic"
< " to"
< " be helpful, harm"
< "less, and honest"
< ". I can help"
< " with all sorts of"
< " tasks like:"
< "\n\n\u2022"
< " Answering questions on"
< " a wide range of"
< " topics\n\u2022"
< " Helping with analysis"
< " and research "
< "\n\u2022"
< " Pr"
< "oofreading an"
< "d editing text\n\u2022"
< " Math"
< " and coding help"
< "\n\u2022 Creative writing"
< " and bra"
< "instorming"
< "\n\u2022 Task"
< " planning"
< " and organization\n\nAn"
< "d much more! Let"
< " me know if there"
< "'s anything specific"
< " I can assist
テキストではわかりにくいですが、応答が細切れになって次々と返ってきていることが確認できます。
まとめ
今回はAPI Gataway + Lambda + Bedrockの構成で
API GatewayのエンドポイントからストリーミングレスポンスのAPIを呼び出せるようにしてみました。
実装する上での躓きポイントは色々あったのですが、今回はやってみた程度で
ある程度整理できたら追記しようと思います。