この記事について
よくあるLambdaは次の図のような動きをします。
リクエストを投げるとLambdaの処理が始まって、処理が完了すると結果が返ります。
この記事では、次の図のようなことをする方法を紹介します。
実行中のLambdaに対して、追加のデータを送信します。
利用する環境
Lambdaの実装言語: TypeScript
ランタイム: Node.js 18.x
何が嬉しいのか
双方向通信をするだけならAPI GatewayのWebSocketの裏側に置けば事足りるのですが、たまに便利なケースがあります。
たとえば、ライブラリやデータの読み込みに時間がかかる場合です。
ユーザーが入力する前にLambdaを立ち上げて、あらかじめライブラリを読み込ませておくことができます。
ユーザーの待ち時間を短縮できます。
実装の図
今回は関数URLとS3トリガーで実装します。
【処理の流れ】
-
Lambdaの関数URLを実行します。
Lambdaの中で、AWS IoT Coreのトピックをサブスクライブしておきます -
S3に追加データをアップロードします。
S3のファイルアップロードをトリガーにして、IoT Coreにパブリッシュします。 -
Lambdaでメッセージを受信します。
Lambdaが通知を受け取って、S3に置かれたペイロードを読み出します。
パフォーマンスを見てみる
実装の詳細を説明する前に、この構成で実行したときに、どのくらい時間がかかるのかを比較してみます。
パターンA: 直接実行する
Aの実装はLambdaを噛ませずに実装するパターンです。
クライアントからBedrockのHaikuにプロンプトを直接送ります。
Total: 2.66 sec
セッションの初期化時間: 0.34 sec
Bedrock実行から受信開始まで: 1.21 sec
Bedrock実行から受信終了まで: 1.40 sec
Bedrockアップロード時間: 1.11 sec
1.2秒でレスポンスストリーミングの応答の返却が始まって、1.4秒で応答が終わりました。
処理全体にかかった時間は2.6秒でした。
パターンB: 追加データを送って実行
Bの実装が今回の実装です。プロンプトを追加データとして送ります。
送信するデータはAと同じプロンプト、同じパラメータです。
Total: 4.47 sec
セッションの初期化時間: 0.19 sec
Bedrock実行から受信開始まで: 3.25 sec
Bedrock実行から受信終了まで: 3.58 sec
Bedrockアップロード時間: 1.81 sec
S3アップロード時間: 1.14 sec
S3アップロード開始から受信開始まで: 3.21 sec
S3アップロード開始から受信終了まで: 3.54 sec
1.8秒で応答の返却が始まって、応答しながらS3からの受信を待っています。
レスポンスストリーミングの応答が始まって、3.5秒で応答が終わりました。
処理全体にかかった時間は4.4秒でした。
Bedrockを直接呼ぶ場合に比べて2秒ほど余分に時間がかかっています。
このくらいの遅延が問題にならないなら次に進みます。
実装方法の説明
実装方法を順に説明します。
1. 事前準備: あらかじめAWS IoTのエンドポイントを確認しておく
まず、AWS IoTのエンドポイントを確認します。
コンソールでAWS CLIのコマンドを実行して、エンドポイントを調べます。
aws iot describe-endpoint --endpoint-type "iot:Data-ATS"
実行するとエンドポイントアドレスが返ってくるので、アドレスをメモしておきます。
{
"endpointAddress": "xxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com"
}
2. Lambda側で追加データの受信待ちを実装する
Lambda側で追加データの受信待ちを実装します。
依存ライブラリにaws-iot-device-sdk-v2
と@aws-sdk/client-s3
を追加しておきます。
npm install aws-iot-device-sdk-v2 @aws-sdk/client-s3
また、Lambdaの権限にIoTのサブスクライブとコネクションを追加しておきます。
S3の対象バケットへのアクセス権限もgrantReadで追加しておきます。
実装は以下のように書きます。
import { mqtt5, mqtt, iot } from "aws-iot-device-sdk-v2";
import { once } from "events";
function wait(milliSeconds: number) {
return new Promise((resolve) => setTimeout(resolve, milliSeconds));
}
class Connector {
private _waitingFlag: boolean;
constructor() {
this._waitingFlag = true;
}
async connect() {
// ライブラリを参照
const { Mqtt5Client, QoS } = mqtt5;
const { AwsIotMqtt5ClientConfigBuilder } = iot;
// 接続を初期化する
const client = new Mqtt5Client(
AwsIotMqtt5ClientConfigBuilder.newWebsocketMqttBuilderWithSigv4Auth(
"事前準備でメモしたAWS IoTのエンドポイントアドレス"
).build()
);
// S3にファイルが格納された。データを受信した
client.on("messageReceived", (message) => {
console.log("Message Received");
this._waitingFlag = false;
});
// 受信待ちを始める
const connectionSuccess = once(client, "connectionSuccess");
client.start();
await connectionSuccess;
const rejected = await client.subscribe({
subscriptions: [
{
topicFilter: /** 任意のトピック名。例: "/receive/data/#" */,
qos: QoS.AtLeastOnce,
},
],
});
while (this._waitingFlag) {
// 受信待ちをする
// ※setTimeoutで一時的にメインスレッドから離れないと、MQTTのメッセージを受信できない
await wait(100);
}
// 受信を完了、切断する
const stopped = once(client, "stopped");
client.stop();
await stopped;
client.close();
}
}
データを受信するとmessageReceived
が呼び出しを受けます。
import { mqtt5 } from "aws-iot-device-sdk-v2";
import { isArrayBuffer, isArrayBufferView } from "util/types";
/**
ペイロードの変換処理、messageReceivedから以下のように呼び出す
client.on("messageReceived", (message) => {
const receivedData = toPayload(message.message)
this._waitingFlag = false;
});
*/
function toPayload(packet: mqtt5.PublishPacket | undefined): Payload {
if (packet !== undefined) {
if (isArrayBuffer(packet.payload) || isArrayBufferView(packet.payload)) {
return {
topic: packet.topicName,
qos: packet.qos,
payload: new TextDecoder().decode(packet.payload),
};
}
if (typeof packet.payload == "string") {
return {
topic: packet.topicName,
qos: packet.qos,
payload: packet.payload,
};
}
}
return {
topic: "",
qos: mqtt5.QoS.AtLeastOnce,
payload: `${packet?.payload}`,
};
}
ペイロードを使ってS3のデータ取得処理を実行すれば、追加データを読み出すことができます。
import {
S3Client,
GetObjectCommand,
} from "@aws-sdk/client-s3";
/** S3のクライアント */
const s3Client = new S3Client();
/** 受信したオブジェクトを参照する */
const command = new GetObjectCommand({
Bucket: /** MQTTのペイロードから受け取ったバケット情報 */,
Key: /** MQTTのペイロードから受け取ったファイルのキー情報 */,
});
// 読み込みリクエストを実行
const result = await s3Client.send(command);
// 文字列として追加データを読み込む
const data = await result.Body?.transformToString();
3. S3トリガーの転送側を実装する
S3に設定する転送処理を実装します。
依存ライブラリに@aws-sdk/client-iot-data-plane
を追加しておきます。
また、転送処理のLambdaにもiotの実行権限を追加しておきます。
npm install @aws-sdk/client-iot-data-plane
送信側と受信側で使うライブラリが違う理由
送信はclient-iot-data-plane、受信はaws-iot-device-sdk-v2を使います。
client-iot-data-planeはAWS SDKのv3です。接続せずに送信することができるため、実装が簡単で、処理にかかる時間も短くなります。
ただ、受信と接続ができません。
実装は以下の通りです。
import {
IoTDataPlaneClient,
PublishCommand,
} from "@aws-sdk/client-iot-data-plane";
import { randomUUID } from "crypto";
export const handler = async (event: any, context: any) => {
const client = new IoTDataPlaneClient();
for (const item of event.Records) {
const bucketName = item.s3.bucket.name;
const objectKey = item.s3.object.key;
const input = {
topic: /** 任意のトピック名 例: "/receive/data/${id}" */.replace("${id}", randomUUID()),
qos: 1,
retain: false,
payload: new TextEncoder().encode(
JSON.stringify({
bucketName,
objectKey,
})
),
};
const command = new PublishCommand(input);
await client.send(command);
}
};
このLambdaを、S3トリガーに登録します。
bucket.addEventNotification(
cdk.aws_s3.EventType.OBJECT_CREATED,
new cdk.aws_s3_notifications.LambdaDestination(/** 上のLambda */)
);
ここまで実装すると、ユーザーがS3にファイルを配置することでトリガーが呼ばれて、実行中のLambdaにメッセージを飛ばすことができます。
他の実装
S3トリガーを挟まずに、クライアントから直接client-iot-data-plane
でデータを送信することもできます。
クライアントからLambdaに直接データが届くため、応答が早くなります。
ただ、S3のように署名付きURLで認証情報を受け渡すことができないので、クライアント側で必要になる権限は強くなります。
リポジトリ
今回のソースコードはこちらです
リポジトリは、BedrockのエンドポイントにLambdaを偽装して、Bedrockにプロンプトを遅延送信するソースコードになっています。