LoginSignup
8
4

Bedrockのストリーミングレスポンスをストリームのまま返却するLambdaの作り方(LangChainも対応)

Posted at

Bedrockを使ったアプリをLambdaで動作させたい!
レスポンスはストリームで返したい!!

と思って調べたところ、結構条件があることがわかりました。

ストリーミングレスポンスを返却するための条件

ストリーミングレスポンスを返却するためにはいくつか条件があります。

Lambdaの実装方法の条件

Lambdaの実装方法が、以下の方法である必要があります。

  • Node.jsのマネージドランタイム
  • カスタムランタイム(参考
  • Lambda Web Adapter(参考

Pythonなどのマネージドランタイムではストリーミングレスポンスがサポートされていませんので、注意が必要です。

Lambdaの呼び出し方の条件

呼び出し側にも条件があります。

  • Function URLs
  • InvokeWithResponseStream API

今回は、Node.jsのマネージドランタイム(ver.20)とFunction URLsの組み合わせで、ストリーム形式のレスポンスを返却することができましたので紹介します。(TypeScriptで記述しています)

ストリームじゃない基本パターン

まずはストリームじゃないLambdaを作ります。

Bedrock RuntimeのAWS SDKをインストールします。

npm add @aws-sdk/client-bedrock-runtime

関数のコードを作成します。

index.mts
import { APIGatewayEvent, Context, Handler } from "aws-lambda";

import {
  BedrockRuntimeClient,
  InvokeModelCommand,
} from "@aws-sdk/client-bedrock-runtime";

const modelId = "anthropic.claude-3-haiku-20240307-v1:0";
// Create a new Bedrock Runtime client instance.
const client = new BedrockRuntimeClient({ region: "us-east-1" });

export const handler: Handler<object, object> = async (
  event: APIGatewayEvent,
  _context: Context
) => {
  const body = JSON.parse(event.body);
  const prompt = body.prompt;

  // Prepare the payload for the model.
  const payload = {
    anthropic_version: "bedrock-2023-05-31",
    max_tokens: 1000,
    messages: [
      {
        role: "user",
        content: [{ type: "text", text: prompt }],
      },
    ],
  };

  // Invoke Claude with the payload and wait for the response.
  const command = new InvokeModelCommand({
    contentType: "application/json",
    body: JSON.stringify(payload),
    modelId,
  });
  const apiResponse = await client.send(command);

  let responseBody = {};
  if (apiResponse.body) {
    responseBody = JSON.parse(new TextDecoder().decode(apiResponse.body));
  }

  return responseBody;
};

SAMのtemplate.yamlです。認証なしのFunction URLsを有効にしています。

template.yaml
Transform: AWS::Serverless-2016-10-31
Resources:
  Function1:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: src/Function
      Handler: index1.handler
      Runtime: nodejs20.x
      MemorySize: 512
      Timeout: 30
      Tracing: Active
      FunctionUrlConfig:
        AuthType: NONE
      Policies:
        - !GetAtt FunctionPolicy.PolicyArn
    Metadata:
      BuildMethod: esbuild
      BuildProperties:
        EntryPoints:
          - index1.mts
        External:
          - '@aws-sdk/*'
          - aws-sdk
        Minify: false
  Function1LogGroup:
    Type: AWS::Logs::LogGroup
    DeletionPolicy: Retain
    Properties:
      LogGroupName: !Sub /aws/lambda/${Function1}
  FunctionPolicy:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Action:
              - bedrock:*
            Resource:
              - '*'

ビルドしてデプロイします

sam build
sam deploy --guided

呼び出します。

curl https://***.lambda-url.us-east-1.on.aws/ -H "Content-Type: application/json" -d '{"prompt": "こんにちは"}'
レスポンス(フォーマット済み)
{
    "role": "assistant",
    "stop_sequence": null,
    "usage": {
        "output_tokens": 9,
        "input_tokens": 12
    },
    "stop_reason": "end_turn",
    "model": "claude-3-haiku-48k-20240307",
    "id": "msg_017oooCp8bWLMeStcRt8tfhR",
    "type": "message",
    "content": [
        {
            "text": "こんにちは!",
            "type": "text"
        }
    ]
}

レスポンスは一度にまとめて返却されます。

Bedrockからはストリームでうけて、ストリームじゃないレスポンスを返す

InvokeModelの代わりにInvokeModelWithResponseを使い、Bedrockからストリーミングでレスポンスを受け取ります。

index.mts(一部抜粋)
  import {
    BedrockRuntimeClient,
-   InvokeModelCommand,
+   InvokeModelWithResponseStreamCommand,
  } from "@aws-sdk/client-bedrock-runtime";
index.mts(一部抜粋)
    // Invoke Claude with the payload and wait for the response.
-   const command = new InvokeModelCommand({
+   const command = new InvokeModelWithResponseStreamCommand({
      contentType: "application/json",
      body: JSON.stringify(payload),
      modelId,
    });

受けたレスポンスの処理方法が変わります。

apiResponse.bodyが細切れのチャンクで取得できるので、取得のたびにchunkとして受け取ります。Bedrockが生成したトークン部分のみを抜き出して連結することで、completeMessageを完成させます。

index.mts(一部抜粋)
-   let responseBody = "";
-   if (apiResponse.body) {
-     responseBody = JSON.parse(new TextDecoder().decode(apiResponse.body));
-   }
+   let completeMessage = "";
+ 
+   // Decode and process the response stream
+   for await (const item of apiResponse.body) {
+     const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes));
+     const chunk_type = chunk.type;
+ 
+     if (chunk_type === "content_block_delta") {
+       const text = chunk.delta.text;
+       completeMessage = completeMessage + text;
+     }
+   }

JSONにして返却します。

index.mts(一部抜粋)
-   return responseBody;
+   return { content: completeMessage };

cURLで呼び出します。最終的にJSONを構築して返却しているので、Lambdaの呼び出し側からすると動作は変わりません。

curl https://***.lambda-url.us-east-1.on.aws/ -H "Content-Type: application/json" -d '{"prompt": "こんにちは"}'
{
    "content": "はい、こんにちは。どのようなお手伝いができますでしょうか。私は皆様のお役に立てるよう、できる限りサポートさせていただきます。何か質問やご依頼がございましたら、どうぞお聞かせください。"
}

Bedrockから受けたストリームをストリームで返す

いよいよストリームで返却します。

まず、template.yamlを修正します。Function URLsのパラメーターとしてInvokeModeRESPONSE_STREAMを指定します。(未指定の場合はBUFFEREDが設定されます)

template.yaml(一部抜粋)
      FunctionUrlConfig:
        AuthType: NONE
+       InvokeMode: RESPONSE_STREAM

次にLambda関数のソースコードを修正します。

ハンドラー関数をawslambda.streamifyResponse()デコレーターでラップします。
また、第二引数にresponseStreamパラメータが追加になります。

index.mts(一部抜粋)
- export const handler: Handler<object, object> = async (
-   event: APIGatewayEvent,
-   _context: Context
- ) => {
+ export const handler: Handler<object, object> = awslambda.streamifyResponse(
+   async (event: APIGatewayEvent, responseStream, _context: Context) => {

...

- };
+   }
+ );

TypeScriptの型エラーが気になる場合、こちらを参考に以下のような型定義ファイルを作成しましょう

export class HttpResponseStream {
    static from(underlyingStream: any, prelude: any): any;
}

declare global {
    namespace awslambda {
        function streamifyResponse(handler: any, options?: any): any;
        let HttpResponseStream: HttpResponseStream;
    }
}

responseStream.write()メソッドを使い、チャンクを都度書き込みます。
最後にresponseStream.end()でストリームを終了します。
returnは不要です。

index.mts(一部抜粋)
- let completeMessage = "";

  for await (const item of apiResponse.body) {
    const chunk = JSON.parse(new TextDecoder().decode(item.chunk.bytes));
    const chunk_type = chunk.type;

    if (chunk_type === "content_block_delta") {
      const text = chunk.delta.text;
-     completeMessage = completeMessage + text;
+     responseStream.write(text);
    }
  }

- return { content: completeMessage };
+ responseStream.end();

cURLで呼び出します。cURLにはバッファリング機能があるので、ストリーミングレスポンスを正しく確認するために、-N(--no-buffer)オプションを付与します。

curl -N https://***.lambda-url.us-east-1.on.aws/ -H "Content-Type: application/json" -d '{"prompt": "こんにちは"}'

Bedrockが生成した文字だけを返却しているので、JSONではなくテキストになります。

こんにちは。どのようなお手伝いができますか?

(伝わらないと思いますが、)うまくストリームで返却されました。

LangChainもストリームレスポンス

Bedrockでストリーミングできたので、LangChainでもやってみましょう。

npm add langchain @langchain/community
index_langchain.mts
import { APIGatewayEvent, Context, Handler } from "aws-lambda";

import { BedrockChat } from "@langchain/community/chat_models/bedrock";
import { StringOutputParser } from "@langchain/core/output_parsers";

const modelId = "anthropic.claude-3-haiku-20240307-v1:0";

export const handler: Handler<object, object> = awslambda.streamifyResponse(
  async (event: APIGatewayEvent, responseStream, _context: Context) => {
    const body = JSON.parse(event.body);
    const prompt = body.prompt;

    // Create a chat model.
    const bedrockChat = new BedrockChat({
      model: modelId,
      region: "us-east-1",
      streaming: true,
    });

    // Create a parser.
    const parser = new StringOutputParser();

    // Create a chain.
    const chain = bedrockChat.pipe(parser);

    // Stream the response.
    const stream = await chain.stream(prompt);

    for await (const chunk of stream) {
      responseStream.write(chunk);
    }

    responseStream.end();
  }
);

Bedrockよりも少しシンプルですね。StringOutputParserのおかげだと思います。

curl -N https://***.lambda-url.us-east-1.on.aws/ -H "Content-Type: application/json" -d '{"prompt": "こんにちは"}'
こんにちは。どのようなお話しができればよろしいでしょうか?私はあなたのお役に立てるよう、できる限りの知識と能力を発揮させていただきます。どうぞ、ご質問やご要望をお聞かせください。

LangChainの場合は、もうちょっとスマートな書き方ができます。

ストリームの終了を自動で行ってくれるpipeline()を使用します。

index_langchain.mts(一部抜粋)
+
+ import { Readable } from "stream";
+ import { pipeline } from "stream/promises";
index_langchain.mts(一部抜粋)
-    for await (const chunk of stream) {
-      responseStream.write(chunk);
-    }
-
-    responseStream.end();
+    pipeline(Readable.from(stream), responseStream);

forループ部分も不要になってスッキリしますね。

index_langchain.mts
import { APIGatewayEvent, Context, Handler } from "aws-lambda";

import { Readable } from "stream";
import { pipeline } from "stream/promises";

import { BedrockChat } from "@langchain/community/chat_models/bedrock";
import { StringOutputParser } from "@langchain/core/output_parsers";

const modelId = "anthropic.claude-3-haiku-20240307-v1:0";

export const handler: Handler<object, object> = awslambda.streamifyResponse(
  async (event: APIGatewayEvent, responseStream, _context: Context) => {
    const body = JSON.parse(event.body);
    const prompt = body.prompt;

    // Create a chat model.
    const bedrockChat = new BedrockChat({
      model: modelId,
      region: "us-east-1",
      streaming: true,
    });

    // Create a parser.
    const parser = new StringOutputParser();

    // Create a chain.
    const chain = bedrockChat.pipe(parser);

    // Stream the response.
    const stream = await chain.stream(prompt);

    // Pipe the stream to the response.
    pipeline(Readable.from(stream), responseStream);
  }
);

Transformという機能を使うと、Bedrockの場合でも生成した文字列だけ抽出できるようですが、私の力ではうまくいきませんでした。😫

8
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
4