2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

 Strandsの割り込み(Interrupts)機能を試してみた

Posted at

はじめに

こんにちは!やくもです!
今回はStrandsAgentsに入ってた割り込み処理を試してみました。

また、Strands Agents を触っていると必ず出てくるのがエージェントループという考えです。
これは単なる内部処理の話ではなく、Strands が他のエージェントフレームワークと違って使いやすい・柔軟に制御できる理由そのものでもあります。 

本記事では、まずこのエージェントループという概念を押さえた上で、Strands がどうやってhuman-in-the-loopを実現しているのかを見ていきます。

Strandsにおけるエージェントループとは

Strands Agents の中心には、エージェントがタスクを完了するまで繰り返し思考し、外部ツールを呼び出し、結果を受け取ってまた判断するというループがあります。 

図にすると以下のような流れです

image.png

このループは、単純な「質問 → 回答」ではなく、

  • 質問の意図を解釈し
  • 必要な外部情報を取りにいき
  • その結果を踏まえてさらに思考する

という多段階の処理を可能にします。 

たとえば「セキュリティ脆弱性をコードベースから見つけてレポートを作る」といったタスクは、単一のLLM応答では難しいですが、このループを通じてファイル読み取り・解析・判断を何度も繰り返すことで実現できます。 

Strands の場合、このループがSDKとして提供されています。 

human-in-the-Loopとは

一方で、自律的に動くエージェントには制御が難しい側面があります。

  • 意図せず危険な操作をしてしまう
  • 自動で進みすぎて後戻りできない
  • 判断基準がブラックボックスになってしまう

といったケースです。

そこで重要になるのが human-in-the-loop (以下HITL)です。
HITL はその名の通り、自動処理の途中で、人の判断を介入させる仕組みを指します。 

Strands では、このHITL を実装しやすくするための仕組みが Interrupts(割り込み) です。Interrupts を使うと、ループの途中でエージェントを一旦止めて、人の入力を待ってから再開することができます。

image.png

たとえば、

  • 外部APIを叩く前に確認を取る
  • 次の行動に進む前に追加情報をユーザーから取得する

といった制御が、フックやツール定義の中から自然に組み込めるようになっています。 

このように、Strands のエージェンティックループは自律性を担保しつつ、Interrupts を介して human-in-the-loopを実現できるのがポイントです。

以降は実際に割り込みを試してみた例を示していきます。

実際にやってみる

以下で、どの箇所で割り込みを発生させるか、何パターンか試してみました。

前提

今回はStrands Agentsで実施しています。
環境構築や実施方法、基本的な概念は以下を参照ください。

ツール

まずは Strands で一番触る機会が多い ツール周りです。
Strands Agents では、モデルの判断だけではなく外部の処理を実行するためにツールを定義します。たとえばファイル操作系や API 呼び出し系の処理を @tool で包んで渡すだけで、

  • モデルが「ツールを使うべきだ」と判断する
  • 自動でそのツールを呼び出し
  • 結果をループの中で受け取って次の判断に使う

という流れで動きます。 

ただし Interrupts(割り込み)を使う場合は、ツールの中でも interrupt を発生できます。
具体的には、ツール内で tool_context.interrupt(...)を呼ぶと、処理の途中で一旦 Strands のループを止めて 人の入力を待つことができます

import json
from strands import Agent, tool
from strands.types.tools import ToolContext

class DeleteTool:
    def __init__(self, app_name: str) -> None:
        self.app_name = app_name

    @tool(context=True)
    def delete_files(self, tool_context: ToolContext, paths: list[str]) -> bool:
        approval = tool_context.interrupt(
            f"{self.app_name}-approval",
            reason={"paths": paths},
        )
        if approval.lower() != "y":
            return False
        # 実際の削除処理(省略)
        return True

agent = Agent(
    system_prompt="You delete files when asked.",
    tools=[DeleteTool("myapp").delete_files],
    callback_handler=None,
)

result = agent("Delete these files: ['/tmp/a.txt', '/tmp/b.txt']")

while result.stop_reason == "interrupt":
    responses = []
    for intr in result.interrupts:
        if intr.name == "myapp-approval":
            user_input = input(f"Delete {intr.reason['paths']} ? (y/N): ")
            responses.append({
                "interruptResponse": {
                    "interruptId": intr.id,
                    "response": user_input
                }
            })
    result = agent(responses)

print("MESSAGE:", json.dumps(result.message, ensure_ascii=False))

この仕組みを使うと、ユーザー確認が必要な操作を安全に実装できるようになります。
実際に動かすと以下のような感じ

image.png

今回はお試しでダミーのモック処理を行うツールを登録していますが、MCPやエージェントを登録した場合も同様に処理できます!
ぜひ試してみてください!

セッションマネージャー

次に、Interruptsを使って最初に戸惑ったのがセッションマネージャーの役割です。

一見すると、割り込みはその場で入力待ちになるように見えますが、実際にはそうではありません。
Strands の interrupt は処理を一時停止したという情報を AgentResult に載せて返すだけで、実行自体は一度終了します。

そのため、割り込みのあとに処理を再開するには、

  • どこで止まっていたのか
  • どのツールを呼ぼうとしていたのか
  • どんな状態(state)を持っていたのか

といった情報を保持しておく必要があります。
これを担っているのが セッションマネージャーです。

以下、コード本文

import json
from pathlib import Path
from typing import Any

from strands import Agent, tool
from strands.agent import AgentResult
from strands.hooks import BeforeToolCallEvent, HookProvider, HookRegistry
from strands.session import FileSessionManager
from strands.types.agent import AgentInput

@tool
def inspect_files(paths: list[str]) -> dict[str, Any]:
    # ここではダミーで見たことにする。
    return {
        "count": len(paths),
        "paths": paths,
        "note": "This is a dummy inspect tool.",
    }


@tool
def delete_files(paths: list[str]) -> bool:
    # 本当はファイル削除などをする想定。
    print(f"[delete_files] (dry-run) would delete: {paths}")
    return True


class ApprovalHook(HookProvider):

    def __init__(self, app_name: str) -> None:
        self.app_name = app_name

    def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
        registry.add_callback(BeforeToolCallEvent, self.approve)

    def approve(self, event: BeforeToolCallEvent) -> None:
        # delete_files 以外は何もしない
        if event.tool_use.get("name") != "delete_files":
            return

        state_key = f"{self.app_name}-approval"
        prev = event.agent.state.get(state_key)

        # すでに trust 済みならスキップ
        if prev == "t":
            return

        # tool input から paths を取得(無い場合は空扱い)
        paths = []
        try:
            paths = event.tool_use.get("input", {}).get("paths", []) or []
        except Exception:
            paths = []

        # ここで割り込み
        approval = event.interrupt(
            f"{self.app_name}-approval",
            reason={"paths": paths},
        )

        # 承認されなければツール呼び出しをキャンセル
        if approval.lower() not in ["y", "t"]:
            event.cancel_tool = "User denied permission to delete files"
            # state は残してもいいが、ここでは明示的に入れない
            return

        # y / t をセッション状態に保存
        event.agent.state.set(state_key, approval.lower())


APP_NAME = "myapp"

# 書き込み可能な場所(このファイルと同じディレクトリ配下)
SESSION_DIR = Path(__file__).parent / ".sessions"
SESSION_DIR.mkdir(parents=True, exist_ok=True)

AGENT = Agent(
    hooks=[ApprovalHook(APP_NAME)],
    session_manager=FileSessionManager(session_id=APP_NAME, storage_dir=str(SESSION_DIR)),
    system_prompt=(
        "You are an assistant that can inspect files and delete files. "
        "If the user asks to delete files, call delete_files with the given paths."
    ),
    tools=[delete_files, inspect_files],
    callback_handler=None,
)


def server(prompt: AgentInput) -> AgentResult:

    return AGENT(prompt)


def client(paths: list[str]) -> AgentResult:
    result = server(f"Delete these files: {paths}")

    while True:
        if result.stop_reason != "interrupt":
            break

        responses = []
        for intr in result.interrupts:
            if intr.name == f"{APP_NAME}-approval":
                user_input = input(
                    f'Do you want to delete {intr.reason["paths"]} (t/y/N): '
                )
                responses.append(
                    {
                        "interruptResponse": {
                            "interruptId": intr.id,
                            "response": user_input,
                        }
                    }
                )

        result = server(responses)

    return result


if __name__ == "__main__":
    paths = ["a/b/c.txt", "d/e/f.txt"]
    result = client(paths)
    print("STOP_REASON:", result.stop_reason)
    print("MESSAGE:", json.dumps(result.message, ensure_ascii=False))

今回のコードでは、FileSessionManagerを使っています。

session_manager=FileSessionManager(
    session_id="myapp",
    storage_dir="./.sessions"
)

これを指定すると、エージェントが割り込みに入ったタイミングで、

エージェントの内部状態

  • 未処理の interrupt
  • state に保存された値

などがファイルとして保存されます。
私の場合、以下のような形で保存されました。

image.png

その後、interruptResponseを渡して再度エージェントを呼び出すと、保存されていた状態を読み込んで処理が再開されます。

もしセッションマネージャーを指定しない場合、割り込み後にエージェントを呼び直しても、
最初から考え直すことになり、割り込みの続きが分からなくなります。

なお、agent.stateに保存している値(例:一度承認したら次からスキップするためのフラグ)は、このセッション内で共有される簡易的な状態管理用のメモです。
セッションマネージャーは「どこで止まったか」、state は「何を判断したか」を記録している、と考えると理解しやすいです。

ちょっと具体例

セッションマネージャーはわかりにくい可能性があるので、一つわかりやすい例を試してみました。

動作イメージ

主に以下のような形で動作します。

  • いくつまで数値を増やすか決める
  • 1加算するごとに、承認する
  • 途中で打ち切った場合、途中までの数値を保存する
  • もう一度実行した場合、前回の途中の数字からスタートできる

image.png

コード全部

実際に実行したコードはこんな感じです。

import json
from pathlib import Path
from typing import Any

from strands import Agent, tool
from strands.agent import AgentResult
from strands.hooks import BeforeToolCallEvent, HookProvider, HookRegistry
from strands.session import FileSessionManager
from strands.types.agent import AgentInput
from strands.types.tools import ToolContext

STATE_TOTAL_KEY = "counter_total"


@tool(context=True)
def get_total(tool_context: ToolContext) -> int:
    """
    現在の合計値を返す(agent.state から読む)
    """
    total = tool_context.agent.state.get(STATE_TOTAL_KEY)
    if total is None:
        total = 0
        tool_context.agent.state.set(STATE_TOTAL_KEY, total)
    return int(total)


@tool(context=True)
def add_one(tool_context: ToolContext) -> int:
    """
    合計に 1 を加算して、加算後の合計を返す(agent.state に保存)
    """
    total = tool_context.agent.state.get(STATE_TOTAL_KEY)
    if total is None:
        total = 0

    total = int(total) + 1
    tool_context.agent.state.set(STATE_TOTAL_KEY, total)

    
    print(f"[add_one] current total = {total}")
    return total

class ApprovalHook(HookProvider):

    def __init__(self, app_name: str) -> None:
        self.app_name = app_name

    def register_hooks(self, registry: HookRegistry, **kwargs: Any) -> None:
        registry.add_callback(BeforeToolCallEvent, self.approve)

    def approve(self, event: BeforeToolCallEvent) -> None:
        if event.tool_use.get("name") != "add_one":
            return

        current_total = event.agent.state.get(STATE_TOTAL_KEY)
        if current_total is None:
            current_total = 0

        approval = event.interrupt(
            f"{self.app_name}-approval",
            reason={"current_total": int(current_total)},
        )

        if approval.lower() != "y":
            event.cancel_tool = "User denied increment"
            return

APP_NAME = "counter_app"

SESSION_DIR = Path(__file__).parent / ".sessions"
SESSION_DIR.mkdir(parents=True, exist_ok=True)

AGENT = Agent(
    hooks=[ApprovalHook(APP_NAME)],
    session_manager=FileSessionManager(session_id=APP_NAME, storage_dir=str(SESSION_DIR)),
    system_prompt=(
        "You are a simple counter assistant.\n"
        "Rules:\n"
        "- Always call get_total first to know the current total.\n"
        "- Then, if the user requests to reach a target number, repeatedly call add_one until total >= target.\n"
        "- After each add_one, briefly state the current total.\n"
        "- Do not skip add_one calls.\n"
    ),
    tools=[get_total, add_one],
    callback_handler=None,
)


def server(prompt: AgentInput) -> AgentResult:
    return AGENT(prompt)

def run_counter(target: int) -> AgentResult:
    # いまの合計が state に残っている前提で「target まで進める」依頼
    result = server(f"Increase the total by 1 repeatedly until it reaches {target}.")

    while True:
        if result.stop_reason != "interrupt":
            break

        responses = []
        for intr in result.interrupts:
            if intr.name == f"{APP_NAME}-approval":
                cur = intr.reason.get("current_total", "?")
                user_input = input(f"Approve increment? current_total={cur} (y/N/q): ").strip()

                if user_input.lower() == "q":
                    print("Quit requested. Session is saved. Run again to resume.")
                    return result  # 途中終了(保存は FileSessionManager が担当)

                responses.append(
                    {
                        "interruptResponse": {
                            "interruptId": intr.id,
                            "response": user_input,
                        }
                    }
                )

        result = server(responses)

    return result

if __name__ == "__main__":
    try:
        target = int(input("Target total? (e.g. 10): ").strip())
    except ValueError:
        print("Please input an integer.")
        raise SystemExit(1)

    result = run_counter(target)

    print("STOP_REASON:", result.stop_reason)
    print("MESSAGE:", json.dumps(result.message, ensure_ascii=False))

実行状況は以下のような感じになります。
一度処理を打ち切り、再度実行しても前回の数値から再開できていることがわかるかと思います。
これは前回の数値をセッションとして保存しているからです。

image.png

セッションマネージャがないとどうなるか

セッションマネージャーがないと、これは毎回処理は最初からやり直しになります。
それは流石に色々面倒ですし、場合によっては処理の生合成が取れなくなってしまいますよね。
必要ならば使ってみるのもいいかもしれないです。

さいごに

ということで、Strandsの割り込み処理を試してみました。
自動化を進めるほど人がどこで介入できるかは設計上の大きなテーマになりますよね。
Strands の Interrupts は、その境界をコードレベルで分かりやすく表現できる仕組みだと感じます。

一方で、Interrupts を成立させるためには セッションマネージャーの存在が重要で、これがないと割り込み後の再開が難しくなってしまいます。
エージェントの途中状態を保持し、続きを再生できる仕組みがあるからこそ、human-in-the-loop を前提としたワークフローが現実的になります。

この辺のAIのワークフローは、他にもLambdaのDurable FunctionsやStep Functionsでも代用が可能な場合があります。
human-in-the-loopに限らず、これらの使い分けについても今後の記事で掘り下げていきたいと思います。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?