この記事は人力で書きました。
このイベント用の手順書ですが、本記事を読めば1時間ぐらいで誰でも試せます!
Claude Codeなど、AIエージェントを「使う」人はかなり増えてきました。
もはや、AI使うのは全員やって当たり前。
僕らは一歩進んで、AIエージェントを「作る」方にも入門しましょう。100倍楽しいですよ🙌
当日までの事前準備
AWSアカウントの作成
新規作成推奨。既存アカウントを使う際は自己責任でお願いします。
※最近は新しい無料専用プランが出てきますが「有料」を選んでください。
今回の課金は数十円レベルの想定です。(100%保証はできないので、自己責任でお願いします)
GitHubアカウントの作成
既に持っている方は既存アカウントをお使いください。
前説(Strands、AgentCoreって何?)
- Strands: AIエージェント開発用のPythonフレームワーク。AWS製のOSS
- AgentCore: Bedrockの姉妹サービス。AIエージェント用の便利パーツ的なAPI群
0. 環境構築
0-1. 開発環境のセットアップ
GitHubリポジトリの作成
- サインイン後、トップページ左上「New」より新規リポジトリを作成
- Repository name:
strands-agentcore
- Choose visibility: Private
- Add README: On
- Repository name:
GitHub Codespacesの起動
- リポジトリ作成後、画面右上の緑色ボタン「Code > Create codespace on main」
必要なファイルの作成
- コードスペース画面下部のターミナルで以下コマンドを実行
touch .gitignore
touch .env
- 各ファイルに以下を記載する
.env
.bedrock_agentcore.yaml
本ハンズオンではGitHubへのプッシュ(コードのアップロード)は不要ですが、誤って認証情報をアップしてしまわないように .env
を作成しておきます。
# AWS認証情報
AWS_ACCESS_KEY_ID=xxx
AWS_SECRET_ACCESS_KEY=xxx
AWS_DEFAULT_REGION=us-west-2
コードスペースではファイルに入力した瞬間、自動で上書き保存されます。
xxx
部分はこの次の手順で埋めます。
0-2. AWSアカウントのセットアップ
IAMユーザーの作成
- AWSマネコンにサインイン( https://console.aws.amazon.com/ )
このハンズオンは、すべてオレゴンリージョンで行います。
最初に画面右上のリージョンを切り替えておいてください。
- 「IAM」を検索してアクセス
- サイドバー「ユーザー」より新規ユーザーを作成
- ステップ1
- ユーザー名:
codespaces
- ユーザー名:
- ステップ2
- 許可のオプション: ポリシーを直接アタッチする
- 許可ポリシー:
AdministratorAccess
にチェック
- 後はそのまま進む
- ステップ1
IAMアクセスキーの作成
- IAMユーザー作成後、作ったユーザー名をクリックして開く
- 概要セクションより「アクセスキーを作成」
- ステップ1
- ユースケース: コマンドラインインターフェイス (CLI)
- 「上記のレコメンデーションを理解し、アクセスキーを作成します。」にチェック
- 後はそのまま進む
- ステップ1
- 作成後、アクセスキーとシークレットアクセスキーを、コードスペースの
.env
に貼り付ける
この認証情報は、管理者権限を持っているため慎重に扱ってください。
もし誤って公開GitHubリポジトリにPushしたりすると、秒で悪い人に検知されて、あなたのAWSアカウントを悪用されたりします。
Bedrockのモデル有効化
- AWSマネコンで「Amazon Bedrock」を検索してアクセス
- サイドバー「モデルアクセス」より「すべてのモデルを有効化」
- ステップ1:そのまま進む
- ステップ2:Anthropic用にユースケースの詳細を追加
- 対象のユーザーは誰ですか?: 社内の従業員
- ユースケースの詳細を入力してください:
hands-on
- 後はそのまま進む
1. Strands入門
- コードスペースで画面下部のターミナルを利用
- 以下コマンドで、新規ディレクトリを作りそこに移動
mkdir 1_strands
cd 1_strands
1-1. 三行エージェント
- 新規ファイルを作成
touch 1_agent.py
- 以下コードをファイルに貼り付け
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent
# .envファイルから環境変数を読み込む
load_dotenv()
# エージェントを作成して起動
agent = Agent("us.anthropic.claude-3-7-sonnet-20250219-v1:0")
agent("JAWS-UGって何?")
(すいません盛りました。 .env
が無ければ三行なんです)
- 以下コマンドで、必要なモジュールをインストールしてから実行
pip install strands-agents python-dotenv
python 1_agent.py
参考:Bedrockの他モデルはもちろん、OpenAIのモデルなども呼べます!
1-2. ツールを持たせてみる
- 新規ファイルを作成
touch 2_tool.py
- 以下コードをファイルに貼り付け
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent, tool
# .envファイルから環境変数を読み込む
load_dotenv()
# 文字カウント関数をツールとして定義
@tool
def counter(word: str, letter: str):
return word.lower().count(letter.lower())
# エージェントを作成
agent = Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
tools=[counter]
)
# エージェントを呼び出し
agent("Strandsの中にSはいくつある?")
- 以下コマンドで実行
python 2_tool.py
もしAIエージェントの回答が途中で止まってしまう場合は、BedrockのClaudeモデル呼び出し分間クォータが上限に達し、Strandsが裏でリトライをかけ続けている可能性があります。
そのときは他のモデルに変更してみてください。
(代替候補のモデルID)
- us.anthropic.claude-3-5-sonnet-20241022-v2:0
- us.anthropic.claude-3-5-sonnet-20240620-v1:0
- us.anthropic.claude-3-5-haiku-20241022-v1:0
1-3. MCPサーバーを持たせてみる
- 新規ファイルを作成
touch 3_mcp.py
- 以下コードをファイルに貼り付け
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent
from strands.tools.mcp import MCPClient
from mcp import stdio_client, StdioServerParameters
# .envファイルから環境変数を読み込む
load_dotenv()
# MCPクライアントを作成
mcp = MCPClient(lambda: stdio_client(
StdioServerParameters(
command="uvx",
args=["strands-agents-mcp-server"]
)
))
# MCPクライアントを起動しながら、エージェント作成&呼び出し
with mcp:
agent = Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
tools=mcp.list_tools_sync()
)
agent("StrandsでA2Aサーバーの最小サンプルコードを書いて!")
- 以下コマンドで実行
pip install uv strands-agents-tools
python 3_mcp.py
1-4. マルチエージェントにしてみる
監督者がサブエージェントをツールのように呼び出す「Agent-as-Tools」パターンを使います。(一番オーソドックスで実装も簡単)
- 新規ファイルを作成
touch 4_multi_agent.py
- 以下コードをファイルに貼り付け
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent, tool
from strands_tools import calculator
# .envファイルから環境変数を読み込む
load_dotenv()
# サブエージェント1を定義
@tool
def math_agent(query: str):
agent = Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
system_prompt="ツールを使って計算を行ってください",
tools=[calculator]
)
return str(agent(query))
# サブエージェント2を定義
@tool
def haiku_agent(query: str):
agent = Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
system_prompt="与えられたお題で五・七・五の俳句を詠んで"
)
return str(agent(query))
# 監督者エージェントの作成と実行
orchestrator = Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
system_prompt="与えられた問題を計算して、答えを俳句として詠んで",
tools=[math_agent, haiku_agent]
)
orchestrator("十円持っている太郎くんが二十円もらいました。今いくら?")
- 以下コマンドで実行
python 4_multi_agent.py
2. AgentCore入門
これまでは作ったエージェントをデプロイしたいとき、やれLambdaかECSか、認証やストリーミング、監視はどうする…など細かい悩みが尽きませんでしたが、AgentCoreはこれをまるっと解決してくれる救世主です!
今回は、AgentCoreの機能のうち「ランタイム」と「オブザーバビリティ」を体験します。
AgentCoreランタイムはAIエージェント専用Lambdaのようなもので、そこで稼働するアプリを簡単に作るためのPython用SDKが提供されています。(実態はFastAPIのような、StarletteベースのAPIフレームワーク)
さらに、そのアプリを簡単にAWSへデプロイしたり、ローカル実行できるCLIベースのスターターツールキットも提供されています。
- 新規ディレクトリを作って、そこに移動
cd /workspaces/strands-agentcore
mkdir 2_agentcore
cd 2_agentcore
2-1. トレースの有効化
このハンズオンは、すべてオレゴンリージョンで行います。
最初に画面右上のリージョンを切り替えておいてください。
- マネコンで「CloudWatch」を検索してアクセス
- サイドバーの「トランザクション検索」より、「Enable Transaction Search」をクリック
- チェックボックスにチェックを入れて「Save」
2-2. コードにAgentCoreを装着
AgentCoreビルド用のディレクトリを作り、そこに新規ファイルを作成
mkdir docker
cd docker
touch agentcore.py
- 以下コードをファイルに貼り付け
# 必要なライブラリをインポート
from dotenv import load_dotenv
from strands import Agent
from bedrock_agentcore.runtime import BedrockAgentCoreApp # 追加
# .envファイルから環境変数をロード
load_dotenv()
# Strandsでエージェントを作成
agent = Agent("us.anthropic.claude-3-7-sonnet-20250219-v1:0")
# ---------- 以下、追加コード--------------
# AgentCoreのサーバーを作成
app = BedrockAgentCoreApp()
# エージェント呼び出し関数を、AgentCoreの開始点に設定
@app.entrypoint
def invoke_agent(payload, context):
# リクエストのペイロード(中身)からプロンプトを取得
prompt = payload.get("prompt")
# エージェントを呼び出してレスポンスを返却
return {"result": agent(prompt).message}
# AgentCoreサーバーを起動
app.run()
- 以下コマンドで実行
pip install bedrock-agentcore
python agentcore.py
AgentCoreサーバーが起動し、待ち受け状態になります。
ローカルでテスト
- コードスペースのターミナル右上の「+」ボタンから、2つ目のターミナルを起動
- 新規ファイルを作成
cd 2_agentcore
touch 1_client.py
- 以下コードをファイルに貼り付け
# 必要なライブラリをインポート
import requests
# ローカルサーバーにリクエストを実施
response = requests.post(
url="http://localhost:8080/invocations",
headers={"Content-Type": "application/json"},
json={"prompt": "JAWS-UGって何?"}
)
# レスポンスを画面に表示
print(response.json())
- 以下コマンドで実行
python 1_client.py
- 2つ目のターミナルはもう使わないので、右側タブバーの🗑️ボタンから閉じる
2-3. AWSにデプロイ
- 1つ目のターミナルに戻り、
Ctrl + C
で起動中のサーバーを停止
requirements.txt の作成
- 新規ファイルを作成
cd /workspaces/strands-agentcore/2_agentcore/docker
touch requirements.txt
- 以下を記入
strands-agents
bedrock-agentcore
AgentCoreランタイムへのデプロイ
- 以下コマンドを実行
# .env の内容をターミナルの環境変数に設定
export $(cat /workspaces/strands-agentcore/.env | grep -v ^# | xargs)
# AgentCoreのスターターツールキットをインストール
pip install bedrock-agentcore-starter-toolkit
# デプロイ準備
agentcore configure --entrypoint agentcore.py --name jawsug
- ウィザードでは全て
Enter
でOK
🔐 Execution Role
Press Enter to auto-create execution role, or provide execution role ARN/name to use existing
Execution role ARN/name (or press Enter to auto-create):
🏗️ ECR Repository
Press Enter to auto-create ECR repository, or provide ECR Repository URI to use existing
ECR Repository URI (or press Enter to auto-create):
🔍 Detected dependency file: requirements.txt
Press Enter to use this file, or type a different path (use Tab for autocomplete):
🔐 Authorization Configuration
By default, Bedrock AgentCore uses IAM authorization.
Configure OAuth authorizer instead? (yes/no) [no]:
- デプロイを実施
# デプロイ
agentcore launch
IAMロールとかECRリポジトリとか、全部自動で作ってくれるので超便利!
これだけで、AIエージェントをサーバーレス基盤上でホストしてくれます。
2-4. 動作確認
マネコンからテスト
- マネコンで「Bedrock AgentCore」を検索してアクセス
- サイドバー「Agent Sandbox」にアクセス
- Runtime agent: jawsug
- Endpoint: DEFAULT
- Input:
{"prompt": "JAWSUGって何?"}
- ※初期値は削除してください
- 「Run」をクリック
GUIアプリからテスト
- サイドバー「Agent Runtime」から
jawsug
を開く - Pythonサンプルコードに表示されている
agentRuntimeArn
の値の文字列をコピーする - 以下の
=
以降にコピーしたARNを貼り付け、コードスペースで.env
ファイルで新しい行に追記
# AWS認証情報
(中略)
# AgentCore設定
AGENT_RUNTIME_ARN=arn:aws:bedrock-agentcore:xxx(以下略)
- 新規ファイルを作成
cd /workspaces/strands-agentcore/2_agentcore
touch 2_frontend.py
- 以下コードをファイルに貼り付け
# 必要なライブラリをインポート
from dotenv import load_dotenv
import os, asyncio, boto3, json, uuid
import streamlit as st
# .envファイルから環境変数をロード
load_dotenv(override=True)
# タイトルを描画
st.title("Strands on AgentCore")
st.write("何でも聞いてね!")
# チャットボックスを描画
if prompt := st.chat_input("メッセージを入力してね"):
# ユーザーのプロンプトを表示
with st.chat_message("user"):
st.markdown(prompt)
# エージェントの回答を表示
with st.chat_message("assistant"):
# AgentCoreランタイムを呼び出し
with st.spinner("考え中…"):
agentcore = boto3.client('bedrock-agentcore')
response = agentcore.invoke_agent_runtime(
agentRuntimeArn=os.getenv("AGENT_RUNTIME_ARN"),
payload=json.dumps({"prompt": prompt})
)
# 結果のテキストを取り出して表示
response_body = response["response"].read()
response_data = json.loads(response_body.decode('utf-8'))
st.write(response_data["result"]["content"][0]["text"])
- 以下コマンドで起動
pip install streamlit
streamlit run 2_frontend.py
- 右下に出るポップアップの「ブラウザーで開く」ボタンをクリック
- 閉じてしまった場合は、ターミナルの
http://localhost:8501
をクリックすればOK
- 閉じてしまった場合は、ターミナルの
2-5. 運用監視
簡易Langfuse的な機能(LLM版Datadogのようなもの)が自動で付いてるので便利です。
AgentCoreオブザーバビリティでトレース確認
- マネコンで、AgentCoreのサイドバー「Agent Runtime」から
jawsug
をクリック - 上部の「Agent details」を展開して「View observability」をクリック
- 「Traces」タブからトレースをクリックすると、エージェントの動作履歴をドリルダウンして確認できます
CloudWatchログの確認
うまく起動せず、トレースすら見られないときはサーバーログを見ましょう。
- マネコンで「CloudWatch」を検索してアクセス
- サイドバー「ロググループ」から
/aws/bedrock-agentcore/runtimes/jawsug-<ランダム文字列>-DEFAULT
をクリック - 「すべてのログストリームを検索」から、直近のサーバーログを確認できます
3. 本格アプリのデプロイ
マルチエージェントをAgentCoreランタイムに載せて、3体の行動をすべてリアルタイムにストリーミング表示します。(これめっちゃ実装大変でした…)
- 新規ディレクトリを作って、そこに移動
cd /workspaces/strands-agentcore
mkdir -p 3_advanced
cd 3_advanced
3-1. マルチエージェントの開発
- 新規ファイルを作成
mkdir docker
cd docker
touch multiagent.py
- 以下コードをファイルに貼り付け
3_advanced/dokcer/multiagent.py
import os, asyncio
from strands import Agent, tool
from strands.tools.mcp import MCPClient
from mcp import stdio_client, StdioServerParameters
from mcp.client.streamable_http import streamablehttp_client
from bedrock_agentcore.runtime import BedrockAgentCoreApp
# =============================================================================
# サブエージェントのストリーミング処理
# =============================================================================
async def send_event(queue, message, stage, tool_name=None):
"""サブエージェントのステータスを送信"""
if not queue:
return
progress = {"message": message, "stage": stage}
if tool_name:
progress["tool_name"] = tool_name
await queue.put({"event": {"subAgentProgress": progress}})
async def merge_streams(stream, queue):
"""親子エージェントのストリームを統合"""
create_task = asyncio.create_task
main = create_task(anext(stream, None))
sub = create_task(queue.get())
waiting = {main, sub}
# チャンクの到着を待機
while waiting:
ready_chunks, waiting = await asyncio.wait(
waiting, return_when=asyncio.FIRST_COMPLETED
)
for ready_chunk in ready_chunks:
# 監督者エージェントのチャンクを処理
if ready_chunk == main:
event = ready_chunk.result()
if event is not None:
yield event
main = create_task(anext(stream, None))
waiting.add(main)
else:
main = None
# サブエージェントのチャンクを処理
elif ready_chunk == sub:
try:
sub_event = ready_chunk.result()
yield sub_event
sub = create_task(queue.get())
waiting.add(sub)
except Exception:
sub = None
if main is None and queue.empty():
break
# =============================================================================
# サブエージェントの呼び出し
# =============================================================================
async def _extract(queue, agent, event, state):
"""ストリーミングから内容を抽出"""
if isinstance(event, str):
state["text"] += event
if queue:
delta = {"delta": {"text": event}}
await queue.put(
{"event": {"contentBlockDelta": delta}}
)
elif isinstance(event, dict) and "event" in event:
event_data = event["event"]
# ツール使用を検出
if "contentBlockStart" in event_data:
block = event_data["contentBlockStart"]
start_data = block.get("start", {})
if "toolUse" in start_data:
tool_use = start_data["toolUse"]
tool = tool_use.get("name", "unknown")
await send_event(queue,
f"「{agent}」がツール「{tool}」を実行中",
"tool_use", tool
)
# テキスト増分を処理
if "contentBlockDelta" in event_data:
block = event_data["contentBlockDelta"]
delta = block.get("delta", {})
if "text" in delta:
state["text"] += delta["text"]
if queue:
await queue.put(event)
async def invoke_agent(agent, query, mcp, create_agent, queue):
"""サブエージェントを呼び出し"""
state = {"text": ""}
await send_event(
queue, f"サブエージェント「{agent}」が呼び出されました", "start"
)
try:
# MCPクライアントを起動しながら、エージェントを呼び出し
with mcp:
agent_obj = create_agent()
async for event in agent_obj.stream_async(query):
await _extract(queue, agent, event, state)
await send_event(
queue, f"「{agent}」が対応を完了しました", "complete"
)
return state["text"]
except Exception:
return f"{agent}エージェントの処理に失敗しました"
# =============================================================================
# サブエージェント1: AWSマスター
# =============================================================================
class AwsMasterState:
def __init__(self):
self.client = None
self.queue = None
_aws_state = AwsMasterState()
def setup_aws_master(queue):
"""新規キューを受け取り、MCPクライアントを準備"""
_aws_state.queue = queue
if queue and not _aws_state.client:
try:
_aws_state.client = MCPClient(
lambda: streamablehttp_client(
"https://knowledge-mcp.global.api.aws"
)
)
except Exception:
_aws_state.client = None
def _create_aws_agent():
"""AWS知識参照エージェントを作成"""
if not _aws_state.client:
return None
return Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
tools=_aws_state.client.list_tools_sync(),
system_prompt="語尾は「〜ゾイ。」にしてください。検索・参照は手短にね。"
)
@tool
async def aws_master(query):
"""AWSマスターエージェント"""
if not _aws_state.client:
return "MCPクライアントが利用不可です"
return await invoke_agent(
"AWSマスター", query, _aws_state.client,
_create_aws_agent, _aws_state.queue
)
# =============================================================================
# サブエージェント2: APIマスター
# =============================================================================
class ApiMasterState:
def __init__(self):
self.client = None
self.queue = None
_api_state = ApiMasterState()
def setup_api_master(queue):
"""新規キューを受け取り、MCPクライアントを準備"""
_api_state.queue = queue
if queue and not _api_state.client:
try:
_api_state.client = MCPClient(
lambda: stdio_client(StdioServerParameters(
command="uvx", args=["awslabs.aws-api-mcp-server"],
env=os.environ.copy()
))
)
except Exception:
_api_state.client = None
def _create_api_agent():
"""API操作エージェントを作成"""
if not _api_state.client:
return None
return Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
tools=_api_state.client.list_tools_sync(),
system_prompt="ギャル風の口調で応対してください。"
)
@tool
async def api_master(query):
"""APIマスターエージェント"""
if not _api_state.client:
return "MCPクライアントが利用不可です"
return await invoke_agent(
"APIマスター", query, _api_state.client,
_create_api_agent, _api_state.queue
)
# =============================================================================
# メイン処理
# =============================================================================
def _create_orchestrator():
"""監督者エージェントを作成"""
return Agent(
model="us.anthropic.claude-3-7-sonnet-20250219-v1:0",
tools=[aws_master, api_master],
system_prompt="""2体のサブエージェントを使って日本語で応対して。
1. AWSマスター:AWSドキュメントなどを参照できます。
2. APIマスター:AWSアカウントをAPIで操作できます。
語尾は「〜だキュウ。」にしてください。"""
)
# アプリケーションを初期化
app = BedrockAgentCoreApp()
orchestrator = _create_orchestrator()
@app.entrypoint
async def invoke(payload):
"""呼び出し処理の開始地点"""
prompt = payload.get("input", {}).get("prompt", "")
# サブエージェント用のキューを初期化
queue = asyncio.Queue()
setup_aws_master(queue)
setup_api_master(queue)
try:
# 監督者エージェントを呼び出し、ストリームを統合
stream = orchestrator.stream_async(prompt)
async for event in merge_streams(stream, queue):
yield event
finally:
# キューをクリーンアップ
setup_aws_master(None)
setup_api_master(None)
# AgentCoreランタイムを起動
if __name__ == "__main__":
app.run()
3-2. AWSにデプロイ
requirements.txt の作成
- 新規ファイルを作成
cd /workspaces/strands-agentcore/3_advanced/docker
touch requirements.txt
- 以下を記入
strands-agents
bedrock-agentcore
uv
AgentCoreランタイムへのデプロイ
- 以下コマンドを実行
# .env の内容をターミナルの環境変数に設定
export $(cat /workspaces/strands-agentcore/.env | grep -v ^# | xargs)
# AgentCoreのスターターツールキットをインストール
pip install bedrock-agentcore-starter-toolkit
# デプロイ準備
agentcore configure --entrypoint multiagent.py
- ウィザードでは全て
Enter
でOK - デプロイを実施
# デプロイ
agentcore launch
- デプロイ完了時、ターミナルに出力される緑枠内
Agent ARN
をコピーしておく
3-3. 動作確認
.env ファイルの更新
- コードスペースの
.env
ファイルで、最後の行の=
以下に新しいAgent ARNを貼り付ける
AGENT_RUNTIME_ARN=arn:aws:bedrock-agentcore:xxx(以下略)
IAMロールの権限追加
エージェントがAWSアカウント内を調査できるようにします。
- マネコンで、AgentCoreのサイドバー「Agent Runtime」から
multiagent
を開く - 「Versions」から最新バージョンをクリック →「IAM service role」をクリック
- 「許可を追加」→「ボリシーをアタッチ」をクリック
- 「AWS管理 - ジョブ機能」で絞り込み、以下をチェックして「許可を追加」
Billing
ReadOnlyAccess
フロントエンドを開発
- 新規ファイルを作成
cd /workspaces/strands-agentcore/3_advanced
touch frontend.py
- 以下コードをファイルに貼り付け
3_advanced/frontend.py
from dotenv import load_dotenv
import os, json, uuid, asyncio, boto3
import streamlit as st
load_dotenv(override=True)
# =============================================================================
# ストリーミング処理
# =============================================================================
def create_state():
"""新しい状態を作成"""
return {
"containers": [],
"current_status": None,
"current_text": None,
"final_response": ""
}
def think(container, state):
"""思考開始を表示"""
with container:
thinking_status = st.empty()
thinking_status.status("思考中", state="running")
state["containers"].append((thinking_status, "思考中"))
def change_status(event, container, state):
"""サブエージェントのステータスを更新"""
progress_info = event["subAgentProgress"]
message = progress_info.get("message")
stage = progress_info.get("stage", "processing")
# 前のステータスを完了状態に
if state["current_status"]:
status, old_message = state["current_status"]
status.status(old_message, state="complete")
# 新しいステータス表示
with container:
new_status_box = st.empty()
if stage == "complete":
display_state = "complete"
else:
display_state = "running"
new_status_box.status(message, state=display_state)
status_info = (new_status_box, message)
state["containers"].append(status_info)
state["current_status"] = status_info
state["current_text"] = None
state["final_response"] = ""
def stream_text(event, container, state):
"""テキストをストリーミング表示"""
delta = event["contentBlockDelta"]["delta"]
if "text" not in delta:
return
# テキスト出力開始時にステータスを完了に
if state["current_text"] is None:
if state["containers"]:
status, first_message = state["containers"][0]
if "思考中" in first_message:
status.status("思考中", state="complete")
if state["current_status"]:
status, message = state["current_status"]
status.status(message, state="complete")
# テキスト処理
text = delta["text"]
state["final_response"] += text
# テキストコンテナ更新
if state["current_text"] is None:
with container:
state["current_text"] = st.empty()
if state["current_text"]:
response = state["final_response"]
state["current_text"].markdown(response)
def finish(state):
"""表示の終了処理"""
if state["current_text"]:
response = state["final_response"]
state["current_text"].markdown(response)
for status, message in state["containers"]:
status.status(message, state="complete")
# =============================================================================
# サブエージェント呼び出し
# =============================================================================
def extract_stream(data, container, state):
"""ストリーミングから内容を抽出"""
if not isinstance(data, dict):
return
event = data.get("event", {})
if "subAgentProgress" in event:
change_status(event, container, state)
elif "contentBlockDelta" in event:
stream_text(event, container, state)
elif "error" in data:
error_msg = data.get("error", "Unknown error")
error_type = data.get("error_type", "Unknown")
st.error(f"AgentCoreエラー: {error_msg}")
state["final_response"] = f"エラー: {error_msg}"
async def invoke_agent(prompt, container, agent_core):
"""エージェントを呼び出し"""
state = create_state()
session_id = f"session_{str(uuid.uuid4())}"
think(container, state)
payload = json.dumps({
"input": {"prompt": prompt, "session_id": session_id}
}).encode()
try:
agent_response = agent_core.invoke_agent_runtime(
agentRuntimeArn=os.getenv("AGENT_RUNTIME_ARN"),
runtimeSessionId=session_id,
payload=payload,
qualifier="DEFAULT"
)
for line in agent_response["response"].iter_lines():
decoded = line.decode("utf-8")
if not line or not decoded.startswith("data: "):
continue
try:
data = json.loads(decoded[6:])
extract_stream(data, container, state)
except json.JSONDecodeError:
continue
finish(state)
return state["final_response"]
except Exception as e:
st.error(f"エラーが発生しました: {e}")
return ""
# =============================================================================
# メイン画面
# =============================================================================
# タイトル表示
st.title("アマQ Unlimited")
st.write("AWSドキュメントや、あなたのアカウントを調査し放題!")
# セッションを初期化
if 'messages' not in st.session_state:
st.session_state.messages = []
# メッセージ履歴を表示
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# AgentCore APIクライアントを初期化
agent_core = boto3.client('bedrock-agentcore')
# ユーザー入力を表示
if prompt := st.chat_input("メッセージを入力してね"):
with st.chat_message("user"):
st.markdown(prompt)
st.session_state.messages.append(
{"role": "user", "content": prompt}
)
# エージェントの応答を表示
with st.chat_message("assistant"):
container = st.container()
try:
response = asyncio.run(
invoke_agent(prompt, container, agent_core)
)
if response:
st.session_state.messages.append(
{"role": "assistant", "content": response}
)
except Exception as e:
st.error(f"エラーが発生しました: {e}")
- 以下コマンドで起動
pip install streamlit==1.46.1
streamlit run frontend.py
- 右下のポップアップの「ブラウザーで開く」ボタンをクリック
- 閉じてしまった場合は、ターミナルの
http://localhost:8501
をクリックすればOK
- 閉じてしまった場合は、ターミナルの
- エージェントへ以下のように質問してみましょう
さっきAgentCore関連のIAMロールを作成したんだけど、その権限内容を確認して、AWSドキュメントに基づいて解説して。
もしAIエージェントの回答が途中で止まってしまう場合は、BedrockのClaudeモデル呼び出し分間クォータが上限に達し、Strandsが裏でリトライをかけ続けている可能性があります。
そのときは multiagent.py
を編集して、サブエージェントのモデルをそれぞれClaude 3.5 Sonnetのv1/v2などに分散させてみてください。
コードの修正後は、agentcore configure
と agentcore launch
を再実施する必要があります。
(代替候補のモデルID)
- us.anthropic.claude-3-5-sonnet-20241022-v2:0
- us.anthropic.claude-3-5-sonnet-20240620-v1:0
- us.anthropic.claude-3-5-haiku-20241022-v1:0
ちなみに、最近こいつが日本語化されましたが、今回のアプリならサブスク不要の従量課金のみで使い放題です!
調査だけでなく更新系の操作もさせられます(人間の承認が必要のため、若干のアプリ改修が必要)
さいごに
今回は一つのAgentCoreランタイム上にマルチエージェントをまとめてデプロイしましたが、今後は各エージェントを個別のランタイムで運用し、A2Aプロトコルを使って相互通信させるマイクロサービスのような世界観が実現していくかも知れません。
本日利用したコードは以下にまとまってます。
お帰り前に必ずアンケートへ回答ください!
おまけ
図解フルカラーの入門書を書きました!予約受付中。
次のステップ
やっぱりAWS界隈は有志のアウトプットの勢いがすごい👏