はじめに
今回は Lambda の新機能、Lambda durable functions を試してみたいと思います!
どんなテーマで作ろうかな?と考えたのですが、以前作った Chrome 拡張機能で Alexa に Web 会議の開始/終了を伝えるシステムを改造してみることにしました。
最初に簡単に Lambda durable functions について説明します。
Lambda durable functions とは
一言で言うと、「マルチステップワークフローをオーケストレーションできる Lambda 関数」だと思います。
AWS Step Functions と似た役割と考えると良いかなと思いますが、特徴的な考え方をまとめていきます。
Durable Execution(永続的な実行)
Durable Execution は、チェックポイントとリプレイという仕組みがあります。
関数が後述する一時停止や中断の後に再開すると、以前に完了したチェックポイントがリプレイされ、関数は実行を継続します。
これにより、最長 1 年間の長時間実行が可能になります。
Steps(ステップ)
ステップは、自動的にリトライとチェックポイントが組み込まれたビジネスロジックの実行単位です。
各ステップが 1 つのチェックポイントとなり、ステップの結果が保存されます。中断後は完了したステップから再開でき、完了済みのステップは再実行されません。
完了したステップから再開するような動きはするのですが、実際は関数の最初から実行され、チェックポイントに保持した内容を結果として利用し、処理をスキップしたように振る舞います。
なお、ステップ外の処理はリプレイ時に都度実行されてしまうので、注意が必要です!
つまり、ステップ外の処理において都度結果が変わるような DB 参照や、乱数生成は、予期せぬ動作になる可能性があります。
Wait States(待機状態)
Wait States は、関数が実行を停止する(課金も停止)計画的な一時停止です。
待機には、指定時間待つ「時間待機」と、外部からのコールバックを待つ「外部コールバック」の 2 種類があります。
この待機を挟むことで、15 分の実行時間の制限を回避することが可能です。
その他ベストプラクティスなどは以下をご確認ください!
試してみた。
では、実際に試してみたいと思います。
変更内容
以下の赤枠の Lambda を durable functions に変更していきます。
(前回のブログより少し構成図を変えました。)
現行の処理は、会議開始/終了の通知を Chrome 拡張機能から受信するたびに、それを別の Alexa Smarthome Skill のバックエンドとして動いている Lambda に中継するだけです。
そこを以下のようなワークフローに変更します。
ステップ設計
ステップは以下のようにします。
会議開始時のステップ
- checkMeetingState(会議の状態チェック)
- notifyStart(会議開始通知)
- saveCallbackId(コールバック ID 保存)
- wait-for-meeting-end(会議終了待ち)
- notifyEnd(会議終了)
- Finalize(後処理)
会議終了時のステップ
- checkMeetingState(会議の状態チェック)
- completeCallback(コールバック完了通知)
会議の開始終了でフローが2つあるのですが今回はひとつの Lambda で実現しています。
わかりにくいので、全体のフローを載せておきます。
実行単位でもまとめてみました。
作ってみる。
では、作成していきます。
作成は新規作成時のみ指定でき、下記の設定にチェックを入れるだけです。
なお、少し表記はリージョンによって異なるみたいです。
初期コードは以下のような感じで、ステップと時限式の Wait があり分かりやすいですね。
import { withDurableExecution } from "@aws/durable-execution-sdk-js";
export const handler = withDurableExecution(async (event, context) => {
// TODO implement
await context.step("Step #1", (stepCtx) => {
stepCtx.logger.info("Hello from step #1");
});
// Pause for 1 second without consuming CPU cycles or incurring usage charges
await context.wait({ seconds: 1 });
// Context logger is replay aware and will not log the same message multiple times
context.logger.info("Waited for 1 second");
const message = await context.step("Step #2", async () => {
return "Hello from Durable Lambda!";
});
const response = {
statusCode: 200,
body: JSON.stringify(message),
};
return response;
});
あとはコーディングするだけです。
完成したソースは以下に格納しました。
Chrome 拡張機能との接続
最後に、Chrome 拡張機能との接続のために API Gateway との接続が必要なのですが、通常の呼び出し方法だと API Gateway 側の 29 秒制限に引っかかってしまいます。
そこで以下2つの方法で対応してみました。
1. Lambda の非同期呼び出し
非同期呼び出しはX-Amz-Invocation-Type:Eventを付与することで実現できます。
CDK で書くとこんな感じです!
// API Gateway → Lambda 非同期統合
const lambdaIntegration = new apigateway.LambdaIntegration(alias, {
proxy: false,
integrationResponses: [
{
statusCode: "202",
responseTemplates: {
"application/json": '{"message": "Request accepted"}',
},
},
],
requestTemplates: {
"application/json": JSON.stringify({
body: "$util.escapeJavaScript($input.body)",
headers: "$input.params().header",
}),
},
passthroughBehavior: apigateway.PassthroughBehavior.NEVER,
requestParameters: {
"integration.request.header.X-Amz-Invocation-Type": "'Event'",
},
});
これで、最大 1 年の実行を待つ必要なく、Lambda を非同期でキックすることができます。
2. EventBridge 統合
API Gateway と Lambda durable functions の間に EventBridge を挟むことで非同期呼び出しを実現し、Lambda durable functions を呼び出すことでも正常に動作しました。
このようなイメージですね。
まとめ
実際に実装をしてみることで、ステップ・リプレイの動作を理解することができました。
特にステップ外の処理がリプレイ時に再度実行される挙動は注意が必要ですね!
また、一つの Lambda 関数に Lambda Durable Functions 固有の処理と、ビジネスロジックを同じ場所に書く必要があるので、コードの書き方は注意が必要かなとは思いました。
無駄な複雑性が生まれないような作りを検討してみたいです!


