tl;dr
- AgentCore RuntimeでHuman in the Loopしたいなら、非同期実行で頑張ろう
- 最大8時間待機可能
- 待機中はメモリのみ課金のサーバーレスサービス
- Human in the Loopと相性が結構良さそう
- 実装としてはStrandsのHooksやInterruptを使おう
- ツール実行時に割り込み処理を入れる
- その後、人間の承認を受け取って処理を再開する流れ
はじめに・概要
こんにちは、ふくちです。
今回はStrands Agents×AgentCore RuntimeでHuman in the Loop(以降HITLと記載)を実現するための方法とその設計思想・実装詳細・動作原理・ハマりポイントなどを解説します。
HITLとは、AIエージェントの出力や実行予定アクションなどを、人間が確認してOK/NGなどの判断を下すようなものです。
コーディングエージェントを使っている際にツール利用やプラン作成時に確認を求められることがあると思いますが、これをStrands×AgentCoreで実現するというのがゴールです。

設計フェーズ
まずは今回の構成を作るにあたっての全体図を解説します。
技術スタック
できるだけシンプルにしています。複雑なステート管理を無くし、最小構成でHITLを実現しています。
| 技術 | 役割 | 採用理由 |
|---|---|---|
| Strands Agents | AIエージェント構築フレームワーク | Interrupts機能でツール実行前の承認フローを実装可能 |
| AgentCore Runtime | エージェント実行基盤 | 非同期タスク実行可能 最大8時間のセッション継続 |
| AgentCore Runtime内のメモリ | ステート管理 | 高速、外部依存なし ※AgentCore Memoryとは異なる |
アーキテクチャ
- AgentCore Runtimeの
POST /invocationsでエージェントを非同期に実行する- 入力ペイロードのaction次第で動作が変わる形(自前実装)
-
start: プロンプトを受けてエージェントをバックグラウンドで開始 -
list_pending: メモリー内の承認待ちリクエスト一覧を返す -
approve/reject: 人間からの承認/拒否をメモリーに記録する -
resume: 記録された承認レスポンスでエージェントを再開 -
result: セッションの実行結果を取得
- エージェントが承認必要ツール(delete_fileなど)を使おうとした際にApprovalHookが起動してセッションが一時停止し、人間の承認を待つ
- 承認を待っている間含め、Runtimeの非同期実行中は
/pingがHealthyBusyになる - セッションは最大8時間継続する
AgentCore Runtimeの非同期実行については以前のブログをご参照ください。
Runtimeの /ping ステータス遷移や、全体フローとしては以下の通りです。
各用語・コンポーネントを整理すると以下のようになります。
| コンポーネント | 責務 |
|---|---|
| AgentCore Runtime | インフラ部分 (HTTPエンドポイント提供、非同期タスク管理、ステート管理) |
| Strands Agent | エージェント実装部分(LLMとの対話、ツール実行) |
| ApprovalHook | HITL実施地点(危険ツール検知、Interrupt発火、承認結果の処理) |
| session_states | Runtime内のメモリー内で保管される エージェントインスタンスID、実行結果、タスクIDなど |
| pending_approvals | Runtime内のメモリー内で保管される 承認待ちリクエストの管理など |
課金形態
AgentCore Runtimeの課金モデルは以下リンクに記載されています。
注意点として、HealthyBusy中もメモリ課金は発生します。ただし、CPUにおいてI/O待ち(承認待ち)中は課金されません。
Lambda durable functionsも似たような感じで、実行そのものにはお金かかりませんがデータ保存のところでお金がかかるようです。
実装詳細解説
ここからは具体的な実装の話をしていきます。
1. メモリ内ストレージでステート管理
エージェントのステートと承認待ちステートをメモリ内で管理します。
外部(DynamoDBなど)でのステート管理も可能だと思いますが、ひとまずRuntime内でステートを保持するこの形が一番シンプルだと思います。
データ構造は以下のとおりです。
# session_states の構造
session_states = {
"abc123-def456": {
"agent": <strands.Agent>, # エージェントインスタンス(再開用)
"result": <AgentResult>, # interrupt情報を含む
"status": "waiting_approval", # pending/completed/error
"prompt": "Delete /tmp/test.txt",
"task_id": xxxxxxxxxx, # タスク開始時に登録するタスクID
}
}
# pending_approvals の構造
pending_approvals = {
"abc123-def456": [
{
"session_id": "abc123-def456",
"interrupt_id": "v1:before_tool_call:tooluse_xxx:yyy",
"name": "hitl-demo-delete_files-approval",
"reason": {
"tool": "delete_files",
"input": {"paths": ["/tmp/test.txt"]},
"message": "Tool 'delete_files' requires human approval..."
},
"status": "pending", # pending/approved/rejected
"response": None, # 承認時: "y", 信頼時: "t", 拒否時: "n"
"created_at": "2024-01-16T12:00:00",
}
]
}
interruptが発生したタイミング、すなわち承認が必要なツールを使おうとしたタイミングでセッションIDや現在の処理ステータスなどをRuntime内メモリに保管しておくような処理になっています。
session_statesにエージェントインスタンスを保持している理由は、interruptResponse形式で再開する際に同一インスタンスが必要なためです。
task_idを保存することで、resume時に元タスクを完了させるという動きに持っていきます。これがないとresumeした際に新タスクが立ち上がってしまいます。
また、リスト形式の承認待ちにすることで、1セッションにおける複数interrupt発生にも対応できるようにしています。
2. ApprovalHook(承認フック)の実装
用途は、危険なツールが実行される前にInterruptを発火し、人間の承認を要求することです。
上記以外にも、例えばエージェントによるメール送信・作業計画作成など、人間のレビューが必要な場面で起動するHookも有効です。
StrandsのHooks機能
Hooksはエージェントの動作に割り込むための仕組みです。
エージェントのライフサイクル(開始→モデル呼び出し→ツール実行→完了)の各ポイントにHooksを掛けて、自分の処理を挟み込むことができます。
エージェントのライフサイクルは以下の形であり、四角で囲ってあるポイントごとにHooksを設定することができます。
もう少し補足を加えると、以下のようになります。
┌──────────────────────────────────────────────────────────────────────┐
│ エージェントのライフサイクル │
├──────────────────────────────────────────────────────────────────────┤
│ │
│ ユーザー入力 │
│ │ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ AgentInitialized │ ← エージェント初期化完了時 │
│ └─────────┬─────────┘ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ BeforeInvocation │ ← リクエスト開始時 │
│ └─────────┬─────────┘ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ BeforeModelCall │ ← LLMに問い合わせる前 │
│ └─────────┬─────────┘ │
│ ▼ │
│ [LLM処理] │
│ │ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ AfterModelCall │ ← LLMから応答が返ってきた後 │
│ └─────────┬─────────┘ │
│ ▼ │
│ LLMが「ツールを使いたい」と判断 │
│ │ │
│ ▼ │
│ ┌───────────────────────────────────────────────────────┐ │
│ │ BeforeToolCall ← ツール実行前(本実装で使用) │ │
│ │ ここで「承認が必要」と判断してInterruptを発火 │ │
│ └─────────┬─────────────────────────────────────────────┘ │
│ ▼ │
│ [ツール実行] │
│ │ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ AfterToolCall │ ← ツール実行後 │
│ └─────────┬─────────┘ │
│ ▼ │
│ ┌───────────────────┐ │
│ │ AfterInvocation │ ← リクエスト完了時 │
│ └───────────────────┘ │
│ │
└──────────────────────────────────────────────────────────────────────┘
利用可能なフックイベント一覧と、用途を纏めると以下のようになります。
| イベント | 発火タイミング | 主な用途 |
|---|---|---|
AgentInitializedEvent |
エージェント初期化完了時 | 初期設定、ログ開始 |
BeforeInvocationEvent |
リクエスト開始時 | 認証チェック、カウンターリセット |
AfterInvocationEvent |
リクエスト完了時(成功/失敗問わず) | ログ記録、クリーンアップ |
MessageAddedEvent |
会話履歴にメッセージ追加時 | 会話ログ、フィルタリング |
BeforeModelCallEvent |
LLMに問い合わせる前 | プロンプト加工、レート制限 |
AfterModelCallEvent |
LLMから応答が返った後 | 応答の検証、リトライ判定 |
BeforeToolCallEvent |
ツール実行前 ★本実装で使用 | 承認要求、引数の検証・修正 |
AfterToolCallEvent |
ツール実行後 | 結果の加工、ログ記録 |
Hooks機能の実装方法
Hooksを使うには、HookProviderインターフェースを実装したクラスを作成します。
このクラス内でresister_hooks()メソッドを使うことで、エージェントのイベントとコールバック関数を紐づけることができます。
from strands.hooks import BeforeToolCallEvent, HookProvider, HookRegistry
class MyHook(HookProvider):
"""
HookProvider を継承すると、
「register_hooks」メソッドでイベントとコールバックを紐付けられる
"""
def register_hooks(self, registry: HookRegistry, **kwargs) -> None:
# BeforeToolCallEvent が発火したら self.my_callback を呼ぶように設定
registry.add_callback(BeforeToolCallEvent, self.my_callback)
def my_callback(self, event: BeforeToolCallEvent) -> None:
# ツールが実行される直前に、この関数が自動的に呼ばれる
print(f"ツール {event.tool_use['name']} が実行されようとしています")
また、余談ですがBeforeToolCallEventにおいては、以下のプロパティを操作することでツール実行を制御できます。
| プロパティ | 型 | 説明 |
|---|---|---|
event.tool_use["name"] |
str | 実行されるツールの名前 |
event.tool_use["input"] |
dict | ツールに渡される引数 |
event.cancel_tool |
str | None | 文字列を設定すると ツール実行をキャンセルする (その文字列がエラーメッセージになる) |
event.selected_tool |
Tool | 別のツールに差し替え可能 |
event.agent |
Agent | エージェントインスタンスへの参照 |
event.agent.state |
State | 状態を保存/取得できるオブジェクト |
Interruptsとは?
Interrupts(割り込み)は、エージェントの実行を一時停止して、人間の入力を待つ仕組みです。
現在のところ、割り込みが可能なのは先程のエージェントのライフサイクルにおける BeforeToolCallEvent のみとなっています。
ここで割り込みを行うことで、ツール呼び出しの実行前に割り込みを行い、人間の承認や追加入力を求めることができます。
まさしくHITLを実現するための機能です。
Interruptの動作フロー
まず、エージェントが特定のツールを使おうとしてHooksが発火し、interrupt状態へ移行し一時停止します。
エージェントのステートとしては、result.stop_reason == "interrupt" です。
event.interrupt()で作成されるInterruptオブジェクトには以下の情報が含まれます。
class Interrupt:
id: str # 一意の識別子(例: "v1:before_tool_call:tooluse_xxx:yyy")
name: str # 開発者が指定した名前(例: "hitl-demo-delete_files-approval")
reason: Any # 開発者が指定した理由(任意のJSON化可能オブジェクト)
response: Any # 人間からの応答(resume後に設定される)
ユーザーがツールの使用を承認したら、delete_filesツールが実行されます。
ただしこの時、複数のInterruptがある場合はすべてに応答しないとresumeできない点に注意が必要です。
# 単一のinterruptに応答する場合
responses = [
{
"interruptResponse": {
"interruptId": interrupt.id, # どのinterruptへの応答か
"response": "y" # 承認の場合は "y"、拒否は "n"、信頼は "t"
}
}
]
# 複数のinterruptがある場合は、すべてに応答が必要
responses = [
{"interruptResponse": {"interruptId": "interrupt_1", "response": "y"}},
{"interruptResponse": {"interruptId": "interrupt_2", "response": "n"}},
]
# エージェントを再開
result = agent(responses)
Interruptを発火できる場所はHooksの他にTools内でも設定できるようです。
今回はHooksしか試していませんが、詳しく知りたい方は公式ドキュメントをご参照ください。
| 場所 | 方法 | 説明 |
|---|---|---|
| Hooks内 | event.interrupt(name, reason) |
BeforeToolCallEventで使用 |
| Tools内 | tool_context.interrupt(name, reason) |
ToolContextを受け取るツールで使用 |
3. エージェント開始処理
エージェントをバックグラウンドで開始します。interrupt発生時でもHealthyBusy状態を維持するため、最長約8時間の待機時間を確保することができます。
interrupt_occurredフラグはcomplete_async_task()の呼び出しを制御しています。
これはPython側の挙動の話が絡んでくるようです。Pythonではreturnしてもfinally節は実行されます。
そのため、interrupt_occurredフラグを使ってcomplete_async_task()の呼び出しを制御しています。
# Pythonの try-finally の挙動
try:
if condition:
return # ← returnしても…↓
finally:
do_something() # ← finallyは必ず実行される
4. エージェント再開処理
承認後にエージェントを中断点(=interruptしていた箇所)から再開するための処理です。
先程までinterruptしていたタスクを指定して再実行する必要があるため、再開時はtask_idを指定しています。
この時はinterruptResponceを用いて以下の形式でレスポンスを渡します。
Runtime内メモリにinterruptIdを保管していたのは、ここで使用するためです。
# Strands Agentsが期待するフォーマット
responses = [
{
"interruptResponse": {
"interruptId": "v1:before_tool_call:tooluse_xxx:yyy", # interrupt.id
"response": "y" # 承認結果("y", "t", "n")
}
}
]
# これをagent()に渡すとinterrupt地点から再開
result = agent(responses)
# ApprovalHook.approve()内のinterrupt()呼び出しが
# このresponse値を返り値として受け取る
approval = event.interrupt(...) # ← 承認した場合は "y" が返る
ハマりポイントと解決策
開発中にハマったポイントとその解決策を共有しておきます。
1. 処理完了後もHealthyBusyのまま(Healthyに戻らない)
一度ローカル環境で試していた際に、以下のような事象に遭遇しました。
# 承認してresumeしたのにHealthyBusyのまま
$ curl http://localhost:9080/ping
{"status": "HealthyBusy", ...}
# pending_approvalsは空
$ curl -X POST http://localhost:9080/invocations -d '{"action": "list_pending"}'
{"pending_approvals": [], "count": 0}
これの原因は、resumeする際に新しいタスクを起動していたためです。
普通、処理開始時点(ここではstart_agent_task())で起動したタスクを完了時に終了することが求められます。
しかし、実装ミスでinterrupt後のresume_agent_task()で処理再開時に新しいタスクを起動してしまっていました。
こうすると最初に開始した処理が宙ぶらりんになってしまい処理が継続している扱い(=ずっとinterruptされているタスクがある状態)になってしまうため、処理が完了してもずっとステータスがHealthyBusyになります。
# start_agent_task
task_id_A = app.add_async_task(...) # タスクA作成 → HealthyBusy
# interrupt発生 → complete_async_task()呼ばない → タスクA未完了
# resume_agent_task(修正前)
task_id_B = app.add_async_task(...) # ★ 新しいタスクBを作成
# 処理完了
app.complete_async_task(task_id_B) # タスクBのみ完了
# タスクAが未完了のまま → HealthyBusy継続
解決策として、本実装ではtask_idをメモリーに保存することで対処しています。
start_agent_task()で起動した際のtask_idを指定することで、処理を正しく終えることができるようになります。結果として、ステータスもHealthyに戻るようになります。
# start_agent_task
task_id_A = app.add_async_task(...) # タスクA作成 → HealthyBusy
# resume_agent_task(修正後)
# 処理再開時に新しいタスクは作らない、元のtask_idを使用
task_id = state.get("task_id")
# 処理完了
app.complete_async_task(task_id) # 元のタスクAを完了 → Healthy
2. 承認待ち一覧が空になる
ローカルでは正しく動いたのですが、クラウド上(AgentCore Runtime上)へデプロイした際に正しく動かなくなったという例の共有です。
タスク自体は上手く開始でき、interrupt発生までは確認できたのですが、承認待ちリストを確認すると0件になっていました。
# タスク開始は成功
{"status": "started", "session_id": "845f9d99-...", "task_id": -24242039...}
# CloudWatchログではinterrupt発生を確認
[HITL] Agent interrupted, 1 approval(s) pending
[HITL] Saved pending approval: v1:before_tool_call:...
[HITL] Staying HealthyBusy to preserve memory state
# しかしlist_pendingは空を返す
{"pending_approvals": [], "count": 0}
原因としては、session_idを指定していなかったため、同じVMにルーティングされていなかったことです。
ローカルだと自分のマシンしか無いのでsession_idが違うことでVMが変わることまで意識できなかったのですが、クラウド上だとこういう問題が起こりました。
AgentCore RuntimeはruntimeSessionIdでリクエストをルーティングします。
これを渡しておくと、同じVMにルーティングされます。しかしこれを指定しない場合は新しいセッション(=新しいVM)にルーティングされます。
解決策として、すべてのAPI呼び出しでruntimeSessionId相当のものを指定すればOKです。
Streamlit側から呼び出すとしたらこんな感じです。
# 1. タスク開始時にセッションIDを保存
if "session_id" in result:
st.session_state.active_session_id = result["session_id"]
# 2. list_pendingで同じセッションIDを使用
active_session = st.session_state.get("active_session_id")
pending_result = invoke_agentcore(
{"action": "list_pending"},
session_id=active_session
)
# 3. approve/reject/resumeでも同様
approve_result = invoke_agentcore(
{"action": "approve", "session_id": session_id, ...},
session_id=session_id
)
そしてAgentCore Runtimeはboto3で呼び出します。
invoke_agent_runtime()は、同期・非同期どちらの呼び出しも可能なので、同期処理で使う時と比べて特に書き換え不要です!嬉しいところですね〜
def invoke_runtime(payload: dict, session_id: str = None) -> dict:
client = boto3.client("bedrock-agentcore", region_name=AWS_REGION)
kwargs = {
"agentRuntimeArn": AGENT_RUNTIME_ARN,
"payload": json.dumps(payload),
"qualifier": "DEFAULT",
}
# runtimeSessionIdを指定して同じコンテナにルーティング
if session_id:
kwargs["runtimeSessionId"] = session_id
response = client.invoke_agent_runtime(**kwargs)
...
ローカル環境だと、単一プロセスなのでruntimeSessionIdが不要です。
一方AWS環境だと、複数のVMが立ち上がってくるのでruntimeSessionIdが無いと、対象のタスク状況を確認できません。
この差異は注意が必要です。
3. ローカル環境での動作確認
curlでテストを行うことができます。手元の環境でサクッと見てみたい場合はこれで良さそうです。
ただし上記でも述べた通り、AgentCore Runtimeはローカルとクラウドで一部挙動や考慮しないといけない箇所が変わってきます。
基本的にはクラウド上で動作確認することをおすすめします。
ただ、個人的には非同期実行時はローカル環境の方がログ見やすかったです。/ping が打てるのも嬉しかったですね。
この辺はお好みで…と言う感じでしょうか。
curl -s -X POST http://localhost:9080/invocations \
-H "Content-Type: application/json" \
-d '{
"action": "start",
"prompt": "Please delete the file /tmp/test.txt"
}' | jq .
# 期待するレスポンス:
# {
# "status": "started",
# "session_id": "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx",
# "task_id": 1737012345678
# }
# interrupt発生後もHealthyBusy
curl -s http://localhost:9080/ping | jq .
# {"status": "HealthyBusy", ...}
curl -s -X POST http://localhost:9080/invocations \
-H "Content-Type: application/json" \
-d '{"action": "list_pending"}' | jq .
# 期待するレスポンス:
# {
# "pending_approvals": [
# {
# "session_id": "...",
# "interrupt_id": "v1:before_tool_call:tooluse_xxx:yyy",
# "name": "hitl-demo-delete_files-approval",
# "reason": {...},
# "status": "pending"
# }
# ],
# "count": 1
# }
curl -s -X POST http://localhost:9080/invocations \
-H "Content-Type: application/json" \
-d '{
"action": "approve",
"session_id": "YOUR_SESSION_ID",
"interrupt_id": "YOUR_INTERRUPT_ID",
"response": "y"
}' | jq .
curl -s -X POST http://localhost:9080/invocations \
-H "Content-Type: application/json" \
-d '{
"action": "resume",
"session_id": "YOUR_SESSION_ID"
}' | jq .
# Healthy に戻っていることを確認
curl -s http://localhost:9080/ping | jq .
# {"status": "Healthy", ...}
# 結果取得
curl -s -X POST http://localhost:9080/invocations \
-H "Content-Type: application/json" \
-d '{
"action": "result",
"session_id": "YOUR_SESSION_ID"
}' | jq .
動作イメージ
タスク開始で、Runtimeを非同期実行しエージェントが動き出します。
指示的には「delete_fileというツールを使ってね」という感じにしています。

ただしそのdelete_fileツールは人間の承認が必要なツールとしているのでそこで処理が止まり、承認待ちリクエストとして表示されます。
承認すると、エージェントが処理を再開します。

余談:非同期実行で8時間経つとどうなるのか
ずっと承認待ちの状態にして、8時間放置してみました。
結果としては、想定通りセッション終了。killedと出て音沙汰が無くなっていました。

まとめ
だいぶ自前実装も多くなってしまったのですが、なんとかRuntime上でHITLが実現できました。
次はdurable functionsよろしく、長期に渡って保持できるような仕組みを作りたいですね(ステート管理超大変そうですが)。
ということでDynamoDB版も書いてみました。
GitHubはこちらです。





