0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Lambdaの実行中に、追加データを送ってみる

Last updated at Posted at 2024-07-20

この記事について

よくあるLambdaは次の図のような動きをします。
リクエストを投げるとLambdaの処理が始まって、処理が完了すると結果が返ります。

buffered-response.png

この記事では、次の図のようなことをする方法を紹介します。
実行中のLambdaに対して、追加のデータを送信します。

buffered-response-2.png

利用する環境

Lambdaの実装言語: TypeScript
ランタイム: Node.js 18.x

何が嬉しいのか

双方向通信をするだけならAPI GatewayのWebSocketの裏側に置けば事足りるのですが、たまに便利なケースがあります。

たとえば、ライブラリやデータの読み込みに時間がかかる場合です。

case-rag1.png

ユーザーが入力する前にLambdaを立ち上げて、あらかじめライブラリを読み込ませておくことができます。

case-rag copy 2.png

ユーザーの待ち時間を短縮できます。

実装の図

今回は関数URLとS3トリガーで実装します。

structb.png

【処理の流れ】

  1. Lambdaの関数URLを実行します。
    Lambdaの中で、AWS IoT Coreのトピックをサブスクライブしておきます
  2. S3に追加データをアップロードします。
    S3のファイルアップロードをトリガーにして、IoT Coreにパブリッシュします。
  3. Lambdaでメッセージを受信します。
    Lambdaが通知を受け取って、S3に置かれたペイロードを読み出します。

パフォーマンスを見てみる

実装の詳細を説明する前に、この構成で実行したときに、どのくらい時間がかかるのかを比較してみます。

パターンA: 直接実行する

Aの実装はLambdaを噛ませずに実装するパターンです。
クライアントからBedrockのHaikuにプロンプトを直接送ります。

pattern-a.png

Bedrockを直接実行
Total: 2.66 sec
    セッションの初期化時間: 0.34 sec
    Bedrock実行から受信開始まで: 1.21 sec
    Bedrock実行から受信終了まで: 1.40 sec
    Bedrockアップロード時間: 1.11 sec

1.2秒でレスポンスストリーミングの応答の返却が始まって、1.4秒で応答が終わりました。
処理全体にかかった時間は2.6秒でした。

パターンB: 追加データを送って実行

Bの実装が今回の実装です。プロンプトを追加データとして送ります。
送信するデータはAと同じプロンプト、同じパラメータです。

pattern-b.png

今回の構成で実行
Total: 4.47 sec
    セッションの初期化時間: 0.19 sec
    Bedrock実行から受信開始まで: 3.25 sec
    Bedrock実行から受信終了まで: 3.58 sec
    Bedrockアップロード時間: 1.81 sec
    S3アップロード時間: 1.14 sec
    S3アップロード開始から受信開始まで: 3.21 sec
    S3アップロード開始から受信終了まで: 3.54 sec

1.8秒で応答の返却が始まって、応答しながらS3からの受信を待っています。
レスポンスストリーミングの応答が始まって、3.5秒で応答が終わりました。
処理全体にかかった時間は4.4秒でした。

Bedrockを直接呼ぶ場合に比べて2秒ほど余分に時間がかかっています。
このくらいの遅延が問題にならないなら次に進みます。

実装方法の説明

実装方法を順に説明します。

1. 事前準備: あらかじめAWS IoTのエンドポイントを確認しておく

まず、AWS IoTのエンドポイントを確認します。
コンソールでAWS CLIのコマンドを実行して、エンドポイントを調べます。

エンドポイントを取るリクエスト
aws iot describe-endpoint --endpoint-type "iot:Data-ATS"

実行するとエンドポイントアドレスが返ってくるので、アドレスをメモしておきます。

実行結果
{
    "endpointAddress": "xxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
}

2. Lambda側で追加データの受信待ちを実装する

Lambda側で追加データの受信待ちを実装します。
依存ライブラリにaws-iot-device-sdk-v2@aws-sdk/client-s3を追加しておきます。

npm install aws-iot-device-sdk-v2 @aws-sdk/client-s3

また、Lambdaの権限にIoTのサブスクライブとコネクションを追加しておきます。
S3の対象バケットへのアクセス権限もgrantReadで追加しておきます。

実装は以下のように書きます。

追加データの受信
import { mqtt5, mqtt, iot } from "aws-iot-device-sdk-v2";
import { once } from "events";

function wait(milliSeconds: number) {
  return new Promise((resolve) => setTimeout(resolve, milliSeconds));
}

class Connector {
    private _waitingFlag: boolean;

    constructor() {
        this._waitingFlag = true;
    }
    
    async connect() {
        // ライブラリを参照
        const { Mqtt5Client, QoS } = mqtt5;
        const { AwsIotMqtt5ClientConfigBuilder } = iot;

        // 接続を初期化する
        const client = new Mqtt5Client(
          AwsIotMqtt5ClientConfigBuilder.newWebsocketMqttBuilderWithSigv4Auth(
            "事前準備でメモしたAWS IoTのエンドポイントアドレス"
          ).build()
        );
    
        // S3にファイルが格納された。データを受信した
        client.on("messageReceived", (message) => {
          console.log("Message Received");
          this._waitingFlag = false;
        });
    
        // 受信待ちを始める
        const connectionSuccess = once(client, "connectionSuccess");
        client.start();
        await connectionSuccess;
        
        const rejected = await client.subscribe({
          subscriptions: [
            {
              topicFilter: /** 任意のトピック名。例: "/receive/data/#" */,
              qos: QoS.AtLeastOnce,
            },
          ],
        });
    
        while (this._waitingFlag) {
          // 受信待ちをする
          // ※setTimeoutで一時的にメインスレッドから離れないと、MQTTのメッセージを受信できない
          await wait(100);
        }
    
        // 受信を完了、切断する
        const stopped = once(client, "stopped");
        client.stop();
        await stopped;
        client.close();
    }
}

データを受信するとmessageReceivedが呼び出しを受けます。

ペイロードの読み込み処理
import { mqtt5 } from "aws-iot-device-sdk-v2";
import { isArrayBuffer, isArrayBufferView } from "util/types";

/** 
    ペイロードの変換処理、messageReceivedから以下のように呼び出す
    
    client.on("messageReceived", (message) => {
      const receivedData = toPayload(message.message)
      this._waitingFlag = false;
    });
*/
function toPayload(packet: mqtt5.PublishPacket | undefined): Payload {
  if (packet !== undefined) {
    if (isArrayBuffer(packet.payload) || isArrayBufferView(packet.payload)) {
      return {
        topic: packet.topicName,
        qos: packet.qos,
        payload: new TextDecoder().decode(packet.payload),
      };
    }
    if (typeof packet.payload == "string") {
      return {
        topic: packet.topicName,
        qos: packet.qos,
        payload: packet.payload,
      };
    }
  }
  return {
    topic: "",
    qos: mqtt5.QoS.AtLeastOnce,
    payload: `${packet?.payload}`,
  };
}

ペイロードを使ってS3のデータ取得処理を実行すれば、追加データを読み出すことができます。

S3からの読み込み部分
import {
  S3Client,
  GetObjectCommand,
} from "@aws-sdk/client-s3";

/** S3のクライアント */
const s3Client = new S3Client();

/** 受信したオブジェクトを参照する */
const command = new GetObjectCommand({
  Bucket: /** MQTTのペイロードから受け取ったバケット情報 */,
  Key: /** MQTTのペイロードから受け取ったファイルのキー情報 */,
});
// 読み込みリクエストを実行
const result = await s3Client.send(command);
// 文字列として追加データを読み込む
const data = await result.Body?.transformToString();

3. S3トリガーの転送側を実装する

S3に設定する転送処理を実装します。
依存ライブラリに@aws-sdk/client-iot-data-planeを追加しておきます。
また、転送処理のLambdaにもiotの実行権限を追加しておきます。

npm install @aws-sdk/client-iot-data-plane

送信側と受信側で使うライブラリが違う理由
送信はclient-iot-data-plane、受信はaws-iot-device-sdk-v2を使います。

client-iot-data-planeはAWS SDKのv3です。接続せずに送信することができるため、実装が簡単で、処理にかかる時間も短くなります。
ただ、受信と接続ができません。

実装は以下の通りです。

転送処理の実装
import {
  IoTDataPlaneClient,
  PublishCommand,
} from "@aws-sdk/client-iot-data-plane";
import { randomUUID } from "crypto";

export const handler = async (event: any, context: any) => {
  const client = new IoTDataPlaneClient();

  for (const item of event.Records) {
    const bucketName = item.s3.bucket.name;
    const objectKey = item.s3.object.key;
    const input = {
      topic: /** 任意のトピック名 例: "/receive/data/${id}" */.replace("${id}", randomUUID()),
      qos: 1,
      retain: false,
      payload: new TextEncoder().encode(
        JSON.stringify({
          bucketName,
          objectKey,
        })
      ),
    };
    const command = new PublishCommand(input);
    await client.send(command);
  }
};

このLambdaを、S3トリガーに登録します。

CDKでバケットにトリガーを登録
bucket.addEventNotification(
  cdk.aws_s3.EventType.OBJECT_CREATED,
  new cdk.aws_s3_notifications.LambdaDestination(/** 上のLambda */)
);

ここまで実装すると、ユーザーがS3にファイルを配置することでトリガーが呼ばれて、実行中のLambdaにメッセージを飛ばすことができます。

他の実装

S3トリガーを挟まずに、クライアントから直接client-iot-data-planeでデータを送信することもできます。

クライアントからLambdaに直接データが届くため、応答が早くなります。

ただ、S3のように署名付きURLで認証情報を受け渡すことができないので、クライアント側で必要になる権限は強くなります。

リポジトリ

今回のソースコードはこちらです

リポジトリは、BedrockのエンドポイントにLambdaを偽装して、Bedrockにプロンプトを遅延送信するソースコードになっています。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?