Bedrockを使ったアプリをLambdaで動作させたい!
レスポンスはストリームで返したい!!
と思って調べたところ、結構条件があることがわかりました。
ストリーミングレスポンスを返却するための条件
ストリーミングレスポンスを返却するためにはいくつか条件があります。
Lambdaの実装方法の条件
Lambdaの実装方法が、以下の方法である必要があります。
Pythonなどのマネージドランタイムではストリーミングレスポンスがサポートされていませんので、注意が必要です。
Lambdaの呼び出し方の条件
呼び出し側にも条件があります。
- Function URLs
- InvokeWithResponseStream API
今回は、Node.jsのマネージドランタイム(ver.20)とFunction URLsの組み合わせで、ストリーム形式のレスポンスを返却することができましたので紹介します。(TypeScriptで記述しています)
ストリームじゃない基本パターン
まずはストリームじゃないLambdaを作ります。
Bedrock RuntimeのAWS SDKをインストールします。
npm add @aws-sdk/client-bedrock-runtime
関数のコードを作成します。
import { APIGatewayEvent, Context, Handler } from "aws-lambda";
import {
BedrockRuntimeClient,
InvokeModelCommand,
} from "@aws-sdk/client-bedrock-runtime";
const modelId = "anthropic.claude-3-haiku-20240307-v1:0";
// Create a new Bedrock Runtime client instance.
const client = new BedrockRuntimeClient({ region: "us-east-1" });
export const handler: Handler<object, object> = async (
event: APIGatewayEvent,
_context: Context
) => {
const body = JSON.parse(event.body);
const prompt = body.prompt;
// Prepare the payload for the model.
const payload = {
anthropic_version: "bedrock-2023-05-31",
max_tokens: 1000,
messages: [
{
role: "user",
content: [{ type: "text", text: prompt }],
},
],
};
// Invoke Claude with the payload and wait for the response.
const command = new InvokeModelCommand({
contentType: "application/json",
body: JSON.stringify(payload),
modelId,
});
const apiResponse = await client.send(command);
let responseBody = {};
if (apiResponse.body) {
responseBody = JSON.parse(new TextDecoder().decode(apiResponse.body));
}
return responseBody;
};
SAMのtemplate.yaml
です。認証なしのFunction URLsを有効にしています。
Transform: AWS::Serverless-2016-10-31
Resources:
Function1:
Type: AWS::Serverless::Function
Properties:
CodeUri: src/Function
Handler: index1.handler
Runtime: nodejs20.x
MemorySize: 512
Timeout: 30
Tracing: Active
FunctionUrlConfig:
AuthType: NONE
Policies:
- !GetAtt FunctionPolicy.PolicyArn
Metadata:
BuildMethod: esbuild
BuildProperties:
EntryPoints:
- index1.mts
External:
- '@aws-sdk/*'
- aws-sdk
Minify: false
Function1LogGroup:
Type: AWS::Logs::LogGroup
DeletionPolicy: Retain
Properties:
LogGroupName: !Sub /aws/lambda/${Function1}
FunctionPolicy:
Type: AWS::IAM::ManagedPolicy
Properties:
PolicyDocument:
Version: 2012-10-17
Statement:
- Effect: Allow
Action:
- bedrock:*
Resource:
- '*'
ビルドしてデプロイします
sam build
sam deploy --guided
呼び出します。
curl https://***.lambda-url.us-east-1.on.aws/ -H "Content-Type: application/json" -d '{"prompt": "こんにちは"}'
{
"role": "assistant",
"stop_sequence": null,
"usage": {
"output_tokens": 9,
"input_tokens": 12
},
"stop_reason": "end_turn",
"model": "claude-3-haiku-48k-20240307",
"id": "msg_017oooCp8bWLMeStcRt8tfhR",
"type": "message",
"content": [
{
"text": "こんにちは!",
"type": "text"
}
]
}
レスポンスは一度にまとめて返却されます。
Bedrockからはストリームでうけて、ストリームじゃないレスポンスを返す
InvokeModel
の代わりにInvokeModelWithResponse
を使い、Bedrockからストリーミングでレスポンスを受け取ります。
import {
BedrockRuntimeClient,
- InvokeModelCommand,
+ InvokeModelWithResponseStreamCommand,
} from "@aws-sdk/client-bedrock-runtime";
// Invoke Claude with the payload and wait for the response.
- const command = new InvokeModelCommand({
+ const command = new InvokeModelWithResponseStreamCommand({
contentType: "application/json",
body: JSON.stringify(payload),
modelId,
});
受けたレスポンスの処理方法が変わります。
apiResponse.body
が細切れのチャンクで取得できるので、取得のたびにchunk
として受け取ります。Bedrockが生成したトークン部分のみを抜き出して連結することで、completeMessage
を完成させます。
- let responseBody = "";
- if (apiResponse.body) {
- responseBody = JSON.parse(new TextDecoder().decode(apiResponse.body));
- }
+ let completeMessage = "";
+
+ // Decode and process the response stream
+ for await (const item of apiResponse.body) {
+ const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes));
+ const chunk_type = chunk.type;
+
+ if (chunk_type === "content_block_delta") {
+ const text = chunk.delta.text;
+ completeMessage = completeMessage + text;
+ }
+ }
JSONにして返却します。
- return responseBody;
+ return { content: completeMessage };
cURLで呼び出します。最終的にJSONを構築して返却しているので、Lambdaの呼び出し側からすると動作は変わりません。
curl https://***.lambda-url.us-east-1.on.aws/ -H "Content-Type: application/json" -d '{"prompt": "こんにちは"}'
{
"content": "はい、こんにちは。どのようなお手伝いができますでしょうか。私は皆様のお役に立てるよう、できる限りサポートさせていただきます。何か質問やご依頼がございましたら、どうぞお聞かせください。"
}
Bedrockから受けたストリームをストリームで返す
いよいよストリームで返却します。
まず、template.yaml
を修正します。Function URLsのパラメーターとしてInvokeMode
にRESPONSE_STREAM
を指定します。(未指定の場合はBUFFERED
が設定されます)
FunctionUrlConfig:
AuthType: NONE
+ InvokeMode: RESPONSE_STREAM
次にLambda関数のソースコードを修正します。
ハンドラー関数をawslambda.streamifyResponse()
デコレーターでラップします。
また、第二引数にresponseStream
パラメータが追加になります。
- export const handler: Handler<object, object> = async (
- event: APIGatewayEvent,
- _context: Context
- ) => {
+ export const handler: Handler<object, object> = awslambda.streamifyResponse(
+ async (event: APIGatewayEvent, responseStream, _context: Context) => {
...
- };
+ }
+ );
TypeScriptの型エラーが気になる場合、こちらを参考に以下のような型定義ファイルを作成しましょう
export class HttpResponseStream {
static from(underlyingStream: any, prelude: any): any;
}
declare global {
namespace awslambda {
function streamifyResponse(handler: any, options?: any): any;
let HttpResponseStream: HttpResponseStream;
}
}
responseStream.write()
メソッドを使い、チャンクを都度書き込みます。
最後にresponseStream.end()
でストリームを終了します。
return
は不要です。
- let completeMessage = "";
for await (const item of apiResponse.body) {
const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes));
const chunk_type = chunk.type;
if (chunk_type === "content_block_delta") {
const text = chunk.delta.text;
- completeMessage = completeMessage + text;
+ responseStream.write(text);
}
}
- return { content: completeMessage };
+ responseStream.end();
cURLで呼び出します。cURLにはバッファリング機能があるので、ストリーミングレスポンスを正しく確認するために、-N
(--no-buffer)オプションを付与します。
curl -N https://***.lambda-url.us-east-1.on.aws/ -H "Content-Type: application/json" -d '{"prompt": "こんにちは"}'
Bedrockが生成した文字だけを返却しているので、JSONではなくテキストになります。
こんにちは。どのようなお手伝いができますか?
(伝わらないと思いますが、)うまくストリームで返却されました。
LangChainもストリームレスポンス
Bedrockでストリーミングできたので、LangChainでもやってみましょう。
npm add langchain @langchain/community
import { APIGatewayEvent, Context, Handler } from "aws-lambda";
import { BedrockChat } from "@langchain/community/chat_models/bedrock";
import { StringOutputParser } from "@langchain/core/output_parsers";
const modelId = "anthropic.claude-3-haiku-20240307-v1:0";
export const handler: Handler<object, object> = awslambda.streamifyResponse(
async (event: APIGatewayEvent, responseStream, _context: Context) => {
const body = JSON.parse(event.body);
const prompt = body.prompt;
// Create a chat model.
const bedrockChat = new BedrockChat({
model: modelId,
region: "us-east-1",
streaming: true,
});
// Create a parser.
const parser = new StringOutputParser();
// Create a chain.
const chain = bedrockChat.pipe(parser);
// Stream the response.
const stream = await chain.stream(prompt);
for await (const chunk of stream) {
responseStream.write(chunk);
}
responseStream.end();
}
);
Bedrockよりも少しシンプルですね。StringOutputParserのおかげだと思います。
curl -N https://***.lambda-url.us-east-1.on.aws/ -H "Content-Type: application/json" -d '{"prompt": "こんにちは"}'
こんにちは。どのようなお話しができればよろしいでしょうか?私はあなたのお役に立てるよう、できる限りの知識と能力を発揮させていただきます。どうぞ、ご質問やご要望をお聞かせください。
LangChainの場合は、もうちょっとスマートな書き方ができます。
ストリームの終了を自動で行ってくれるpipeline()
を使用します。
+
+ import { Readable } from "stream";
+ import { pipeline } from "stream/promises";
- for await (const chunk of stream) {
- responseStream.write(chunk);
- }
-
- responseStream.end();
+ pipeline(Readable.from(stream), responseStream);
forループ部分も不要になってスッキリしますね。
import { APIGatewayEvent, Context, Handler } from "aws-lambda";
import { Readable } from "stream";
import { pipeline } from "stream/promises";
import { BedrockChat } from "@langchain/community/chat_models/bedrock";
import { StringOutputParser } from "@langchain/core/output_parsers";
const modelId = "anthropic.claude-3-haiku-20240307-v1:0";
export const handler: Handler<object, object> = awslambda.streamifyResponse(
async (event: APIGatewayEvent, responseStream, _context: Context) => {
const body = JSON.parse(event.body);
const prompt = body.prompt;
// Create a chat model.
const bedrockChat = new BedrockChat({
model: modelId,
region: "us-east-1",
streaming: true,
});
// Create a parser.
const parser = new StringOutputParser();
// Create a chain.
const chain = bedrockChat.pipe(parser);
// Stream the response.
const stream = await chain.stream(prompt);
// Pipe the stream to the response.
pipeline(Readable.from(stream), responseStream);
}
);
Transform
という機能を使うと、Bedrockの場合でも生成した文字列だけ抽出できるようですが、私の力ではうまくいきませんでした。😫