169
126

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Strands & AgentCoreハンズオン! MCPマルチエージェントをAWSに簡単デプロイ

Last updated at Posted at 2025-08-02

この記事は人力で書きました。

このイベント用の手順書ですが、本記事を読めば1時間ぐらいで誰でも試せます!

Claude Codeなど、AIエージェントを「使う」人はかなり増えてきました。

もはや、AI使うのは全員やって当たり前。
僕らは一歩進んで、AIエージェントを「作る」方にも入門しましょう。100倍楽しいですよ🙌

当日までの事前準備

AWSアカウントの作成

新規作成推奨。既存アカウントを使う際は自己責任でお願いします。

※最近は新しい無料専用プランが出てきますが「有料」を選んでください。

今回の課金は数十円レベルの想定です。(100%保証はできないので、自己責任でお願いします)

GitHubアカウントの作成

既に持っている方は既存アカウントをお使いください。

前説(Strands、AgentCoreって何?)

  • Strands: AIエージェント開発用のPythonフレームワーク。AWS製のOSS
  • AgentCore: Bedrockの姉妹サービス。AIエージェント用の便利パーツ的なAPI群

0. 環境構築

0-1. 開発環境のセットアップ

GitHubリポジトリの作成

  • サインイン後、トップページ左上「New」より新規リポジトリを作成
    • Repository name: strands-agentcore
    • Choose visibility: Private
    • Add README: On

GitHub Codespacesの起動

  • リポジトリ作成後、画面右上の緑色ボタン「Code > Create codespace on main」

必要なファイルの作成

  • コードスペース画面下部のターミナルで以下コマンドを実行
touch .gitignore
touch .env
  • 各ファイルに以下を記載する
.gitignore
.env
.bedrock_agentcore.yaml

本ハンズオンではGitHubへのプッシュ(コードのアップロード)は不要ですが、誤って認証情報をアップしてしまわないように .env を作成しておきます。

.env
# AWS認証情報
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
AWS_DEFAULT_REGION=us-west-2

コードスペースではファイルに入力した瞬間、自動で上書き保存されます。
xxx 部分はこの次の手順で埋めます。

0-2. AWSアカウントのセットアップ

IAMユーザーの作成

このハンズオンは、すべてオレゴンリージョンで行います。
最初に画面右上のリージョンを切り替えておいてください。

  • 「IAM」を検索してアクセス
  • サイドバー「ユーザー」より新規ユーザーを作成
    • ステップ1
      • ユーザー名: codespaces
    • ステップ2
      • 許可のオプション: ポリシーを直接アタッチする
      • 許可ポリシー: AdministratorAccess にチェック
    • 後はそのまま進む

IAMアクセスキーの作成

  • IAMユーザー作成後、作ったユーザー名をクリックして開く
  • 概要セクションより「アクセスキーを作成」
    • ステップ1
      • ユースケース: コマンドラインインターフェイス (CLI)
      • 「上記のレコメンデーションを理解し、アクセスキーを作成します。」にチェック
    • 後はそのまま進む
  • 作成後、アクセスキーとシークレットアクセスキーを、コードスペースの .env に貼り付ける

この認証情報は、管理者権限を持っているため慎重に扱ってください。

もし誤って公開GitHubリポジトリにPushしたりすると、秒で悪い人に検知されて、あなたのAWSアカウントを悪用されたりします。

Bedrockのモデル有効化

  • AWSマネコンで「Amazon Bedrock」を検索してアクセス
  • サイドバー「モデルアクセス」より「すべてのモデルを有効化」
    • ステップ1:そのまま進む
    • ステップ2:Anthropic用にユースケースの詳細を追加
      • 対象のユーザーは誰ですか?: 社内の従業員
      • ユースケースの詳細を入力してください: hands-on
    • 後はそのまま進む

1. Strands入門

  • コードスペースで画面下部のターミナルを利用
  • 以下コマンドで、新規ディレクトリを作りそこに移動
mkdir 1_strands
cd 1_strands

1-1. 三行エージェント

  • 新規ファイルを作成
touch 1_agent.py
  • 以下コードをファイルに貼り付け
1_strands/1_agent.py
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent

# .envファイルから環境変数を読み込む
load_dotenv()

# エージェントを作成して起動
agent = Agent("us.anthropic.claude-3-7-sonnet-20250219-v1:0")
agent("JAWS-UGって何?")

(すいません盛りました。 .envが無ければ三行なんです)

  • 以下コマンドで、必要なモジュールをインストールしてから実行
pip install strands-agents python-dotenv
python 1_agent.py

参考:Bedrockの他モデルはもちろん、OpenAIのモデルなども呼べます!

1-2. ツールを持たせてみる

  • 新規ファイルを作成
touch 2_tool.py
  • 以下コードをファイルに貼り付け
1_strands/2_tool.py
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent, tool

# .envファイルから環境変数を読み込む
load_dotenv()

# 文字カウント関数をツールとして定義
@tool
def counter(word: str, letter: str):
    return word.lower().count(letter.lower())

# エージェントを作成
agent = Agent(
    model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    tools=[counter]
)

# エージェントを呼び出し
agent("Strandsの中にSはいくつある?")
  • 以下コマンドで実行
python 2_tool.py

もしAIエージェントの回答が途中で止まってしまう場合は、BedrockのClaudeモデル呼び出し分間クォータが上限に達し、Strandsが裏でリトライをかけ続けている可能性があります。
そのときは他のモデルに変更してみてください。

(代替候補のモデルID)

  • us.anthropic.claude-3-5-sonnet-20241022-v2:0
  • us.anthropic.claude-3-5-sonnet-20240620-v1:0
  • us.anthropic.claude-3-5-haiku-20241022-v1:0

1-3. MCPサーバーを持たせてみる

  • 新規ファイルを作成
touch 3_mcp.py
  • 以下コードをファイルに貼り付け
1_strands/3_mcp.py
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent
from strands.tools.mcp import MCPClient
from mcp import stdio_client, StdioServerParameters

# .envファイルから環境変数を読み込む
load_dotenv()

# MCPクライアントを作成
mcp = MCPClient(lambda: stdio_client(
    StdioServerParameters(
        command="uvx", 
        args=["strands-agents-mcp-server"]
    )
))

# MCPクライアントを起動しながら、エージェント作成&呼び出し
with mcp:
    agent = Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        tools=mcp.list_tools_sync()
    )
    agent("StrandsでA2Aサーバーの最小サンプルコードを書いて!")
  • 以下コマンドで実行
pip install uv strands-agents-tools
python 3_mcp.py

1-4. マルチエージェントにしてみる

監督者がサブエージェントをツールのように呼び出す「Agent-as-Tools」パターンを使います。(一番オーソドックスで実装も簡単)

  • 新規ファイルを作成
touch 4_multi_agent.py
  • 以下コードをファイルに貼り付け
1_strands/4_multi_agent.py
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent, tool
from strands_tools import calculator

# .envファイルから環境変数を読み込む
load_dotenv()

# サブエージェント1を定義
@tool
def math_agent(query: str):
    agent = Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        system_prompt="ツールを使って計算を行ってください",
        tools=[calculator]
    )
    return str(agent(query))

# サブエージェント2を定義
@tool
def haiku_agent(query: str):
    agent = Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        system_prompt="与えられたお題で五・七・五の俳句を詠んで"
    )
    return str(agent(query))

# 監督者エージェントの作成と実行
orchestrator = Agent(
    model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
    system_prompt="与えられた問題を計算して、答えを俳句として詠んで",
    tools=[math_agent, haiku_agent]
)
orchestrator("十円持っている太郎くんが二十円もらいました。今いくら?")
  • 以下コマンドで実行
python 4_multi_agent.py

2. AgentCore入門

これまでは作ったエージェントをデプロイしたいとき、やれLambdaかECSか、認証やストリーミング、監視はどうする…など細かい悩みが尽きませんでしたが、AgentCoreはこれをまるっと解決してくれる救世主です!

今回は、AgentCoreの機能のうち「ランタイム」と「オブザーバビリティ」を体験します。

スクリーンショット 2025-08-03 23.14.45.png

AgentCoreランタイムはAIエージェント専用Lambdaのようなもので、そこで稼働するアプリを簡単に作るためのPython用SDKが提供されています。(実態はFastAPIのような、StarletteベースのAPIフレームワーク)

さらに、そのアプリを簡単にAWSへデプロイしたり、ローカル実行できるCLIベースのスターターツールキットも提供されています。

  • 新規ディレクトリを作って、そこに移動
cd /workspaces/strands-agentcore

mkdir 2_agentcore
cd 2_agentcore

2-1. トレースの有効化

このハンズオンは、すべてオレゴンリージョンで行います。
最初に画面右上のリージョンを切り替えておいてください。

  • マネコンで「CloudWatch」を検索してアクセス
  • サイドバーの「トランザクション検索」より、「Enable Transaction Search」をクリック
  • チェックボックスにチェックを入れて「Save」

2-2. コードにAgentCoreを装着

AgentCoreビルド用のディレクトリを作り、そこに新規ファイルを作成

mkdir docker
cd docker

touch agentcore.py
  • 以下コードをファイルに貼り付け
2_strands/docker/agentcore.py
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent
from bedrock_agentcore.runtime import BedrockAgentCoreApp # 追加

# .envファイルから環境変数をロード
load_dotenv()

# Strandsでエージェントを作成
agent = Agent("us.anthropic.claude-3-7-sonnet-20250219-v1:0")

# ---------- 以下、追加コード--------------

# AgentCoreのサーバーを作成
app = BedrockAgentCoreApp()

# エージェント呼び出し関数を、AgentCoreの開始点に設定
@app.entrypoint
def invoke_agent(payload, context):

    # リクエストのペイロード(中身)からプロンプトを取得
    prompt = payload.get("prompt")
    
    # エージェントを呼び出してレスポンスを返却
    return {"result": agent(prompt).message}

# AgentCoreサーバーを起動
app.run()
  • 以下コマンドで実行
pip install bedrock-agentcore
python agentcore.py

AgentCoreサーバーが起動し、待ち受け状態になります。

ローカルでテスト

  • コードスペースのターミナル右上の「+」ボタンから、2つ目のターミナルを起動
  • 新規ファイルを作成
2つ目のターミナル
cd 2_agentcore
touch 1_client.py
  • 以下コードをファイルに貼り付け
2_agentcore/1_client.py
# 必要なライブラリをインポート
import requests

# ローカルサーバーにリクエストを実施
response = requests.post(
    url="http://localhost:8080/invocations",
    headers={"Content-Type": "application/json"},
    json={"prompt": "JAWS-UGって何?"}
)

# レスポンスを画面に表示
print(response.json())
  • 以下コマンドで実行
2つ目のターミナル
python 1_client.py
  • 2つ目のターミナルはもう使わないので、右側タブバーの🗑️ボタンから閉じる

2-3. AWSにデプロイ

  • 1つ目のターミナルに戻り、Ctrl + C で起動中のサーバーを停止

requirements.txt の作成

  • 新規ファイルを作成
cd /workspaces/strands-agentcore/2_agentcore/docker
touch requirements.txt
  • 以下を記入
2_agentcore/docker/requirements.txt
strands-agents
bedrock-agentcore

AgentCoreランタイムへのデプロイ

  • 以下コマンドを実行
# .env の内容をターミナルの環境変数に設定
export $(cat /workspaces/strands-agentcore/.env | grep -v ^# | xargs)

# AgentCoreのスターターツールキットをインストール
pip install bedrock-agentcore-starter-toolkit

# デプロイ準備
agentcore configure --entrypoint agentcore.py --name jawsug
  • ウィザードでは全て Enter でOK
🔐 Execution Role
Press Enter to auto-create execution role, or provide execution role ARN/name to use existing
Execution role ARN/name (or press Enter to auto-create):

🏗️  ECR Repository
Press Enter to auto-create ECR repository, or provide ECR Repository URI to use existing
ECR Repository URI (or press Enter to auto-create):

🔍 Detected dependency file: requirements.txt
Press Enter to use this file, or type a different path (use Tab for autocomplete):

🔐 Authorization Configuration
By default, Bedrock AgentCore uses IAM authorization.
Configure OAuth authorizer instead? (yes/no) [no]:
  • デプロイを実施
# デプロイ
agentcore launch

IAMロールとかECRリポジトリとか、全部自動で作ってくれるので超便利!
これだけで、AIエージェントをサーバーレス基盤上でホストしてくれます。

2-4. 動作確認

マネコンからテスト

  • マネコンで「Bedrock AgentCore」を検索してアクセス
  • サイドバー「Agent Sandbox」にアクセス
    • Runtime agent: jawsug
    • Endpoint: DEFAULT
    • Input: {"prompt": "JAWSUGって何?"}
      • ※初期値は削除してください
  • 「Run」をクリック

GUIアプリからテスト

  • サイドバー「Agent Runtime」から jawsug を開く
  • Pythonサンプルコードに表示されている agentRuntimeArn の値の文字列をコピーする
  • 以下の = 以降にコピーしたARNを貼り付け、コードスペースで .env ファイルで新しい行に追記
strands-agentcore/.env
# AWS認証情報
(中略)

# AgentCore設定
AGENT_RUNTIME_ARN=arn:aws:bedrock-agentcore:xxx(以下略)
  • 新規ファイルを作成
cd /workspaces/strands-agentcore/2_agentcore
touch 2_frontend.py
  • 以下コードをファイルに貼り付け
2_agentcore/2_frontend.py
# 必要なライブラリをインポート
from dotenv import load_dotenv
import os, asyncio, boto3, json, uuid
import streamlit as st

# .envファイルから環境変数をロード
load_dotenv(override=True)

# タイトルを描画
st.title("Strands on AgentCore")
st.write("何でも聞いてね!")

# チャットボックスを描画
if prompt := st.chat_input("メッセージを入力してね"):

    # ユーザーのプロンプトを表示
    with st.chat_message("user"):
        st.markdown(prompt)

    # エージェントの回答を表示
    with st.chat_message("assistant"):

        # AgentCoreランタイムを呼び出し
        with st.spinner("考え中…"):
            agentcore = boto3.client('bedrock-agentcore')
            response = agentcore.invoke_agent_runtime(
                agentRuntimeArn=os.getenv("AGENT_RUNTIME_ARN"),
                payload=json.dumps({"prompt": prompt})
            )

        # 結果のテキストを取り出して表示
        response_body = response["response"].read()
        response_data = json.loads(response_body.decode('utf-8'))
        st.write(response_data["result"]["content"][0]["text"])
  • 以下コマンドで起動
pip install streamlit
streamlit run 2_frontend.py
  • 右下に出るポップアップの「ブラウザーで開く」ボタンをクリック
    • 閉じてしまった場合は、ターミナルの http://localhost:8501 をクリックすればOK

2-5. 運用監視

簡易Langfuse的な機能(LLM版Datadogのようなもの)が自動で付いてるので便利です。

AgentCoreオブザーバビリティでトレース確認

  • マネコンで、AgentCoreのサイドバー「Agent Runtime」から jawsug をクリック
  • 上部の「Agent details」を展開して「View observability」をクリック
  • 「Traces」タブからトレースをクリックすると、エージェントの動作履歴をドリルダウンして確認できます

CloudWatchログの確認

うまく起動せず、トレースすら見られないときはサーバーログを見ましょう。

  • マネコンで「CloudWatch」を検索してアクセス
  • サイドバー「ロググループ」から /aws/bedrock-agentcore/runtimes/jawsug-<ランダム文字列>-DEFAULT をクリック
  • 「すべてのログストリームを検索」から、直近のサーバーログを確認できます

3. 本格アプリのデプロイ

マルチエージェントをAgentCoreランタイムに載せて、3体の行動をすべてリアルタイムにストリーミング表示します。(これめっちゃ実装大変でした…)

スクリーンショット 2025-08-03 13.40.28.png

  • 新規ディレクトリを作って、そこに移動
cd /workspaces/strands-agentcore

mkdir -p 3_advanced
cd 3_advanced

3-1. マルチエージェントの開発

  • 新規ファイルを作成
mkdir docker
cd docker

touch multiagent.py
  • 以下コードをファイルに貼り付け
3_advanced/dokcer/multiagent.py
3_advanced/dokcer/multiagent.py
import os, asyncio
from strands import Agent, tool
from strands.tools.mcp import MCPClient
from mcp import stdio_client, StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
from bedrock_agentcore.runtime import BedrockAgentCoreApp

# =============================================================================
# サブエージェントのストリーミング処理
# =============================================================================

async def send_event(queue, message, stage, tool_name=None):
    """サブエージェントのステータスを送信"""
    if not queue:
        return
    
    progress = {"message": message, "stage": stage}
    if tool_name:
        progress["tool_name"] = tool_name
    await queue.put({"event": {"subAgentProgress": progress}})

async def merge_streams(stream, queue):
    """親子エージェントのストリームを統合"""
    create_task = asyncio.create_task
    main = create_task(anext(stream, None))
    sub = create_task(queue.get())
    waiting = {main, sub}
    
    # チャンクの到着を待機
    while waiting:
        ready_chunks, waiting = await asyncio.wait(
            waiting, return_when=asyncio.FIRST_COMPLETED
        )
        for ready_chunk in ready_chunks:
            # 監督者エージェントのチャンクを処理
            if ready_chunk == main:
                event = ready_chunk.result()
                if event is not None:
                    yield event
                    main = create_task(anext(stream, None))
                    waiting.add(main)
                else:
                    main = None
            
            # サブエージェントのチャンクを処理
            elif ready_chunk == sub:
                try:
                    sub_event = ready_chunk.result()
                    yield sub_event
                    sub = create_task(queue.get())
                    waiting.add(sub)
                except Exception:
                    sub = None
        
        if main is None and queue.empty():
            break

# =============================================================================
# サブエージェントの呼び出し
# =============================================================================

async def _extract(queue, agent, event, state):
    """ストリーミングから内容を抽出"""
    if isinstance(event, str):
        state["text"] += event
        if queue:
            delta = {"delta": {"text": event}}
            await queue.put(
                {"event": {"contentBlockDelta": delta}}
            )
    elif isinstance(event, dict) and "event" in event:
        event_data = event["event"]
        
        # ツール使用を検出
        if "contentBlockStart" in event_data:
            block = event_data["contentBlockStart"]
            start_data = block.get("start", {})
            if "toolUse" in start_data:
                tool_use = start_data["toolUse"]
                tool = tool_use.get("name", "unknown")
                await send_event(queue, 
                    f"{agent}」がツール「{tool}」を実行中", 
                    "tool_use", tool
                )
        
        # テキスト増分を処理
        if "contentBlockDelta" in event_data:
            block = event_data["contentBlockDelta"]
            delta = block.get("delta", {})
            if "text" in delta:
                state["text"] += delta["text"]
        
        if queue:
            await queue.put(event)

async def invoke_agent(agent, query, mcp, create_agent, queue):
    """サブエージェントを呼び出し"""
    state = {"text": ""}
    await send_event(
        queue, f"サブエージェント「{agent}」が呼び出されました", "start"
    )
    
    try:
        # MCPクライアントを起動しながら、エージェントを呼び出し
        with mcp:
            agent_obj = create_agent()
            async for event in agent_obj.stream_async(query):
                await _extract(queue, agent, event, state)
        
        await send_event(
            queue, f"{agent}」が対応を完了しました", "complete"
        )
        return state["text"]
    
    except Exception:
        return f"{agent}エージェントの処理に失敗しました"

# =============================================================================
# サブエージェント1: AWSマスター
# =============================================================================

class AwsMasterState:
    def __init__(self):
        self.client = None
        self.queue = None

_aws_state = AwsMasterState()

def setup_aws_master(queue):
    """新規キューを受け取り、MCPクライアントを準備"""
    _aws_state.queue = queue
    if queue and not _aws_state.client:
        try:
            _aws_state.client = MCPClient(
                lambda: streamablehttp_client(
                    "https://knowledge-mcp.global.api.aws"
                )
            )
        except Exception:
            _aws_state.client = None

def _create_aws_agent():
    """AWS知識参照エージェントを作成"""
    if not _aws_state.client:
        return None
    return Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        tools=_aws_state.client.list_tools_sync(),
        system_prompt="語尾は「〜ゾイ。」にしてください。検索・参照は手短にね。"
    )

@tool
async def aws_master(query):
    """AWSマスターエージェント"""
    if not _aws_state.client:
        return "MCPクライアントが利用不可です"
    return await invoke_agent(
        "AWSマスター", query, _aws_state.client,
        _create_aws_agent, _aws_state.queue
    )

# =============================================================================
# サブエージェント2: APIマスター
# =============================================================================

class ApiMasterState:
    def __init__(self):
        self.client = None
        self.queue = None

_api_state = ApiMasterState()

def setup_api_master(queue):
    """新規キューを受け取り、MCPクライアントを準備"""
    _api_state.queue = queue
    if queue and not _api_state.client:
        try:
            _api_state.client = MCPClient(
                lambda: stdio_client(StdioServerParameters(
                    command="uvx", args=["awslabs.aws-api-mcp-server"],
                    env=os.environ.copy()
                ))
            )
        except Exception:
            _api_state.client = None

def _create_api_agent():
    """API操作エージェントを作成"""
    if not _api_state.client:
        return None
    return Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        tools=_api_state.client.list_tools_sync(),
        system_prompt="ギャル風の口調で応対してください。"
    )

@tool
async def api_master(query):
    """APIマスターエージェント"""
    if not _api_state.client:
        return "MCPクライアントが利用不可です"
    return await invoke_agent(
        "APIマスター", query, _api_state.client,
        _create_api_agent, _api_state.queue
    )

# =============================================================================
# メイン処理
# =============================================================================

def _create_orchestrator():
    """監督者エージェントを作成"""
    return Agent(
        model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
        tools=[aws_master, api_master],
        system_prompt="""2体のサブエージェントを使って日本語で応対して。
1. AWSマスター:AWSドキュメントなどを参照できます。
2. APIマスター:AWSアカウントをAPIで操作できます。
語尾は「〜だキュウ。」にしてください。"""
    )

# アプリケーションを初期化
app = BedrockAgentCoreApp()
orchestrator = _create_orchestrator()

@app.entrypoint
async def invoke(payload):
    """呼び出し処理の開始地点"""
    prompt = payload.get("input", {}).get("prompt", "")
    
    # サブエージェント用のキューを初期化
    queue = asyncio.Queue()
    setup_aws_master(queue)
    setup_api_master(queue)
    
    try:
        # 監督者エージェントを呼び出し、ストリームを統合
        stream = orchestrator.stream_async(prompt)
        async for event in merge_streams(stream, queue):
            yield event
            
    finally:
        # キューをクリーンアップ
        setup_aws_master(None)
        setup_api_master(None)

# AgentCoreランタイムを起動
if __name__ == "__main__":
    app.run()

3-2. AWSにデプロイ

requirements.txt の作成

  • 新規ファイルを作成
cd /workspaces/strands-agentcore/3_advanced/docker
touch requirements.txt
  • 以下を記入
3_advanced/docker/requirements.txt
strands-agents
bedrock-agentcore
uv

AgentCoreランタイムへのデプロイ

  • 以下コマンドを実行
# .env の内容をターミナルの環境変数に設定
export $(cat /workspaces/strands-agentcore/.env | grep -v ^# | xargs)

# AgentCoreのスターターツールキットをインストール
pip install bedrock-agentcore-starter-toolkit

# デプロイ準備
agentcore configure --entrypoint multiagent.py
  • ウィザードでは全て Enter でOK
  • デプロイを実施
# デプロイ
agentcore launch
  • デプロイ完了時、ターミナルに出力される緑枠内 Agent ARN をコピーしておく

3-3. 動作確認

.env ファイルの更新

  • コードスペースの .env ファイルで、最後の行の = 以下に新しいAgent ARNを貼り付ける
strands-agentcore/.env
AGENT_RUNTIME_ARN=arn:aws:bedrock-agentcore:xxx(以下略)

IAMロールの権限追加

エージェントがAWSアカウント内を調査できるようにします。

  • マネコンで、AgentCoreのサイドバー「Agent Runtime」から multiagent を開く
  • 「Versions」から最新バージョンをクリック →「IAM service role」をクリック
  • 「許可を追加」→「ボリシーをアタッチ」をクリック
  • 「AWS管理 - ジョブ機能」で絞り込み、以下をチェックして「許可を追加」
    • Billing
    • ReadOnlyAccess

フロントエンドを開発

  • 新規ファイルを作成
cd /workspaces/strands-agentcore/3_advanced
touch frontend.py
  • 以下コードをファイルに貼り付け
3_advanced/frontend.py
3_advanced/frontend.py
from dotenv import load_dotenv
import os, json, uuid, asyncio, boto3
import streamlit as st

load_dotenv(override=True)

# =============================================================================
# ストリーミング処理
# =============================================================================

def create_state():
    """新しい状態を作成"""
    return {
        "containers": [],
        "current_status": None,
        "current_text": None,
        "final_response": ""
    }

def think(container, state):
    """思考開始を表示"""
    with container:
        thinking_status = st.empty()
        thinking_status.status("思考中", state="running")
    state["containers"].append((thinking_status, "思考中"))

def change_status(event, container, state):
    """サブエージェントのステータスを更新"""
    progress_info = event["subAgentProgress"]
    message = progress_info.get("message")
    stage = progress_info.get("stage", "processing")
    
    # 前のステータスを完了状態に
    if state["current_status"]:
        status, old_message = state["current_status"]
        status.status(old_message, state="complete")
    
    # 新しいステータス表示
    with container:
        new_status_box = st.empty()
        if stage == "complete":
            display_state = "complete"
        else:
            display_state = "running"
        new_status_box.status(message, state=display_state)
    
    status_info = (new_status_box, message)
    state["containers"].append(status_info)
    state["current_status"] = status_info
    state["current_text"] = None
    state["final_response"] = ""

def stream_text(event, container, state):
    """テキストをストリーミング表示"""
    delta = event["contentBlockDelta"]["delta"]
    if "text" not in delta:
        return
    
    # テキスト出力開始時にステータスを完了に
    if state["current_text"] is None:
        if state["containers"]:
            status, first_message = state["containers"][0]
            if "思考中" in first_message:
                status.status("思考中", state="complete")
        if state["current_status"]:
            status, message = state["current_status"]
            status.status(message, state="complete")
    
    # テキスト処理
    text = delta["text"]
    state["final_response"] += text
    
    # テキストコンテナ更新
    if state["current_text"] is None:
        with container:
            state["current_text"] = st.empty()
    if state["current_text"]:
        response = state["final_response"]
        state["current_text"].markdown(response)

def finish(state):
    """表示の終了処理"""
    if state["current_text"]:
        response = state["final_response"]
        state["current_text"].markdown(response)
    for status, message in state["containers"]:
        status.status(message, state="complete")

# =============================================================================
# サブエージェント呼び出し
# =============================================================================

def extract_stream(data, container, state):
    """ストリーミングから内容を抽出"""
    if not isinstance(data, dict):
        return

    event = data.get("event", {})    
    if "subAgentProgress" in event:
        change_status(event, container, state)
    elif "contentBlockDelta" in event:
        stream_text(event, container, state)
    elif "error" in data:
        error_msg = data.get("error", "Unknown error")
        error_type = data.get("error_type", "Unknown")
        st.error(f"AgentCoreエラー: {error_msg}")
        state["final_response"] = f"エラー: {error_msg}"

async def invoke_agent(prompt, container, agent_core):
    """エージェントを呼び出し"""
    state = create_state()
    session_id = f"session_{str(uuid.uuid4())}"
    think(container, state)
    
    payload = json.dumps({
        "input": {"prompt": prompt, "session_id": session_id}
    }).encode()
    
    try:
        agent_response = agent_core.invoke_agent_runtime(
            agentRuntimeArn=os.getenv("AGENT_RUNTIME_ARN"),
            runtimeSessionId=session_id,
            payload=payload,
            qualifier="DEFAULT"
        )
        for line in agent_response["response"].iter_lines():
            decoded = line.decode("utf-8")
            if not line or not decoded.startswith("data: "):
                continue
            try:
                data = json.loads(decoded[6:])
                extract_stream(data, container, state)
            except json.JSONDecodeError:
                continue
        
        finish(state)
        return state["final_response"]
    
    except Exception as e:
        st.error(f"エラーが発生しました: {e}")
        return ""

# =============================================================================
# メイン画面
# =============================================================================

# タイトル表示
st.title("アマQ Unlimited")
st.write("AWSドキュメントや、あなたのアカウントを調査し放題!")

# セッションを初期化
if 'messages' not in st.session_state:
    st.session_state.messages = []

# メッセージ履歴を表示
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# AgentCore APIクライアントを初期化
agent_core = boto3.client('bedrock-agentcore')

# ユーザー入力を表示
if prompt := st.chat_input("メッセージを入力してね"):
    with st.chat_message("user"):
        st.markdown(prompt)
    st.session_state.messages.append(
        {"role": "user", "content": prompt}
    )
    
    # エージェントの応答を表示
    with st.chat_message("assistant"):
        container = st.container()
        try:
            response = asyncio.run(
                invoke_agent(prompt, container, agent_core)
            )
            if response:
                st.session_state.messages.append(
                    {"role": "assistant", "content": response}
                )
        except Exception as e:
            st.error(f"エラーが発生しました: {e}")
  • 以下コマンドで起動
pip install streamlit==1.46.1
streamlit run frontend.py
  • 右下のポップアップの「ブラウザーで開く」ボタンをクリック
    • 閉じてしまった場合は、ターミナルの http://localhost:8501 をクリックすればOK
  • エージェントへ以下のように質問してみましょう
    • さっきAgentCore関連のIAMロールを作成したんだけど、その権限内容を確認して、AWSドキュメントに基づいて解説して。

スクリーンショット 2025-08-03 0.16.15.png

もしAIエージェントの回答が途中で止まってしまう場合は、BedrockのClaudeモデル呼び出し分間クォータが上限に達し、Strandsが裏でリトライをかけ続けている可能性があります。

そのときは multiagent.py を編集して、サブエージェントのモデルをそれぞれClaude 3.5 Sonnetのv1/v2などに分散させてみてください。
コードの修正後は、agentcore configureagentcore launch を再実施する必要があります。

(代替候補のモデルID)

  • us.anthropic.claude-3-5-sonnet-20241022-v2:0
  • us.anthropic.claude-3-5-sonnet-20240620-v1:0
  • us.anthropic.claude-3-5-haiku-20241022-v1:0

ちなみに、最近こいつが日本語化されましたが、今回のアプリならサブスク不要の従量課金のみで使い放題です!
調査だけでなく更新系の操作もさせられます(人間の承認が必要のため、若干のアプリ改修が必要)

さいごに

今回は一つのAgentCoreランタイム上にマルチエージェントをまとめてデプロイしましたが、今後は各エージェントを個別のランタイムで運用し、A2Aプロトコルを使って相互通信させるマイクロサービスのような世界観が実現していくかも知れません。

本日利用したコードは以下にまとまってます。

お帰り前に必ずアンケートへ回答ください!

おまけ

図解フルカラーの入門書を書きました!予約受付中。

ビラ.png

次のステップ

やっぱりAWS界隈は有志のアウトプットの勢いがすごい👏

169
126
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
169
126

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?