この記事は、Japan AWS Jr. Champions Advent Calendar 2025 シリーズ1・7日目の記事となります。
はじめに
AWS re:Invent 2025で発表された AWS Lambda Durable Functions は、長時間の待機(最大1年)や中断/再開、ステップの明示化をLambdaでネイティブに扱える仕組みです。
「AIが作業して、人を待って、リトライして、判断をする」という、AI時代の業務プロセスに必要な機能がLambdaだけで書けるようになりました。
今回はこれを使って、以下のようなワークフローを全部サーバーレスで作ってみました:
- Bedrock Nova 2 Liteで文章生成
- DynamoDBに保存
- 人間がレビューして承認or差し戻し
- AIが自動で修正
- 最大N回繰り返し
- タイムアウトで自動終了
本記事では、「AIが文章を生成し、人間がレビューし、差し戻しの場合は決められた回数内、AIが修正を繰り返すワークフロー」の実装方法と学びをまとめます。
Durable Functionsとは
AWS Lambda Durable Functions は、従来のLambdaでは難しかった「長時間実行」「状態管理」「中断/再開」をネイティブにサポートする新機能です。
従来のLambdaとの違い
| 項目 | 従来のLambda | Durable Functions |
|---|---|---|
| 最大実行時間 | 15分 | 最大 1 年(ステップ + 待機 + 再開 を含む) |
| 待機中の課金 | 実行時間分課金 | computeコストなし(実行時間に含まれない) |
| 状態管理 | 外部DB必須 | SDK内で管理 |
| リトライ | 手動実装 | 組み込み機能 |
各ステップは Lambda function timeout(最大15分) に収まる必要があります。
1年はワークフロー全体の経過時間の上限です。
主な特徴
-
@durable_execution:関数全体をdurable実行として定義 -
context.step():各ステップを明示的に管理 -
context.wait():課金なしで待機(最大1年) - 自動リトライ:失敗時の再実行が組み込み
チェックポイントと保持期間
- 実行履歴とcheckpointは自動で保持
- デフォルト14日、最大90日で削除
- 期限後は実行の詳細の取得や再実行不可
長い保持はデバッグや監査に有効なものの、ストレージコストに影響します。
これにより、「人間を待つ」「外部イベントを待つ」といった業務プロセスが、Lambdaだけで書けるようになりました。
HITLとは
HITL(Human-In-The-Loop) は、「人間がループの中に入る」という意味で、AIと人間が協調して作業を進めるアーキテクチャパターンです。
HITLが必要な理由
AIは強力ですが、以下のような場面では人間の判断が不可欠です:
- 最終判断:法的責任、倫理的判断
- 品質保証:AIの出力が要件を満たしているか
- 例外処理:AIが対応できないエッジケース
- ドメイン知識:専門的な文脈理解
HITLの典型的なフロー
今回の実装は、この 「AI生成 → 人間レビュー → AI修正」のループを、Durable Functionsで実現したものです。
使用スタック
- AWS Lambda Durable Functions
- Amazon Bedrock (Nova 2 Lite)
- DynamoDB
- Python (boto3)
アーキテクチャ
システム全体は3つのコンポーネントで構成されています。
コンポーネント説明
-
Durable Lambda(メイン処理)
- AIによるドラフト生成
- 人間レビューの待機
- 修正ループの制御
-
DynamoDB(状態管理)
- ドラフトとレビュー状態を保存
- Lambda間の情報共有
-
Review Updater Lambda(レビュー更新)
- 人間がレビュー結果を反映
- 承認/差し戻しの判断を記録
DynamoDBスキーマ
| 項目 | 説明 |
|---|---|
taskId |
タスクID(パーティションキー) |
draft |
生成された文章 |
status |
PENDING_REVIEW / APPROVED / REJECTED
|
version |
ドラフトのバージョン番号 |
reviewComment |
レビュアーのコメント |
フロー解説
実際の処理フローを詳しく見ていきます。
ポイント
- 最大N回の修正ループ:無限ループを防ぐ
- タイムアウト機能:人間が反応しない場合は自動終了
- バージョン管理:修正履歴を追跡可能
- 課金なし待機:数日〜数週間待っても追加コストなし
実装
実装の重要部分を解説します。各セクションの折りたたみに全コードを載せています。
1. DynamoDBテーブル
-
テーブル名:
HumanReviewTasks -
パーティションキー:
taskId(String) - キャパシティ:PAY_PER_REQUEST(オンデマンド)
2. メイン処理Lambda
from aws_durable_execution_sdk_python import (
DurableContext,
durable_execution,
durable_step,
)
@durable_execution
def lambda_handler(event: dict, context: DurableContext):
task_id = event.get("taskId")
user_profile = event.get("userProfile", {})
max_rounds = int(event.get("maxRounds", 3))
# 1回目ドラフト生成
draft = context.step(
generate_first_draft(task_id, user_profile),
name="generate-draft-v1",
)
version = 1
while True:
# DynamoDBに保存
context.step(
put_task_item(task_id, draft, version, "PENDING_REVIEW", ""),
name=f"save-draft-v{version}",
)
# 人間のレビューを待つ(課金なし待機)
status, review_comment = wait_for_review(
context=context,
task_id=task_id,
max_attempts=max_attempts_per_round,
wait_seconds=wait_seconds,
round_label=f"round{version}",
)
# 承認されたら終了
if status == "APPROVED":
return {"status": "APPROVED", "draft": draft}
# タイムアウトなら終了
if status in ("TIMEOUT", "NOT_FOUND"):
return {"status": "TIMEOUT", "draft": draft}
# 差し戻し上限に達したら終了
if version >= max_rounds:
return {"status": "REJECTED", "draft": draft}
# AIが修正して次のループへ
new_version = version + 1
draft = context.step(
generate_revised_draft(task_id, user_profile, draft, review_comment),
name=f"generate-draft-v{new_version}",
)
version = new_version
重要なポイント:
-
@durable_step:各ステップを明示的に定義 -
context.wait(Duration.from_seconds(60)):課金なしで待機 -
context.step():ステップの実行と状態管理
Durable Lambda 全コード
import os
import json
import boto3
from aws_durable_execution_sdk_python import (
DurableContext,
durable_execution,
durable_step,
)
from aws_durable_execution_sdk_python.config import Duration
# ========= 環境変数 =========
TABLE_NAME = os.environ["TABLE_NAME"]
BEDROCK_REGION = os.environ.get("BEDROCK_REGION", "us-east-1")
MODEL_ID = os.environ.get("MODEL_ID", "amazon.nova-lite-v1:0")
# ========= クライアント =========
dynamodb = boto3.resource("dynamodb")
table = dynamodb.Table(TABLE_NAME)
bedrock = boto3.client("bedrock-runtime", region_name=BEDROCK_REGION)
def _nova_converse(system_prompt: str, user_prompt: str) -> str:
response = bedrock.converse(
modelId=MODEL_ID,
system=[{"text": system_prompt}],
messages=[{"role": "user", "content": [{"text": user_prompt}]}],
inferenceConfig={"maxTokens": 900, "temperature": 0.7},
)
message = response["output"]["message"]
contents = message.get("content", [])
texts = [c.get("text", "") for c in contents if "text" in c]
return "\n".join(texts).strip()
@durable_step
def generate_first_draft(step_context, task_id: str, user_profile: dict) -> str:
profile_str = json.dumps(user_profile, ensure_ascii=False)
system_prompt = """
あなたは日本の金融機関に所属するアナリストです。
日本の法令・コンプライアンスを意識し、過度に断定的な表現や
将来の成果を保証するような言い回しは避けてください。
読者にとって分かりやすく、落ち着いたトーンで説明します。
""".strip()
user_prompt = f"""
以下の条件で投資レポートのドラフトを書いてください。
- 対象: 日本株の長期積立投資を検討している30代会社員
- 読者プロファイル: {profile_str}
- トーン: 穏やかで誠実、専門用語は噛み砕いて説明
- 文字数: 日本語で約800文字
- 出力は本文のみ。前後に説明やラベルは付けないこと。
""".strip()
draft = _nova_converse(system_prompt, user_prompt)
step_context.logger.info(f"Generated first draft for task {task_id}")
return draft
@durable_step
def generate_revised_draft(step_context, task_id: str, user_profile: dict,
previous_draft: str, review_comment: str) -> str:
profile_str = json.dumps(user_profile, ensure_ascii=False)
system_prompt = """
あなたは日本の金融機関に所属するアナリストです。
レビュアーのコメントを踏まえて投資レポートを改善します。
日本の法令・コンプライアンスに配慮し、誤解を招く表現は避けてください。
""".strip()
user_prompt = f"""
以下のドラフトが人間レビュアーによって差し戻されました。
--- 読者プロファイル ---
{profile_str}
--- 元のドラフト ---
{previous_draft}
--- レビュアーコメント ---
{review_comment}
このコメントを踏まえて、ドラフトを改善してください。
出力は修正後の本文のみとし、前後に説明やラベルを付けないでください。
""".strip()
new_draft = _nova_converse(system_prompt, user_prompt)
step_context.logger.info(f"Generated revised draft for task {task_id}")
return new_draft
@durable_step
def put_task_item(step_context, task_id: str, draft: str, version: int,
status: str, review_comment: str = ""):
table.put_item(
Item={
"taskId": task_id,
"draft": draft,
"status": status,
"version": version,
"reviewComment": review_comment,
}
)
step_context.logger.info(f"Put task item taskId={task_id}, version={version}")
return True
@durable_step
def get_task_status(step_context, task_id: str) -> dict:
res = table.get_item(Key={"taskId": task_id}, ConsistentRead=True)
if "Item" not in res:
return {"status": "NOT_FOUND", "reviewComment": ""}
item = res["Item"]
return {"status": item.get("status", "PENDING_REVIEW"),
"reviewComment": item.get("reviewComment", "")}
def wait_for_review(context: DurableContext, task_id: str,
max_attempts: int, wait_seconds: int, round_label: str):
for attempt in range(max_attempts):
context.wait(Duration.from_seconds(wait_seconds))
result = context.step(
get_task_status(task_id),
name=f"check-status-{round_label}-{attempt}",
)
status = result.get("status", "PENDING_REVIEW")
review_comment = result.get("reviewComment", "")
if status in ("APPROVED", "REJECTED", "NOT_FOUND"):
return status, review_comment
return "TIMEOUT", ""
@durable_execution
def lambda_handler(event: dict, context: DurableContext):
task_id = event.get("taskId")
if not task_id:
raise ValueError("taskId is required")
user_profile = event.get("userProfile", {})
max_rounds = int(event.get("maxRounds", 3))
max_attempts_per_round = int(event.get("maxAttemptsPerRound", 3))
wait_seconds = int(event.get("waitSeconds", 60))
draft = context.step(
generate_first_draft(task_id, user_profile),
name="generate-draft-v1",
)
version = 1
while True:
context.step(
put_task_item(task_id, draft, version, "PENDING_REVIEW", ""),
name=f"save-draft-v{version}",
)
status, review_comment = wait_for_review(
context=context,
task_id=task_id,
max_attempts=max_attempts_per_round,
wait_seconds=wait_seconds,
round_label=f"round{version}",
)
if status == "APPROVED":
return {"taskId": task_id, "status": "APPROVED", "draft": draft}
if status in ("TIMEOUT", "NOT_FOUND"):
return {"taskId": task_id, "status": "TIMEOUT", "draft": draft}
if version >= max_rounds:
return {"taskId": task_id, "status": "REJECTED", "draft": draft}
new_version = version + 1
draft = context.step(
generate_revised_draft(task_id, user_profile, draft, review_comment),
name=f"generate-draft-v{new_version}",
)
version = new_version
3. レビュー用 Lambda
DynamoDBの status と reviewComment を更新するだけのシンプルな関数です。
def lambda_handler(event, context):
task_id = event["taskId"]
status = event["status"] # "APPROVED" or "REJECTED"
review_comment = event.get("reviewComment", "")
table.update_item(
Key={"taskId": task_id},
UpdateExpression="SET #s = :s, reviewComment = :c",
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={
":s": status,
":c": review_comment,
},
)
return {"taskId": task_id, "status": status}
UIはなくても、Lambdaコンソールから手動実行すればOKです。
Review Updater Lambda 全コード
import os
import json
import boto3
dynamodb = boto3.resource("dynamodb")
TABLE_NAME = os.environ["TABLE_NAME"]
table = dynamodb.Table(TABLE_NAME)
def lambda_handler(event, context):
"""
期待する event:
{
"taskId": "task-001",
"status": "APPROVED" | "REJECTED",
"reviewComment": "ここをこう直してほしい"
}
"""
if isinstance(event, str):
event = json.loads(event)
task_id = event["taskId"]
status = event["status"]
review_comment = event.get("reviewComment", "")
if status not in ("APPROVED", "REJECTED"):
raise ValueError("status must be APPROVED or REJECTED")
table.update_item(
Key={"taskId": task_id},
UpdateExpression="SET #s = :s, reviewComment = :c",
ExpressionAttributeNames={"#s": "status"},
ExpressionAttributeValues={
":s": status,
":c": review_comment,
},
)
return {
"taskId": task_id,
"status": status,
"reviewComment": review_comment,
}
動作デモ
実際にシステムを動かした際の流れを紹介します。
実行イメージ
-
メイン処理Lambdaを実行(Lambdaコンソールの「テスト」タブ)
{ "taskId": "task-001", "userProfile": { "age": 35, "riskTolerance": "middle", "goal": "老後資金と教育資金をバランスよく準備したい" }, "maxRounds": 3, "maxAttemptsPerRound": 3, "waitSeconds": 60 } } -
Review Updater Lambda で差し戻し
{ "taskId": "task-001", "status": "REJECTED", "reviewComment": "リスク説明が足りません" } -
承認すると
APPROVEDで終了{ "taskId": "task-001", "status": "APPROVED" }
実務での応用
このアーキテクチャが活きそうな実務シーンを紹介します。
適用できる業務
- レポート生成&レビュー:AIが週次レポートを作成、上司が承認
- RFP回答:AIが初稿作成、営業が修正指示
- 法務レビュー:AIが契約書ドラフト、法務が確認
- 広告コピー生成:AIが複数案作成、マーケが選択
- コードレビュー:AIがリファクタ提案、開発者が判断
- 顧客応対テンプレ:AIが返信案作成、CSが承認
「AIが書いて、人がチェックする」ケースで適用可能です。
実装時の拡張ポイント
実プロダクト化する際は、以下を追加すると良さそうです。
- Slack連携:レビュー通知とボタンでの承認/差し戻し
- Web UI:ドラフトの可視化と編集機能
- API Gateway:外部システムからのトリガー
- 監査ログ:誰がいつ何を判断したかの記録
- 通知機能:タイムアウト前のリマインダー
逆に向かないケース
- リアルタイム性が重要:即座の応答が必要な場合
- 完全自動化したい:人間の判断が不要な場合
- 複雑なUI操作:ドラッグ&ドロップなど高度な操作が必要
Durable Functions は、人間の承認や外部イベントのような、
待ちが発生する業務プロセスに強い仕組みだと感じました。
おわりに
状態管理、分岐、人間を待つといった処理を、従来であれば Step Functions で実装していたところを、Python のコードだけで完結できた点は実装体験として非常に快適でした。
一方で、Step Functions のようにフローが図で可視化されないため、現在どこまで進んでいるかをコンソールから直感的に把握できない点は、運用面での課題になりそうです。
また、「人間が何もしなくても最終的には終わる」HITL設計は実務で重要です。
レビューが無視されれば TIMEOUT、差し戻しが続けば GIVE UP などの終点に流し、
放置されてもワークフローが詰まらないようにしておくことで、運用負荷を下げられます。
これらのパターンは Step Functions でも実現できますが、同じ設計をコード中心に実装できる点は Durable Functions の魅力だと感じました。
参考リンク



