LoginSignup
3
2

LambdaレスポンスストリーミングとAWS-SDKを使ってSlackに進捗バーを表示させる

Last updated at Posted at 2023-12-01

挨拶

こんにちは、ソーシャル経済メディア「NewsPicks」のSREチームの中川です。

この記事は NewsPicks アドベントカレンダー 2023 の2日目の記事です。

概要

本記事の内容としては以下の通りです。

  • 今年発表されたばかりのLambdaレスポンスストリーミングを試した。
  • 上記機能とAWS-SDK for Javascript V3、 Lambda (Node.js) を使ってリアルタイムに情報取得できた。
  • その情報をSlackに投稿して進捗バーを表示させた。

システム構成は以下の通りです。

image.png

そして成果物としてはこちら。
Lambdaレスポンスストリーミングを使ってサーバ起動台数をリアルタイムでSlackに投稿(メッセージ書き換え)してます。

blog_thumnail2.gif

経緯

NewsPicks ではニュース速報としてプッシュ通知を1日に何回か打ちます。
当然その直後はアクセス数が増加しますので事前にサーバ増強しておく必要があります。

サーバ増強コマンドは Slack から打てるようになっておりますが、プロビジョニング完了までには数分かかります。

現状としてはその数分間、増強完了レスポンスが来るまでひたすら待ち続けなければならず、1秒でも早く通知を打ちたい編成チームにとっては大きなストレスとなっていました。

そこで、リアルタイムに進捗表示を入れることにしました。

これにより、編成チームが「これはそこまで大きなニュースじゃなさそうだな...」と判断すれば増強途中でも通知を入れられるようになります。
加えて、エンジニアではない同チームが「本当にサーバ増えてるのかな...?」という不安を打ち消すことにも繋がります。

サーバ増強の流れ

ではどうやって実現するかですが、その前に現状のサーバ増強がどうやって行われるか説明しておきます。

  1. Slackで増強用コマンド打つ。
  2. Slack用ライブラリ(Bolt)を仕込んだ Lambda が起動する。
  3. 上記 Lambda から、サーバ増強用のAPIを含んだ Lambda が呼ばれる。
  4. 増強終わり次第後者の Lambda が終了すると、前者 Lambda がSlackに増強終了の旨投稿する。

という感じです。

呼び出し側のLambdaではSlackとの疎通のみ、呼び出される側はサーバ増強のみ、と機能がわかれていますのでここは壊したくありません。

実装

チームメンバーとの議論で「最近発表されたばかりのLambdaレスポンスストリーミングを使ったら面白そうじゃない?」という話になり調べてみることにしました。

ググってみると、関数URL(HTTPエンドポイント)を有効にしてレスポンスタイプを BUFFRED から RESPONSE_STREAM に変えて試している方々はいらっしゃるようでした。

ただ今回は Lambda(Node.js Runtime, AWS-SDK for Javascript 使用)を使っている中でレスポンスストリーミング対応させたいというのが要件です。

ドキュメントを見たら AWS SDK for JavaScript v3 はサポートされてるのですが今使っているSDKはバージョンが古いのでアップデート必要そうというのが分かりました。
また、Node.js 14以上というのはクリアしてるので何とかなりそうです。

レスポンスストリーミングは、AWS SDK for Java 2.x, AWS SDK for JavaScript v3, AWS SDK for Go version 1 および version 2 でサポートされています。

SDKバージョンアップだけで済むのは良かったのですが、サンプルコードが少なすぎて実際にどう対応させるかは手探りでやっていくしかなさそうです。

更にドキュメント読み進めていくと、Lambdaレスポンスストリームを呼び出す側、呼び出される側双方ともに実装を変えなくてはいけないことも分かりました。

早速コードから見ていきましょう。

Lambdaレスポンスストリームを呼び出す側(Slackとの疎通担当)

レスポンスストリーミングを受け取れる様に実装を変えていきましょう。
ではいきなりですがコード全体を示します。(Slackの説明については割愛します)

import {
    InvokeWithResponseStreamCommand,
    InvokeWithResponseStreamCommandInput,
    LambdaClient,
} from "@aws-sdk/client-lambda";
import { App, BotMessageEvent, KnownEventFromType, SayFn } from "@slack/bolt";

export interface execIncreaseServerProps {
    app: App;
    message: KnownEventFromType<"message">;
    say: SayFn;
    execIncreaseServerFuncName: string;
    desired: number;
}

interface ReceivedProgressMessage {
    readonly runningCount: number;
    readonly desiredCapacity: number;
}

export const executeIncreaseServer = async (context: execIncreaseServerProps): Promise<void> => {
    const { app, message, say, execIncreaseServerFuncName, desired } = context;
    const event = message as BotMessageEvent;
    const user = event.user;

    await invokeLambda(app, say, execIncreaseServerFuncName, desired);
    await say({
        text: `<@${user}> サーバーが起動しました`,
    });
};

async function invokeLambda(
    app: App,
    say: SayFn,
    execIncreaseServerFuncName: string,
    desired: execIncreaseServerProps["desired"]
): Promise<void> {
    const startText = "サーバーを起動しています...";
    const { channel, ts: timestamp } = await say({
        text: startText,
    });

    // 最初のメッセージのレスポンスが取れなければエラー
    if (!channel || !timestamp) {
        throw new Error("channel or timestamp is undefined");
    }

    const invokeParams: InvokeWithResponseStreamCommandInput = {
        FunctionName: execIncreaseServerFuncName,
        InvocationType: "RequestResponse",
        Payload: JSON.stringify({
            desiredCapacity: desired,
        }),
    };
    const command = new InvokeWithResponseStreamCommand(invokeParams);

    const lambdaClient = new LambdaClient();
    const streamResponse = await lambdaClient.send(command);

    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const eventStream = streamResponse.EventStream!;

    for await (const event of eventStream) {
        if (event.PayloadChunk) {
            const uint8Array = event.PayloadChunk.Payload;
            const { runningCount, desiredCapacity } = JSON.parse(
                new TextDecoder().decode(uint8Array)
            ) as ReceivedProgressMessage;
            await app.client.chat.update({
                token: app.client.token,
                channel: channel,
                ts: timestamp,
                text: `${startText}\n${getProgressText(runningCount, desiredCapacity)}`,
            });
        } else if (event.InvokeComplete) {
            return;
        }
    }
}

function getProgressText(runningCount: number, desiredCapacity: number): string {
    const bar = "".repeat(runningCount);
    return `[${bar.padEnd(desiredCapacity, " ")}] 現在${runningCount}台/目標${desiredCapacity}台`;
}

具体的に変更した部分を上から見ていきましょう。

まず必要なSDKは AWS SDK for JavaScript v3 なのでそれを呼び出しています。

import {
    InvokeWithResponseStreamCommand,
    InvokeWithResponseStreamCommandInput,
    LambdaClient,
} from "@aws-sdk/client-lambda";

次は本題のレスポンスストリーム対応したLambdaを呼び出します。
execIncreaseServerFuncName でレスポンスストリーム対応したLambda(次項目で例示)の名前を入れて、InvocationTypeRequestResponse を指定します。
後者はデフォルトでそうなっていますが敢えて明示しています。これで呼び出される側の関数が応答返すかタイムアウトするまで接続をオープンにしたままになります。(参考

このパラメータを Lambda に send し、ストリームレスポンスを受け取る用意ができました。

const invokeParams: InvokeWithResponseStreamCommandInput = {
        FunctionName: execIncreaseServerFuncName,
        InvocationType: "RequestResponse",
        Payload: JSON.stringify({
            desiredCapacity: desired, // 何台に増強したいかの数値を渡します
        }),
    };
const command = new InvokeWithResponseStreamCommand(invokeParams);

const lambdaClient = new LambdaClient();
const streamResponse = await lambdaClient.send(command);

最後に受け取るレスポンスを取り出していきます。

データボディは streamResponse.EventStream に入っていますが型としては Nullable となります。(当該ドキュメント
ただこのままだとエラー出てしまうので無理やり Non-Null assertion しています。

streamResponse.EventStream は AsyncIterable なので for await を使いペイロードを取り出していきます。
そのペイロードについては Uint8Array で返されるそう(参考)なので、存在すればデコードしSlackに投稿します。またテキスト生成するときに getProgressText という自前の関数を用意して進捗バーを表示させています。

全てのペイロードを返し終われば InvokeComplete が返されるはずですので処理を終わらせます。

ただ正常に終了しなくても InvokeComplete は呼ばれると思われますので、クリティカルな業務に導入する場合は event.InvokeComplete?.ErrorCode とかでエラーハンドリングした方が良いでしょう。

    // eslint-disable-next-line @typescript-eslint/no-non-null-assertion
    const eventStream = streamResponse.EventStream!;

    for await (const event of eventStream) {
        if (event.PayloadChunk) {
            const uint8Array = event.PayloadChunk.Payload;
            const { runningCount, desiredCapacity } = JSON.parse(
                new TextDecoder().decode(uint8Array)
            ) as ReceivedProgressMessage;
            await app.client.chat.update({
                token: app.client.token,
                channel: channel,
                ts: timestamp,
                text: `${startText}\n${getProgressText(runningCount, desiredCapacity)}`,
            });
        } else if (event.InvokeComplete) {
            return;
        }
    }

呼び出される側(サーバ増強担当。サーバ台数をレスポンスストリームで返す側)

では呼ばれる側もレスポンスストリームに対応させていきます。
かなり端折ってはいますが全体のコードを示します。

import { Context } from "aws-lambda";
import { Lambda } from "aws-sdk";

// eslint-disable-next-line @typescript-eslint/no-explicit-any
declare const awslambda: any;

export const handler = awslambda.streamifyResponse(
    async (event: Event, responseStream: NodeJS.WritableStream, context: Context) => {
        await main(event, responseStream, context);
        responseStream.end();
    }
);

// eslint-disable-next-line @typescript-eslint/no-unused-vars
async function main(event: Event, responseStream: NodeJS.WritableStream, context: Context): Promise<void> {

    // サーバ増強用の色々な処理を書いてますが省略。

    for (let count = 0; count < 90; count++) {
        await sleep(10 * 1000);

        const runningCount = await getServerCount(); // 今のサーバ台数を取得
        responseStream.write(JSON.stringify({ runningCount: runningCount, desiredCapacity: minCapacity }));

        const isSatisfied = await isSatisfied(minCapacity); // プッシュ通知に必要な最低台数を上回れば処理終了
        if (isSatisfied) {
            return;
        }
    }
}

上から見ていきます。

まずドキュメント通り handler を streamifyResponse でラップしています。

ここで便宜的に宣言している awslambda について実際には Lambdaランタイムで定義されているものです。(宣言はこちら、実装はこちら)もちろん型定義ファイルを自前で用意するとかでもOKです。

今回は元々あった handler関数を main とリネームし streamifyResponse でラップすることで、関数全体をレスポンスストリームにするんだなと分かりやすくしています。
その中の最後で responseStream.end() とすることで一通り処理が終わってからストリーム終了することを明示してます。

// eslint-disable-next-line @typescript-eslint/no-explicit-any
declare const awslambda: any;

export const handler = awslambda.streamifyResponse(
    async (event: Event, responseStream: NodeJS.WritableStream, context: Context) => {
        await main(event, responseStream, context);
        responseStream.end();
    }
);

あとは main関数の中でストリーム形式で受け取りたい情報を responseStream.write() で書き込んでいきます。
ここでは10秒ごとに、用意されたサーバ台数と目標台数を write しています。
これで基本的な実装はできました。

// eslint-disable-next-line @typescript-eslint/no-unused-vars
async function main(event: Event, responseStream: NodeJS.WritableStream, context: Context): Promise<void> {

    // ↑でサーバ増強用の色々な処理を書いてますが省略。

    for (let count = 0; count < 90; count++) {
        await sleep(10 * 1000);

        const runningCount = await getServerCount(); // 今のサーバ台数を取得
        responseStream.write(JSON.stringify({ runningCount: runningCount, desiredCapacity: minCapacity }));

        const isSatisfied = await isSatisfied(minCapacity); // プッシュ通知に必要な最低台数を上回れば処理終了
        if (isSatisfied) {
            return;
        }
    }
}

ちなみに公式だと const pipeline = require("util").promisify(require("stream").pipeline); という感じで pipeline() を使うのが推奨されています。

ただ今回は上手くレスポンス取得できないのであきらめました。

性能重視の場合は推奨通りに実装した方が良いとは思いますが、今回はそういう要件ではないので上記の通りにしました。

結果

少し複雑にはなりましたが、それ程記述量が多くならずにレスポンスストリーム対応させることができました。

編成チームからは好評頂いているようで、時間かかりましたがやって良かったなという感想です。

執筆時点では AWS SDK を使う場合は言語やバージョンが限られ、ググっても実装例が少ないなど導入へのハードルはちょい高めかなとは思いました。ただ実際利用してみて便利だなとは思ったので是非ともご活用ください。

告知

NewsPicks ではエンジニアを募集中です!ご興味のある方はこちらまで。

今回の記事がおもしろいと思ったら NewsPicks アドベントカレンダーの他の記事も見てみてくださいね。
明日はバーチャル武藤さんが書いてくれます。お楽しみに!

3
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
2