2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AWS Lambda Durable FunctionsでHuman-in-the-loopなAIワークフローを作ってみた

Last updated at Posted at 2025-12-07

この記事は、Japan AWS Jr. Champions Advent Calendar 2025 シリーズ1・7日目の記事となります。

はじめに

AWS re:Invent 2025で発表された AWS Lambda Durable Functions は、長時間の待機(最大1年)や中断/再開、ステップの明示化をLambdaでネイティブに扱える仕組みです。

「AIが作業して、人を待って、リトライして、判断をする」という、AI時代の業務プロセスに必要な機能がLambdaだけで書けるようになりました。

今回はこれを使って、以下のようなワークフローを全部サーバーレスで作ってみました:

  • Bedrock Nova 2 Liteで文章生成
  • DynamoDBに保存
  • 人間がレビューして承認or差し戻し
  • AIが自動で修正
  • 最大N回繰り返し
  • タイムアウトで自動終了

本記事では、「AIが文章を生成し、人間がレビューし、差し戻しの場合は決められた回数内、AIが修正を繰り返すワークフロー」の実装方法と学びをまとめます。

Durable Functionsとは

AWS Lambda Durable Functions は、従来のLambdaでは難しかった「長時間実行」「状態管理」「中断/再開」をネイティブにサポートする新機能です。

従来のLambdaとの違い

項目 従来のLambda Durable Functions
最大実行時間 15分 最大 1 年(ステップ + 待機 + 再開 を含む)
待機中の課金 実行時間分課金 computeコストなし(実行時間に含まれない)
状態管理 外部DB必須 SDK内で管理
リトライ 手動実装 組み込み機能

各ステップは Lambda function timeout(最大15分) に収まる必要があります。
1年はワークフロー全体の経過時間の上限です。

主な特徴

  • @durable_execution:関数全体をdurable実行として定義
  • context.step():各ステップを明示的に管理
  • context.wait():課金なしで待機(最大1年)
  • 自動リトライ:失敗時の再実行が組み込み

チェックポイントと保持期間

  • 実行履歴とcheckpointは自動で保持
  • デフォルト14日、最大90日で削除
  • 期限後は実行の詳細の取得や再実行不可

長い保持はデバッグや監査に有効なものの、ストレージコストに影響します。

これにより、「人間を待つ」「外部イベントを待つ」といった業務プロセスが、Lambdaだけで書けるようになりました。

HITLとは

HITL(Human-In-The-Loop) は、「人間がループの中に入る」という意味で、AIと人間が協調して作業を進めるアーキテクチャパターンです。

HITLが必要な理由

AIは強力ですが、以下のような場面では人間の判断が不可欠です:

  • 最終判断:法的責任、倫理的判断
  • 品質保証:AIの出力が要件を満たしているか
  • 例外処理:AIが対応できないエッジケース
  • ドメイン知識:専門的な文脈理解

HITLの典型的なフロー

今回の実装は、この 「AI生成 → 人間レビュー → AI修正」のループを、Durable Functionsで実現したものです。

使用スタック

  • AWS Lambda Durable Functions
  • Amazon Bedrock (Nova 2 Lite)
  • DynamoDB
  • Python (boto3)

アーキテクチャ

システム全体は3つのコンポーネントで構成されています。

コンポーネント説明

  1. Durable Lambda(メイン処理)

    • AIによるドラフト生成
    • 人間レビューの待機
    • 修正ループの制御
  2. DynamoDB(状態管理)

    • ドラフトとレビュー状態を保存
    • Lambda間の情報共有
  3. Review Updater Lambda(レビュー更新)

    • 人間がレビュー結果を反映
    • 承認/差し戻しの判断を記録

DynamoDBスキーマ

項目 説明
taskId タスクID(パーティションキー)
draft 生成された文章
status PENDING_REVIEW / APPROVED / REJECTED
version ドラフトのバージョン番号
reviewComment レビュアーのコメント

フロー解説

実際の処理フローを詳しく見ていきます。

ポイント

  • 最大N回の修正ループ:無限ループを防ぐ
  • タイムアウト機能:人間が反応しない場合は自動終了
  • バージョン管理:修正履歴を追跡可能
  • 課金なし待機:数日〜数週間待っても追加コストなし

実装

実装の重要部分を解説します。各セクションの折りたたみに全コードを載せています。

1. DynamoDBテーブル

  • テーブル名HumanReviewTasks
  • パーティションキーtaskId (String)
  • キャパシティ:PAY_PER_REQUEST(オンデマンド)

2. メイン処理Lambda

from aws_durable_execution_sdk_python import (
    DurableContext,
    durable_execution,
    durable_step,
)

@durable_execution
def lambda_handler(event: dict, context: DurableContext):
    task_id = event.get("taskId")
    user_profile = event.get("userProfile", {})
    max_rounds = int(event.get("maxRounds", 3))
    
    # 1回目ドラフト生成
    draft = context.step(
        generate_first_draft(task_id, user_profile),
        name="generate-draft-v1",
    )
    version = 1
    
    while True:
        # DynamoDBに保存
        context.step(
            put_task_item(task_id, draft, version, "PENDING_REVIEW", ""),
            name=f"save-draft-v{version}",
        )
        
        # 人間のレビューを待つ(課金なし待機)
        status, review_comment = wait_for_review(
            context=context,
            task_id=task_id,
            max_attempts=max_attempts_per_round,
            wait_seconds=wait_seconds,
            round_label=f"round{version}",
        )
        
        # 承認されたら終了
        if status == "APPROVED":
            return {"status": "APPROVED", "draft": draft}
        
        # タイムアウトなら終了
        if status in ("TIMEOUT", "NOT_FOUND"):
            return {"status": "TIMEOUT", "draft": draft}
        
        # 差し戻し上限に達したら終了
        if version >= max_rounds:
            return {"status": "REJECTED", "draft": draft}
        
        # AIが修正して次のループへ
        new_version = version + 1
        draft = context.step(
            generate_revised_draft(task_id, user_profile, draft, review_comment),
            name=f"generate-draft-v{new_version}",
        )
        version = new_version

重要なポイント

  • @durable_step:各ステップを明示的に定義
  • context.wait(Duration.from_seconds(60))課金なしで待機
  • context.step():ステップの実行と状態管理
Durable Lambda 全コード
import os
import json
import boto3

from aws_durable_execution_sdk_python import (
    DurableContext,
    durable_execution,
    durable_step,
)
from aws_durable_execution_sdk_python.config import Duration

# ========= 環境変数 =========
TABLE_NAME = os.environ["TABLE_NAME"]
BEDROCK_REGION = os.environ.get("BEDROCK_REGION", "us-east-1")
MODEL_ID = os.environ.get("MODEL_ID", "amazon.nova-lite-v1:0")

# ========= クライアント =========
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(TABLE_NAME)
bedrock = boto3.client("bedrock-runtime", region_name=BEDROCK_REGION)


def _nova_converse(system_prompt: str, user_prompt: str) -> str:
    response = bedrock.converse(
        modelId=MODEL_ID,
        system=[{"text": system_prompt}],
        messages=[{"role": "user", "content": [{"text": user_prompt}]}],
        inferenceConfig={"maxTokens": 900, "temperature": 0.7},
    )
    message = response["output"]["message"]
    contents = message.get("content", [])
    texts = [c.get("text", "") for c in contents if "text" in c]
    return "\n".join(texts).strip()


@durable_step
def generate_first_draft(step_context, task_id: str, user_profile: dict) -> str:
    profile_str = json.dumps(user_profile, ensure_ascii=False)
    system_prompt = """
あなたは日本の金融機関に所属するアナリストです。
日本の法令・コンプライアンスを意識し、過度に断定的な表現や
将来の成果を保証するような言い回しは避けてください。
読者にとって分かりやすく、落ち着いたトーンで説明します。
    """.strip()
    user_prompt = f"""
以下の条件で投資レポートのドラフトを書いてください。
- 対象: 日本株の長期積立投資を検討している30代会社員
- 読者プロファイル: {profile_str}
- トーン: 穏やかで誠実、専門用語は噛み砕いて説明
- 文字数: 日本語で約800文字
- 出力は本文のみ。前後に説明やラベルは付けないこと。
""".strip()
    draft = _nova_converse(system_prompt, user_prompt)
    step_context.logger.info(f"Generated first draft for task {task_id}")
    return draft


@durable_step
def generate_revised_draft(step_context, task_id: str, user_profile: dict,
                           previous_draft: str, review_comment: str) -> str:
    profile_str = json.dumps(user_profile, ensure_ascii=False)
    system_prompt = """
あなたは日本の金融機関に所属するアナリストです。
レビュアーのコメントを踏まえて投資レポートを改善します。
日本の法令・コンプライアンスに配慮し、誤解を招く表現は避けてください。
    """.strip()
    user_prompt = f"""
以下のドラフトが人間レビュアーによって差し戻されました。

--- 読者プロファイル ---
{profile_str}

--- 元のドラフト ---
{previous_draft}

--- レビュアーコメント ---
{review_comment}

このコメントを踏まえて、ドラフトを改善してください。
出力は修正後の本文のみとし、前後に説明やラベルを付けないでください。
""".strip()
    new_draft = _nova_converse(system_prompt, user_prompt)
    step_context.logger.info(f"Generated revised draft for task {task_id}")
    return new_draft


@durable_step
def put_task_item(step_context, task_id: str, draft: str, version: int,
                  status: str, review_comment: str = ""):
    table.put_item(
        Item={
            "taskId": task_id,
            "draft": draft,
            "status": status,
            "version": version,
            "reviewComment": review_comment,
        }
    )
    step_context.logger.info(f"Put task item taskId={task_id}, version={version}")
    return True


@durable_step
def get_task_status(step_context, task_id: str) -> dict:
    res = table.get_item(Key={"taskId": task_id}, ConsistentRead=True)
    if "Item" not in res:
        return {"status": "NOT_FOUND", "reviewComment": ""}
    item = res["Item"]
    return {"status": item.get("status", "PENDING_REVIEW"),
            "reviewComment": item.get("reviewComment", "")}


def wait_for_review(context: DurableContext, task_id: str,
                    max_attempts: int, wait_seconds: int, round_label: str):
    for attempt in range(max_attempts):
        context.wait(Duration.from_seconds(wait_seconds))
        result = context.step(
            get_task_status(task_id),
            name=f"check-status-{round_label}-{attempt}",
        )
        status = result.get("status", "PENDING_REVIEW")
        review_comment = result.get("reviewComment", "")
        if status in ("APPROVED", "REJECTED", "NOT_FOUND"):
            return status, review_comment
    return "TIMEOUT", ""


@durable_execution
def lambda_handler(event: dict, context: DurableContext):
    task_id = event.get("taskId")
    if not task_id:
        raise ValueError("taskId is required")

    user_profile = event.get("userProfile", {})
    max_rounds = int(event.get("maxRounds", 3))
    max_attempts_per_round = int(event.get("maxAttemptsPerRound", 3))
    wait_seconds = int(event.get("waitSeconds", 60))

    draft = context.step(
        generate_first_draft(task_id, user_profile),
        name="generate-draft-v1",
    )
    version = 1

    while True:
        context.step(
            put_task_item(task_id, draft, version, "PENDING_REVIEW", ""),
            name=f"save-draft-v{version}",
        )

        status, review_comment = wait_for_review(
            context=context,
            task_id=task_id,
            max_attempts=max_attempts_per_round,
            wait_seconds=wait_seconds,
            round_label=f"round{version}",
        )

        if status == "APPROVED":
            return {"taskId": task_id, "status": "APPROVED", "draft": draft}

        if status in ("TIMEOUT", "NOT_FOUND"):
            return {"taskId": task_id, "status": "TIMEOUT", "draft": draft}

        if version >= max_rounds:
            return {"taskId": task_id, "status": "REJECTED", "draft": draft}

        new_version = version + 1
        draft = context.step(
            generate_revised_draft(task_id, user_profile, draft, review_comment),
            name=f"generate-draft-v{new_version}",
        )
        version = new_version

3. レビュー用 Lambda

DynamoDBの statusreviewComment を更新するだけのシンプルな関数です。

def lambda_handler(event, context):
    task_id = event["taskId"]
    status = event["status"]  # "APPROVED" or "REJECTED"
    review_comment = event.get("reviewComment", "")
    
    table.update_item(
        Key={"taskId": task_id},
        UpdateExpression="SET #s = :s, reviewComment = :c",
        ExpressionAttributeNames={"#s": "status"},
        ExpressionAttributeValues={
            ":s": status,
            ":c": review_comment,
        },
    )
    
    return {"taskId": task_id, "status": status}

UIはなくても、Lambdaコンソールから手動実行すればOKです。

Review Updater Lambda 全コード
import os
import json
import boto3

dynamodb = boto3.resource("dynamodb")
TABLE_NAME = os.environ["TABLE_NAME"]
table = dynamodb.Table(TABLE_NAME)


def lambda_handler(event, context):
    """
    期待する event:
    {
      "taskId": "task-001",
      "status": "APPROVED" | "REJECTED",
      "reviewComment": "ここをこう直してほしい"
    }
    """
    if isinstance(event, str):
        event = json.loads(event)

    task_id = event["taskId"]
    status = event["status"]
    review_comment = event.get("reviewComment", "")

    if status not in ("APPROVED", "REJECTED"):
        raise ValueError("status must be APPROVED or REJECTED")

    table.update_item(
        Key={"taskId": task_id},
        UpdateExpression="SET #s = :s, reviewComment = :c",
        ExpressionAttributeNames={"#s": "status"},
        ExpressionAttributeValues={
            ":s": status,
            ":c": review_comment,
        },
    )

    return {
        "taskId": task_id,
        "status": status,
        "reviewComment": review_comment,
    }

動作デモ

実際にシステムを動かした際の流れを紹介します。

実行イメージ

  1. メイン処理Lambdaを実行(Lambdaコンソールの「テスト」タブ)

     {
       "taskId": "task-001",
       "userProfile": {
         "age": 35,
         "riskTolerance": "middle",
         "goal": "老後資金と教育資金をバランスよく準備したい"
       },
       "maxRounds": 3,
       "maxAttemptsPerRound": 3,
       "waitSeconds": 60
       }
     }
    
  2. DynamoDBに PENDING_REVIEW のドラフトが保存される
    DyanamoDB_pending_preview

  3. Review Updater Lambda で差し戻し

    {
      "taskId": "task-001",
      "status": "REJECTED",
      "reviewComment": "リスク説明が足りません"
    }
    

    DyanamoDB_rejected

  4. AIが修正して再度 PENDING_REVIEW になる
    DyanamoDB_pending_preview2

  5. 承認すると APPROVED で終了

    {
      "taskId": "task-001",
      "status": "APPROVED"
    }
    

    DyanamoDB_approved

実務での応用

このアーキテクチャが活きそうな実務シーンを紹介します。

適用できる業務

  • レポート生成&レビュー:AIが週次レポートを作成、上司が承認
  • RFP回答:AIが初稿作成、営業が修正指示
  • 法務レビュー:AIが契約書ドラフト、法務が確認
  • 広告コピー生成:AIが複数案作成、マーケが選択
  • コードレビュー:AIがリファクタ提案、開発者が判断
  • 顧客応対テンプレ:AIが返信案作成、CSが承認

「AIが書いて、人がチェックする」ケースで適用可能です。

実装時の拡張ポイント

実プロダクト化する際は、以下を追加すると良さそうです。

  • Slack連携:レビュー通知とボタンでの承認/差し戻し
  • Web UI:ドラフトの可視化と編集機能
  • API Gateway:外部システムからのトリガー
  • 監査ログ:誰がいつ何を判断したかの記録
  • 通知機能:タイムアウト前のリマインダー

逆に向かないケース

  • リアルタイム性が重要:即座の応答が必要な場合
  • 完全自動化したい:人間の判断が不要な場合
  • 複雑なUI操作:ドラッグ&ドロップなど高度な操作が必要

Durable Functions は、人間の承認や外部イベントのような、
待ちが発生する業務プロセスに強い仕組みだと感じました。

おわりに

状態管理、分岐、人間を待つといった処理を、従来であれば Step Functions で実装していたところを、Python のコードだけで完結できた点は実装体験として非常に快適でした。

一方で、Step Functions のようにフローが図で可視化されないため、現在どこまで進んでいるかをコンソールから直感的に把握できない点は、運用面での課題になりそうです。

また、「人間が何もしなくても最終的には終わる」HITL設計は実務で重要です。
レビューが無視されれば TIMEOUT、差し戻しが続けば GIVE UP などの終点に流し、
放置されてもワークフローが詰まらないようにしておくことで、運用負荷を下げられます。
これらのパターンは Step Functions でも実現できますが、同じ設計をコード中心に実装できる点は Durable Functions の魅力だと感じました。

参考リンク

2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?