8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Strands × AgentCore Runtimeで実現するHuman in the Loop

8
Last updated at Posted at 2026-02-08

tl;dr

  • AgentCore RuntimeでHuman in the Loopしたいなら、非同期実行で頑張ろう
    • 最大8時間待機可能
    • 待機中はメモリのみ課金のサーバーレスサービス
    • Human in the Loopと相性が結構良さそう
  • 実装としてはStrandsのHooksやInterruptを使おう
    • ツール実行時に割り込み処理を入れる
    • その後、人間の承認を受け取って処理を再開する流れ

はじめに・概要

こんにちは、ふくちです。

今回はStrands Agents×AgentCore RuntimeでHuman in the Loop(以降HITLと記載)を実現するための方法とその設計思想・実装詳細・動作原理・ハマりポイントなどを解説します。

HITLとは、AIエージェントの出力や実行予定アクションなどを、人間が確認してOK/NGなどの判断を下すようなものです。
コーディングエージェントを使っている際にツール利用やプラン作成時に確認を求められることがあると思いますが、これをStrands×AgentCoreで実現するというのがゴールです。
image.png

設計フェーズ

まずは今回の構成を作るにあたっての全体図を解説します。

技術スタック

できるだけシンプルにしています。複雑なステート管理を無くし、最小構成でHITLを実現しています。

技術 役割 採用理由
Strands Agents AIエージェント構築フレームワーク Interrupts機能でツール実行前の承認フローを実装可能
AgentCore Runtime エージェント実行基盤 非同期タスク実行可能
最大8時間のセッション継続
AgentCore Runtime内のメモリ ステート管理 高速、外部依存なし
※AgentCore Memoryとは異なる

アーキテクチャ

  • AgentCore Runtimeの POST /invocations でエージェントを非同期に実行する
    • 入力ペイロードのaction次第で動作が変わる形(自前実装)
    • start: プロンプトを受けてエージェントをバックグラウンドで開始
    • list_pending: メモリー内の承認待ちリクエスト一覧を返す
    • approve/reject: 人間からの承認/拒否をメモリーに記録する
    • resume: 記録された承認レスポンスでエージェントを再開
    • result: セッションの実行結果を取得
  • エージェントが承認必要ツール(delete_fileなど)を使おうとした際にApprovalHookが起動してセッションが一時停止し、人間の承認を待つ
  • 承認を待っている間含め、Runtimeの非同期実行中は /ping がHealthyBusyになる
  • セッションは最大8時間継続する

image.png

AgentCore Runtimeの非同期実行については以前のブログをご参照ください。

Runtimeの /ping ステータス遷移や、全体フローとしては以下の通りです。

各用語・コンポーネントを整理すると以下のようになります。

コンポーネント 責務
AgentCore Runtime インフラ部分
(HTTPエンドポイント提供、非同期タスク管理、ステート管理)
Strands Agent エージェント実装部分(LLMとの対話、ツール実行)
ApprovalHook HITL実施地点(危険ツール検知、Interrupt発火、承認結果の処理)
session_states Runtime内のメモリー内で保管される
エージェントインスタンスID、実行結果、タスクIDなど
pending_approvals Runtime内のメモリー内で保管される
承認待ちリクエストの管理など

課金形態

AgentCore Runtimeの課金モデルは以下リンクに記載されています。

注意点として、HealthyBusy中もメモリ課金は発生します。ただし、CPUにおいてI/O待ち(承認待ち)中は課金されません。

Lambda durable functionsも似たような感じで、実行そのものにはお金かかりませんがデータ保存のところでお金がかかるようです。

実装詳細解説

ここからは具体的な実装の話をしていきます。

1. メモリ内ストレージでステート管理

エージェントのステートと承認待ちステートをメモリ内で管理します。
外部(DynamoDBなど)でのステート管理も可能だと思いますが、ひとまずRuntime内でステートを保持するこの形が一番シンプルだと思います。

データ構造は以下のとおりです。

agent_without_dynamo.py
# session_states の構造
session_states = {
    "abc123-def456": {
        "agent": <strands.Agent>,       # エージェントインスタンス(再開用)
        "result": <AgentResult>,         # interrupt情報を含む
        "status": "waiting_approval",    # pending/completed/error
        "prompt": "Delete /tmp/test.txt",
        "task_id": xxxxxxxxxx,        # タスク開始時に登録するタスクID
    }
}

# pending_approvals の構造
pending_approvals = {
    "abc123-def456": [
        {
            "session_id": "abc123-def456",
            "interrupt_id": "v1:before_tool_call:tooluse_xxx:yyy",
            "name": "hitl-demo-delete_files-approval",
            "reason": {
                "tool": "delete_files",
                "input": {"paths": ["/tmp/test.txt"]},
                "message": "Tool 'delete_files' requires human approval..."
            },
            "status": "pending",  # pending/approved/rejected
            "response": None,     # 承認時: "y", 信頼時: "t", 拒否時: "n"
            "created_at": "2024-01-16T12:00:00",
        }
    ]
}

interruptが発生したタイミング、すなわち承認が必要なツールを使おうとしたタイミングでセッションIDや現在の処理ステータスなどをRuntime内メモリに保管しておくような処理になっています。

session_statesにエージェントインスタンスを保持している理由は、interruptResponse形式で再開する際に同一インスタンスが必要なためです。

task_idを保存することで、resume時に元タスクを完了させるという動きに持っていきます。これがないとresumeした際に新タスクが立ち上がってしまいます。

また、リスト形式の承認待ちにすることで、1セッションにおける複数interrupt発生にも対応できるようにしています。

2. ApprovalHook(承認フック)の実装

用途は、危険なツールが実行される前にInterruptを発火し、人間の承認を要求することです。

上記以外にも、例えばエージェントによるメール送信・作業計画作成など、人間のレビューが必要な場面で起動するHookも有効です。

StrandsのHooks機能

Hooksはエージェントの動作に割り込むための仕組みです。

エージェントのライフサイクル(開始→モデル呼び出し→ツール実行→完了)の各ポイントにHooksを掛けて、自分の処理を挟み込むことができます。

エージェントのライフサイクルは以下の形であり、四角で囲ってあるポイントごとにHooksを設定することができます。

公式ドキュメントベースの図だと以下です。
image.png

もう少し補足を加えると、以下のようになります。

┌──────────────────────────────────────────────────────────────────────┐
│                         エージェントのライフサイクル                      │
├──────────────────────────────────────────────────────────────────────┤
│                                                                      │
│         ユーザー入力                                                   │
│             │                                                        │
│             ▼                                                        │
│   ┌───────────────────┐                                              │
│   │ AgentInitialized  │ ← エージェント初期化完了時                       │
│   └─────────┬─────────┘                                              │
│             ▼                                                        │
│   ┌───────────────────┐                                              │
│   │ BeforeInvocation  │ ← リクエスト開始時                              │
│   └─────────┬─────────┘                                              │
│             ▼                                                        │
│   ┌───────────────────┐                                              │
│   │ BeforeModelCall   │ ← LLMに問い合わせる前                           │
│   └─────────┬─────────┘                                              │
│             ▼                                                        │
│        [LLM処理]                                                      │
│             │                                                        │
│             ▼                                                        │
│   ┌───────────────────┐                                              │
│   │ AfterModelCall    │ ← LLMから応答が返ってきた後                      │
│   └─────────┬─────────┘                                              │
│             ▼                                                        │
│   LLMが「ツールを使いたい」と判断                                         │
│             │                                                        │
│             ▼                                                        │
│   ┌───────────────────────────────────────────────────────┐          │
│   │ BeforeToolCall  ← ツール実行前(本実装で使用)             │          │
│   │   ここで「承認が必要」と判断してInterruptを発火             │          │
│   └─────────┬─────────────────────────────────────────────┘          │
│             ▼                                                        │
│        [ツール実行]                                                    │
│             │                                                        │
│             ▼                                                        │
│   ┌───────────────────┐                                              │
│   │ AfterToolCall     │ ← ツール実行後                                 │
│   └─────────┬─────────┘                                              │
│             ▼                                                        │
│   ┌───────────────────┐                                              │
│   │ AfterInvocation   │ ← リクエスト完了時                              │
│   └───────────────────┘                                              │
│                                                                      │
└──────────────────────────────────────────────────────────────────────┘

利用可能なフックイベント一覧と、用途を纏めると以下のようになります。

イベント 発火タイミング 主な用途
AgentInitializedEvent エージェント初期化完了時 初期設定、ログ開始
BeforeInvocationEvent リクエスト開始時 認証チェック、カウンターリセット
AfterInvocationEvent リクエスト完了時(成功/失敗問わず) ログ記録、クリーンアップ
MessageAddedEvent 会話履歴にメッセージ追加時 会話ログ、フィルタリング
BeforeModelCallEvent LLMに問い合わせる前 プロンプト加工、レート制限
AfterModelCallEvent LLMから応答が返った後 応答の検証、リトライ判定
BeforeToolCallEvent ツール実行前 ★本実装で使用 承認要求、引数の検証・修正
AfterToolCallEvent ツール実行後 結果の加工、ログ記録

Hooks機能の実装方法

Hooksを使うには、HookProviderインターフェースを実装したクラスを作成します。
このクラス内でresister_hooks()メソッドを使うことで、エージェントのイベントとコールバック関数を紐づけることができます。

from strands.hooks import BeforeToolCallEvent, HookProvider, HookRegistry

class MyHook(HookProvider):
    """
    HookProvider を継承すると、
    「register_hooks」メソッドでイベントとコールバックを紐付けられる
    """

    def register_hooks(self, registry: HookRegistry, **kwargs) -> None:
        # BeforeToolCallEvent が発火したら self.my_callback を呼ぶように設定
        registry.add_callback(BeforeToolCallEvent, self.my_callback)

    def my_callback(self, event: BeforeToolCallEvent) -> None:
        # ツールが実行される直前に、この関数が自動的に呼ばれる
        print(f"ツール {event.tool_use['name']} が実行されようとしています")

また、余談ですがBeforeToolCallEventにおいては、以下のプロパティを操作することでツール実行を制御できます。

プロパティ 説明
event.tool_use["name"] str 実行されるツールの名前
event.tool_use["input"] dict ツールに渡される引数
event.cancel_tool str | None 文字列を設定すると
ツール実行をキャンセルする
(その文字列がエラーメッセージになる)
event.selected_tool Tool 別のツールに差し替え可能
event.agent Agent エージェントインスタンスへの参照
event.agent.state State 状態を保存/取得できるオブジェクト

Interruptsとは?

Interrupts(割り込み)は、エージェントの実行を一時停止して、人間の入力を待つ仕組みです。

現在のところ、割り込みが可能なのは先程のエージェントのライフサイクルにおける BeforeToolCallEvent のみとなっています。

ここで割り込みを行うことで、ツール呼び出しの実行前に割り込みを行い、人間の承認や追加入力を求めることができます。

まさしくHITLを実現するための機能です。

Interruptの動作フロー

まず、エージェントが特定のツールを使おうとしてHooksが発火し、interrupt状態へ移行し一時停止します。
エージェントのステートとしては、result.stop_reason == "interrupt" です。

event.interrupt()で作成されるInterruptオブジェクトには以下の情報が含まれます。

class Interrupt:
    id: str       # 一意の識別子(例: "v1:before_tool_call:tooluse_xxx:yyy")
    name: str     # 開発者が指定した名前(例: "hitl-demo-delete_files-approval")
    reason: Any   # 開発者が指定した理由(任意のJSON化可能オブジェクト)
    response: Any # 人間からの応答(resume後に設定される)

image.png

ユーザーがツールの使用を承認したら、delete_filesツールが実行されます。
ただしこの時、複数のInterruptがある場合はすべてに応答しないとresumeできない点に注意が必要です。

image.png

# 単一のinterruptに応答する場合
responses = [
    {
        "interruptResponse": {
            "interruptId": interrupt.id,  # どのinterruptへの応答か
            "response": "y"               # 承認の場合は "y"、拒否は "n"、信頼は "t"
        }
    }
]

# 複数のinterruptがある場合は、すべてに応答が必要
responses = [
    {"interruptResponse": {"interruptId": "interrupt_1", "response": "y"}},
    {"interruptResponse": {"interruptId": "interrupt_2", "response": "n"}},
]

# エージェントを再開
result = agent(responses)

Interruptを発火できる場所はHooksの他にTools内でも設定できるようです。
今回はHooksしか試していませんが、詳しく知りたい方は公式ドキュメントをご参照ください。

場所 方法 説明
Hooks内 event.interrupt(name, reason) BeforeToolCallEventで使用
Tools内 tool_context.interrupt(name, reason) ToolContextを受け取るツールで使用

3. エージェント開始処理

エージェントをバックグラウンドで開始します。interrupt発生時でもHealthyBusy状態を維持するため、最長約8時間の待機時間を確保することができます。

interrupt_occurredフラグはcomplete_async_task()の呼び出しを制御しています。

これはPython側の挙動の話が絡んでくるようです。Pythonではreturnしてもfinally節は実行されます。
そのため、interrupt_occurredフラグを使ってcomplete_async_task()の呼び出しを制御しています。

agent_without_dynamo.py
# Pythonの try-finally の挙動
try:
    if condition:
        return  # ← returnしても…↓
finally:
    do_something()  # ← finallyは必ず実行される

4. エージェント再開処理

承認後にエージェントを中断点(=interruptしていた箇所)から再開するための処理です。
先程までinterruptしていたタスクを指定して再実行する必要があるため、再開時はtask_idを指定しています。

この時はinterruptResponceを用いて以下の形式でレスポンスを渡します。
Runtime内メモリにinterruptIdを保管していたのは、ここで使用するためです。

agent_without_dynamo.py
# Strands Agentsが期待するフォーマット
responses = [
    {
        "interruptResponse": {
            "interruptId": "v1:before_tool_call:tooluse_xxx:yyy",  # interrupt.id
            "response": "y"  # 承認結果("y", "t", "n")
        }
    }
]

# これをagent()に渡すとinterrupt地点から再開
result = agent(responses)

# ApprovalHook.approve()内のinterrupt()呼び出しが
# このresponse値を返り値として受け取る
approval = event.interrupt(...)  # ← 承認した場合は "y" が返る

ハマりポイントと解決策

開発中にハマったポイントとその解決策を共有しておきます。

1. 処理完了後もHealthyBusyのまま(Healthyに戻らない)

一度ローカル環境で試していた際に、以下のような事象に遭遇しました。

# 承認してresumeしたのにHealthyBusyのまま
$ curl http://localhost:9080/ping
{"status": "HealthyBusy", ...}

# pending_approvalsは空
$ curl -X POST http://localhost:9080/invocations -d '{"action": "list_pending"}'
{"pending_approvals": [], "count": 0}

これの原因は、resumeする際に新しいタスクを起動していたためです。
普通、処理開始時点(ここではstart_agent_task())で起動したタスクを完了時に終了することが求められます。

しかし、実装ミスでinterrupt後のresume_agent_task()で処理再開時に新しいタスクを起動してしまっていました。

こうすると最初に開始した処理が宙ぶらりんになってしまい処理が継続している扱い(=ずっとinterruptされているタスクがある状態)になってしまうため、処理が完了してもずっとステータスがHealthyBusyになります。

agent_without_dynamo.py
# start_agent_task
task_id_A = app.add_async_task(...)  # タスクA作成 → HealthyBusy

# interrupt発生 → complete_async_task()呼ばない → タスクA未完了

# resume_agent_task(修正前)
task_id_B = app.add_async_task(...)  # ★ 新しいタスクBを作成

# 処理完了
app.complete_async_task(task_id_B)  # タスクBのみ完了
# タスクAが未完了のまま → HealthyBusy継続

解決策として、本実装ではtask_idをメモリーに保存することで対処しています。
start_agent_task()で起動した際のtask_idを指定することで、処理を正しく終えることができるようになります。結果として、ステータスもHealthyに戻るようになります。

agent_without_dynamo.py
# start_agent_task
task_id_A = app.add_async_task(...)  # タスクA作成 → HealthyBusy

# resume_agent_task(修正後)
# 処理再開時に新しいタスクは作らない、元のtask_idを使用
task_id = state.get("task_id")

# 処理完了
app.complete_async_task(task_id)  # 元のタスクAを完了 → Healthy

2. 承認待ち一覧が空になる

ローカルでは正しく動いたのですが、クラウド上(AgentCore Runtime上)へデプロイした際に正しく動かなくなったという例の共有です。

タスク自体は上手く開始でき、interrupt発生までは確認できたのですが、承認待ちリストを確認すると0件になっていました。

# タスク開始は成功
{"status": "started", "session_id": "845f9d99-...", "task_id": -24242039...}

# CloudWatchログではinterrupt発生を確認
[HITL] Agent interrupted, 1 approval(s) pending
[HITL] Saved pending approval: v1:before_tool_call:...
[HITL] Staying HealthyBusy to preserve memory state

# しかしlist_pendingは空を返す
{"pending_approvals": [], "count": 0}

原因としては、session_idを指定していなかったため、同じVMにルーティングされていなかったことです。
ローカルだと自分のマシンしか無いのでsession_idが違うことでVMが変わることまで意識できなかったのですが、クラウド上だとこういう問題が起こりました。

AgentCore RuntimeはruntimeSessionIdでリクエストをルーティングします。
これを渡しておくと、同じVMにルーティングされます。しかしこれを指定しない場合は新しいセッション(=新しいVM)にルーティングされます。

解決策として、すべてのAPI呼び出しでruntimeSessionId相当のものを指定すればOKです。
Streamlit側から呼び出すとしたらこんな感じです。

streamlit_app.py
# 1. タスク開始時にセッションIDを保存
if "session_id" in result:
    st.session_state.active_session_id = result["session_id"]

# 2. list_pendingで同じセッションIDを使用
active_session = st.session_state.get("active_session_id")
pending_result = invoke_agentcore(
    {"action": "list_pending"},
    session_id=active_session
)

# 3. approve/reject/resumeでも同様
approve_result = invoke_agentcore(
    {"action": "approve", "session_id": session_id, ...},
    session_id=session_id
)

そしてAgentCore Runtimeはboto3で呼び出します。
invoke_agent_runtime()は、同期・非同期どちらの呼び出しも可能なので、同期処理で使う時と比べて特に書き換え不要です!嬉しいところですね〜

streamlit_app.py
def invoke_runtime(payload: dict, session_id: str = None) -> dict:
    client = boto3.client("bedrock-agentcore", region_name=AWS_REGION)

    kwargs = {
        "agentRuntimeArn": AGENT_RUNTIME_ARN,
        "payload": json.dumps(payload),
        "qualifier": "DEFAULT",
    }

    # runtimeSessionIdを指定して同じコンテナにルーティング
    if session_id:
        kwargs["runtimeSessionId"] = session_id

    response = client.invoke_agent_runtime(**kwargs)
    ...

ローカル環境だと、単一プロセスなのでruntimeSessionIdが不要です。
一方AWS環境だと、複数のVMが立ち上がってくるのでruntimeSessionIdが無いと、対象のタスク状況を確認できません。
この差異は注意が必要です。

3. ローカル環境での動作確認

curlでテストを行うことができます。手元の環境でサクッと見てみたい場合はこれで良さそうです。

ただし上記でも述べた通り、AgentCore Runtimeはローカルとクラウドで一部挙動や考慮しないといけない箇所が変わってきます。
基本的にはクラウド上で動作確認することをおすすめします。

ただ、個人的には非同期実行時はローカル環境の方がログ見やすかったです。/ping が打てるのも嬉しかったですね。
この辺はお好みで…と言う感じでしょうか。

エージェント開始
curl -s -X POST http://localhost:9080/invocations \
  -H "Content-Type: application/json" \
  -d '{
    "action": "start",
    "prompt": "Please delete the file /tmp/test.txt"
  }' | jq .

# 期待するレスポンス:
# {
#   "status": "started",
#   "session_id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
#   "task_id": 1737012345678
# }
ステータス確認
# interrupt発生後もHealthyBusy
curl -s http://localhost:9080/ping | jq .
# {"status": "HealthyBusy", ...}
承認待ち確認
curl -s -X POST http://localhost:9080/invocations \
  -H "Content-Type: application/json" \
  -d '{"action": "list_pending"}' | jq .

# 期待するレスポンス:
# {
#   "pending_approvals": [
#     {
#       "session_id": "...",
#       "interrupt_id": "v1:before_tool_call:tooluse_xxx:yyy",
#       "name": "hitl-demo-delete_files-approval",
#       "reason": {...},
#       "status": "pending"
#     }
#   ],
#   "count": 1
# }
承認
curl -s -X POST http://localhost:9080/invocations \
  -H "Content-Type: application/json" \
  -d '{
    "action": "approve",
    "session_id": "YOUR_SESSION_ID",
    "interrupt_id": "YOUR_INTERRUPT_ID",
    "response": "y"
  }' | jq .
エージェント再開
curl -s -X POST http://localhost:9080/invocations \
  -H "Content-Type: application/json" \
  -d '{
    "action": "resume",
    "session_id": "YOUR_SESSION_ID"
  }' | jq .
完了確認
# Healthy に戻っていることを確認
curl -s http://localhost:9080/ping | jq .
# {"status": "Healthy", ...}

# 結果取得
curl -s -X POST http://localhost:9080/invocations \
  -H "Content-Type: application/json" \
  -d '{
    "action": "result",
    "session_id": "YOUR_SESSION_ID"
  }' | jq .

動作イメージ

タスク開始で、Runtimeを非同期実行しエージェントが動き出します。
指示的には「delete_fileというツールを使ってね」という感じにしています。
image.png

ただしそのdelete_fileツールは人間の承認が必要なツールとしているのでそこで処理が止まり、承認待ちリクエストとして表示されます。
承認すると、エージェントが処理を再開します。
image.png

最終的にタスクが完了したことを確認できます。
image.png

余談:非同期実行で8時間経つとどうなるのか

ずっと承認待ちの状態にして、8時間放置してみました。
結果としては、想定通りセッション終了。killedと出て音沙汰が無くなっていました。
image.png

image.png

まとめ

だいぶ自前実装も多くなってしまったのですが、なんとかRuntime上でHITLが実現できました。
次はdurable functionsよろしく、長期に渡って保持できるような仕組みを作りたいですね(ステート管理超大変そうですが)。

ということでDynamoDB版も書いてみました。

GitHubはこちらです。

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?