11
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Lambda Durable Functionsで実装する音声要約システム - 長時間実行ワークフローと人間承認フローの実装

11
Posted at

はじめに

2025年のre:Inventで発表されたAWS Lambda Durable Functionsを触ってみました。

これまでLambdaは実行時間に制限があったため、長時間処理が必要な場合はStep Functionsを使うのが定石でしたが、Durable Functionsの登場でコード内で完結できるようになったのは大きいですね。

今回は音声ファイルの文字起こし→要約生成→人間の承認待ち、という実用的なユースケースで試してみたので、その実装方法を共有します。AWS Amplify Gen 2と組み合わせることで、インフラ周りもサクッとセットアップできました。

Amplify Gen 2でのDurable Functions作成

まず最初にハマったのがここ。Amplify Gen 2のdefineFunctionではdurableConfigを直接指定できないので、CDKのL2コンストラクトNodejsFunctionを使う必要があります。

import { defineFunction } from "@aws-amplify/backend";
import { Duration } from "aws-cdk-lib";
import * as lambda from "aws-cdk-lib/aws-lambda";
import * as nodejs from "aws-cdk-lib/aws-lambda-nodejs";
import * as iam from "aws-cdk-lib/aws-iam";

export const audioSummary = defineFunction(
  (scope) => {
    // NodejsFunctionを直接使用してdurableConfigを設定
    const audioSummaryFunction = new nodejs.NodejsFunction(
      scope,
      "audioSummary",
      {
        entry: path.join(__dirname, "handler.ts"),
        handler: "handler",
        runtime: lambda.Runtime.NODEJS_24_X,
        timeout: Duration.minutes(15),
        memorySize: 1024,
        durableConfig: {
          executionTimeout: Duration.hours(25),
          retentionPeriod: Duration.days(7),
        },
        bundling: { format: nodejs.OutputFormat.ESM },
      }
    );

    // Durable Functions必須の権限
    audioSummaryFunction.addToRolePolicy(
      new iam.PolicyStatement({
        actions: [
          "lambda:CheckpointDurableExecution",
          "lambda:GetDurableExecutionState",
        ],
        resources: ["*"],
      })
    );

    // 修飾されたARNが必要なため、エイリアスを作成
    const functionAlias = new lambda.Alias(scope, "audioSummaryProdAlias", {
      aliasName: "prod",
      version: audioSummaryFunction.currentVersion,
    });
    (audioSummaryFunction as any).functionAlias = functionAlias;

    return audioSummaryFunction;
  },
  { resourceGroupName: "data" } // 循環参照を避けるため
);

ここがポイント

durableConfigの設定

executionTimeoutでワークフロー全体の実行時間を指定します。今回は人間の承認待ちが最大24時間なので、余裕を持って25時間に設定しました。retentionPeriodは実行完了後の状態保持期間で、デバッグ用に7日間としています。

IAM権限

Durable Functionsの状態管理には専用の権限が必要です。lambda:CheckpointDurableExecutionlambda:GetDurableExecutionStateを忘れずに付与しましょう。

エイリアスが必須

Durable Functionsは修飾されたARN(:prodとか)が必要なので、エイリアスを作成してfunctionAliasプロパティに設定する必要があります。これをやらないと動きません。

循環参照に注意

Amplify Gen 2で複数のLambda関数を連携させる場合、resourceGroupNameでスタックを分離しないと循環参照エラーになります。

実装するシステム

今回作ったのは、音声ファイルをアップロードすると自動で以下の処理が走るシステムです。

  1. Amazon Transcribeで文字起こし(完了までポーリング)
  2. Amazon Bedrockで2種類の要約を生成(ビジネス向け・カジュアル向けを並列処理)
  3. 人間が承認するまで待機(最大24時間)
  4. 選択された要約を保存

最大25時間かかる可能性がありますが、Durable Functionsなら待機中はコストがかからないので経済的です。

アーキテクチャ

Durable Functionsの主要API

1. waitForCondition - ポーリング処理

Transcribeみたいな非同期処理の完了を待つときに使います。定期的にステータスをチェックして、完了したら次に進む感じです。

const completedJobResult = await context.waitForCondition(
  "poll-transcribe",
  async (currentState, ctx) => {
    // Transcribeジョブのステータスを取得
    const response = await transcribeClient.send(
      new GetTranscriptionJobCommand({
        TranscriptionJobName: currentState.jobName,
      })
    );

    const jobStatus = response.TranscriptionJob?.TranscriptionJobStatus;

    return {
      jobName: currentState.jobName,
      status: jobStatus!,
      transcriptUri: response.TranscriptionJob?.Transcript?.TranscriptFileUri,
    };
  },
  {
    initialState: transcribeJobResult,
    waitStrategy: (state, attempt) => {
      // 完了または失敗の場合は終了
      if (
        state.status === TranscriptionJobStatus.COMPLETED ||
        state.status === TranscriptionJobStatus.FAILED
      ) {
        return { shouldContinue: false };
      }

      // 1時間でタイムアウト
      if (attempt >= 360) {
        return { shouldContinue: false };
      }

      // 10秒待機して継続
      return {
        shouldContinue: true,
        delay: { seconds: 10 },
      };
    },
  }
);

waitStrategyで待機時間とタイムアウトを柔軟にコントロールできます。今回は10秒間隔でポーリングして、1時間経ったらタイムアウトするようにしました。

待機中はLambdaが実行されていないので課金されないのが嬉しいポイント。Step Functionsみたいに別のサービスを使わず、コード内で完結できるのも楽ですね。

2. parallel - 並列処理

複数の処理を同時に走らせたいときに使います。今回はビジネス向けとカジュアル向けの2種類の要約を並列で生成しました。

const summariesResult = await context.parallel("generate-summaries", [
  // ビジネス向け要約
  async (stepContext) => {
    stepContext.logger.info("Generating business summary");
    const prompt = `以下の文字起こしテキストから、ビジネス向けの要約を作成してください。
- フォーマルな表現を使用
- 重要な決定事項やアクションアイテムを強調
- 箇条書きで整理
- 500文字程度`;
    return await invokeBedrock(prompt, transcriptText);
  },

  // カジュアル向け要約
  async (stepContext) => {
    stepContext.logger.info("Generating casual summary");
    const prompt = `以下の文字起こしテキストから、分かりやすいカジュアルな要約を作成してください。
- 会話調で親しみやすい表現
- 全体の流れを重視
- 段落形式で記述
- 500文字程度`;
    return await invokeBedrock(prompt, transcriptText);
  },
]);

// 結果の取得とエラーチェック
const summaryResults = summariesResult.getResults();
if (summariesResult.hasFailure) {
  throw new Error("Failed to generate summaries");
}

const businessSummary = summaryResults[0];
const casualSummary = summaryResults[1];

配列で渡した処理が並列実行されて、BatchResultで成功・失敗を判定できます。片方が失敗してももう片方の結果は取れるので、柔軟にエラーハンドリングできるのが便利です。

3. waitForCallback - 外部イベント待機

これが今回のメインです。人間の承認とかWebhookとか、外部からのイベントを待つときに使います。

const approvalResult = await context.waitForCallback<{
  selectedType: "BUSINESS" | "CASUAL";
}>(
  "wait-for-approval",
  async (callbackId, ctx) => {
    ctx.logger.info(`Callback ID created: ${callbackId}`);

    // callbackIdをDynamoDBに保存(フロントエンドが使用)
    await updateAudioStatus(audioId, "AWAITING_APPROVAL", {
      callbackId: callbackId,
    });

    ctx.logger.info(`Callback ID saved: ${callbackId}`);
  },
  {
    timeout: { hours: 24 }, // 24時間タイムアウト
  }
);

// 承認結果を使用
const selectedType = approvalResult?.selectedType || "BUSINESS";
const finalSummary =
  selectedType === "BUSINESS" ? businessSummary : casualSummary;

submitter関数でcallbackIdを受け取って、それをDynamoDBに保存します。フロントエンドから承認ボタンが押されたら、このcallbackIdを使ってSendDurableExecutionCallbackSuccess APIを叩いてワークフローを再開させる仕組みです。

24時間でタイムアウトするように設定していますが、タイムアウト時の処理も自分で書けます。

承認処理の実装

フロントエンドからの承認を受け取るResolver Lambdaはこんな感じです。

// approveSummary Mutation Resolver
export const handler: Schema["approveSummary"]["functionHandler"] = async (
  event
) => {
  const { audioId, selectedType } = event.arguments;

  // 1. DynamoDBからcallbackIdを取得
  const getResult = await docClient.send(
    new GetCommand({
      TableName: AUDIO_TABLE_NAME,
      Key: { id: audioId },
    })
  );

  const callbackId = getResult.Item?.callbackId;
  if (!callbackId) {
    throw new Error(`Audio ${audioId} has no callbackId`);
  }

  // 2. Durable Functionにコールバック成功を送信
  await lambdaClient.send(
    new SendDurableExecutionCallbackSuccessCommand({
      CallbackId: callbackId,
      Result: JSON.stringify({ selectedType }),
    })
  );

  // 3. Audioレコードを返却
  return getResult.Item;
};

ここでハマったのが、SendDurableExecutionCallbackSuccessはLambda関数を呼び出すんじゃなくて、Lambda APIに直接送信する点です。IAM権限にlambda:SendDurableExecutionCallbackSuccessを追加するのも忘れずに。

DynamoDB Streamsからの起動

もう一つハマったのが、Durable Functionsはエイリアス付きARNが必須なので、DynamoDB Streamsのトリガーを直接設定できないこと。

解決策は、中間にStarter Lambdaを挟むことです。

Starter Lambdaの実装

DynamoDB Streamsの変更イベントを受け取って、Durable Functionを非同期で起動するだけのシンプルなLambdaです。

// Starter Lambda (DynamoDB Streamsトリガー)
import type { DynamoDBStreamHandler } from "aws-lambda";
import { LambdaClient, InvokeCommand } from "@aws-sdk/client-lambda";

const lambdaClient = new LambdaClient({});

export const handler: DynamoDBStreamHandler = async (event) => {
  console.log("Audio Summary Starter triggered");
  console.log("Event:", JSON.stringify(event, null, 2));

  const audioSummaryArn = process.env.AUDIO_SUMMARY_ARN;
  if (!audioSummaryArn) {
    throw new Error("AUDIO_SUMMARY_ARN environment variable is not set");
  }

  for (const record of event.Records) {
    // INSERTイベント(Audio作成)のみ処理
    if (record.eventName === "INSERT") {
      const newImage = record.dynamodb?.NewImage;

      if (newImage) {
        const audioId = newImage.id?.S;
        const status = newImage.status?.S;
        const s3Key = newImage.s3Key?.S;

        console.log("New Audio created:", {
          audioId,
          status,
          s3Key,
        });

        // Durable Function(audioSummary)を非同期で呼び出し
        try {
          const command = new InvokeCommand({
            FunctionName: audioSummaryArn, // エイリアス付きARN(例: arn:aws:lambda:region:account:function:name:prod)
            InvocationType: "Event", // 非同期実行
            Payload: JSON.stringify({
              audioId,
              s3Key,
            }),
          });

          await lambdaClient.send(command);
          console.log(`Successfully invoked audioSummary for audio ${audioId}`);
        } catch (error) {
          console.error(
            `Failed to invoke audioSummary for audio ${audioId}:`,
            error
          );
          throw error;
        }
      }
    }
  }
};

アーキテクチャの構成

音声アップロード
  ↓
DynamoDBレコード作成(status: PENDING)
  ↓
DynamoDB Streams
  ↓
Starter Lambda(通常のLambda)
  ↓
Durable Function起動(エイリアス付きで呼び出し)

Starter Lambdaは普通のLambda関数として定義して、DynamoDB Streamsトリガーを設定します。Durable Functionの呼び出しには、エイリアス付きARN(:prodとか)を指定するのがポイント。

InvocationType: "Event"で非同期実行にしているので、Starter Lambdaはすぐに完了します。

実装してみての所感

ステート管理が楽

stepの完了時に自動で状態が保存されるので、障害が起きても最後のステップから再開できます。明示的な状態管理コードを書かなくていいのは楽ですね。

コストが抑えられる

waitForConditionwaitForCallbackで待機している間はLambdaが実行されていないので課金されません。従来のポーリング実装だと無駄にLambdaを起動し続けることになるので、コスト的にかなり有利です。

ロギング

context.loggerを使うと実行IDと紐づいたログが記録される

まとめ

Durable Functionsを使ってみて感じたメリットはこんな感じです。

  • コード内で完結: Step Functionsみたいに別のサービスを使わなくていい(手軽!)
  • コストが安い: 待機中は課金されない
  • 柔軟: ポーリング間隔とかタイムアウトを自分で制御できる

人間の承認が必要なワークフローとか、外部APIのポーリングが必要な処理をサクッと作りたい場合に向いてると思います!

参考リンク

おわりに

waitForConditionparallelwaitForCallbackの3つのAPIを使えば、結構複雑なワークフローもシンプルに書けることが分かりました。

Step Functionsの代わりになるかというと、用途次第かなと思いますが、コードで完結したい場合は良い選択肢だと思います。

何かの参考になれば幸いです!

11
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?