Bedrock AgentCoreのRuntime、Observability、Memoryをやってみました。
長くなったのでソースはこちらに格納しております。
過去のAgentCore関連の投稿
AgentCore Browser(Bedrock AgentCoreのビルトインツールの一つ)の記事はこちら
AgentCore GatewayとAgentCore Identityの記事はこちら
これで一通り、浅めに触ったことにします。😅
Step1: どシンプルなチャットを作る
ライブラリーの導入
ライブラリーをインストールします。uv使ってます。
uv add bedrock-agentcore strands-agents
アプリケーションコードを作成
Pythonのコードを作っていきます。
まずStrands Agentsでエージェントを最小のコードで作ります。
model = BedrockModel(model_id="us.amazon.nova-lite-v1:0", region_name="us-west-2")
agent = Agent(model=model, callback_handler=None)
result = agent(user_message)
そして、AgentCoreのSDKを使って、AgentCore Runtimeへデプロイできる形にします。
from bedrock_agentcore import RequestContext
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from strands import Agent
from strands.models import BedrockModel
model = BedrockModel(model_id="us.amazon.nova-lite-v1:0", region_name="us-west-2")
agent = Agent(model=model, callback_handler=None)
app = BedrockAgentCoreApp()
@app.entrypoint
def invoke(payload: dict, context: RequestContext):
"""Process user input and return a response"""
user_message = payload.get("prompt")
result = agent(user_message)
return {"result": result.message}
if __name__ == "__main__":
app.run()
ソースはこちら
動作確認
AgentCore Runtimeへデプロイする前に、ローカル起動し動作確認します。
uv run app.py
INFO: Started server process [235808]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)
cURLでリクエストを送信します。
curl -X POST http://localhost:8080/invocations \
-H "Content-Type: application/json" \
-d '{"prompt": "こんにちは!今日も暑いですね!10年後には50度ぐらいいっちゃうんじゃないですか!?"}'
{
"result": {
"role": "assistant",
"content": [
{
"text": "こんにちは!確かに最近は暑い日が続いていますね。しかし、気温が10年後に50度になるというのは、現実的ではないとされています。気候変動の影響はありますが、気温が劇的に上昇する見込みはありません。とはいえ、気候変動対策には十分な注意が必要です。地球温暖化を抑制するために、各国が協力して取り組む必要があります。一人一人の行動も重要です。環境に配慮した生活習慣を取り入れることで、地球温暖化の進行を抑えることができます。また、気候変動の影響は地域によって異なります。例えば、北極圏では気温の上昇が顕著で、海氷の融解が進んでいます。一方、赤道付近では気温の上昇が緩やかで、気候変動の影響は比較的小さいとされています。したがって、気候変動対策は地域に応じて適切な対策を講じる必要があります。"
}
]
}
}
真面目な答えが返ってきましたね。とりあえず動作していることが確認できました。
AgentCore Runtimeへデプロイ
それではAgentCore Runtimeへデプロイしましょう。
AgentCore Runtimeは、ECRに登録したコンテナイメージを使用して起動する仕組みなので、コンテナイメージを作成します。
FROM public.ecr.aws/docker/library/python:3.11-slim
WORKDIR /app
ENV AWS_REGION=us-west-2
ENV AWS_DEFAULT_REGION=us-west-2
ENV DOCKER_CONTAINER=1
COPY pyproject.toml .
RUN python -m pip install --no-cache-dir .
RUN python -m pip install aws-opentelemetry-distro>=0.10.0
COPY app.py .
RUN useradd -m -u 1000 bedrock_agentcore
USER bedrock_agentcore
EXPOSE 8080
CMD ["opentelemetry-instrument", "python", "-m", "app"]
しれっと、aws-opentelemetry-distro
をインストールして、opentelemetry-instrument
というコマンドを使ってますが、これだけでAgentCore Observabilityが有効化されます。
コンテナイメージをビルドしてECRに登録します。
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
AWS_REGION=us-west-2
AGENT_NAME=agentcore-basic
docker buildx create --use
aws ecr create-repository --repository-name ${AGENT_NAME} --region ${AWS_REGION}
aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com
docker buildx build --platform linux/arm64 -t ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${AGENT_NAME}:latest --push .
Windows 11のWSL環境でビルドしました。理屈はわかりませんが、amd64環境でarm64のビルドができるんですね。
それでは、AgentCore Runtimeにデプロイします。
マネジメントコンソールで作業します。
-
マネジメントコンソールで「Amazon Bedrock AgentCore」の管理画面を表示します。
左メニューの「Agent Runtime」を選択します。
「Host Agent」ボタンをクリックします。 -
「Brouse images」ボタンをクリックし、RCRに登録したコンテナイメージを選択します。
「Host Agent」ボタンをクリックします。
これでデプロイが完了です。
実行ロールとしてbedrock:InvokeModel
とbedrock:InvokeModelWithResponseStream
が付与されているので同じリージョンのBedrockにはアクセス可能です。
画面中央のサンプルコードにある「agentRuntimeArn」を後で使うのでメモっておきます。
Streamlitでフロントエンドを作成
デプロイしたエージェントを呼び出すフロントエンドを作成します。必要なライブラリーはstreamlit
とboto3
です。
uv add streamlit boto3
Bedrock AgentCoreを呼ぶクライアントはboto3.client("bedrock-agentcore", region_name="us-west-2")
で作成します。サービス名は「bedrock-agentcore」です。
ちなみに「bedrock-agentcore-control」ってのもあります。もともと「bedrock-agent」「bedrock-agent-runtime」とかもありました。カオスですね!
AgentCore Runtimeを呼び出すAPIはinvoke_agent_runtime
です。
response = agent_core_client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps({"prompt": prompt}),
qualifier="DEFAULT",
)
ドキュメントによると、セッションが重要っぽいです。多分こんな感じです。
-
runtimeSessionId
は明示的に指定可能。未指定の場合はサーバー側で自動生成 - セッションIDごとに、マイクロVMが分離される
- 同じセッションIDのリクエストは同じマイクロVMにルーティングされる
- 最大8時間まで起動
Lambdaと違う特徴がありますね。AIエージェントを動作することを考えて作られれている雰囲気が感じ取れます
Streamlitと組み合わせたPythonコードはこんな感じです。
import json
import time
import uuid
import boto3
import streamlit as st
# bedrock-agentcoreのクライアント
agent_core_client = boto3.client("bedrock-agentcore", region_name="us-west-2")
# セッションID。33文字以上ないとエラーになる
session_id = str(int(time.time())) + "_" + str(uuid.uuid4()).replace("-", "")
st.title("Basic Chat")
st.write("Streamlit + Bedrock AgentCore + Strands Agents")
runtime_arn = st.text_input(label="AgentRuntime ARN")
if prompt := st.chat_input():
with st.chat_message("user"):
st.write(prompt)
with st.spinner():
response = agent_core_client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=session_id,
payload=json.dumps({"prompt": prompt}),
qualifier="DEFAULT",
)
response_body = response["response"].read()
response_data = json.loads(response_body)
with st.chat_message("assistant"):
for content in response_data["result"]["content"]:
st.write(content["text"])
起動します。
uv run streamlit run 1_basic.py
いい感じです。
AgentCore Observabilityの内容を確認
CloudWatchの画面で色々見られます。
アプリとしての可観測性には申し分ないと思います。欲を言うと会話のやり取りがもうちょっと見やすいといいかもな、と思いました。
Step2: ストリームに対応する
チャットといえば、ストリーム。
まずはサーバー側(AgentCore Runtime)です。invoke関数でyield event
することで、ストリームでレスポンスを返却できます。
@app.entrypoint
async def invoke(payload: dict, context: RequestContext):
"""Process user input and return a response"""
user_message = payload.get("prompt", "Hello")
agent_stream = agent.stream_async(user_message)
async for event in agent_stream:
if "event" in event:
yield event
クライアント側(Streamlit)は、st.write
の代わりにst.write_stream
を使います。ストリームを処理するstreaming関数も必要です。
def streaming(response):
"""invoke_agent_runtimeリクエストのストリームを処理"""
for line in response["response"].iter_lines(chunk_size=10):
if line:
line = line.decode("utf-8")
line = line[6:] # 先頭の`data: `を除去
line = json.loads(line)
yield (
line.get("event", {})
.get("contentBlockDelta", {})
.get("delta", {})
.get("text", "")
)
...
with st.chat_message("assistant"):
st.write_stream(streaming(response))
ソースはこちら
Step3: AgentCore Memoryで会話履歴を保持する
AgentCoreのMemory機能を使って、会話履歴を保持します。
AgentCore Memoryを作成
マネジメントコンソールでMemoryを作成します。
作成後、Memory IDをコピーします。後で使います。
サーバー側コードを作成
AgentCore Memoryは、
メモリー -> アクター -> セッション
という構造になっています。
メモリーIDは先程作成したメモリーのIDで、アクターIDとセッションIDは利用者が自由に指定できます。
Bedrock AgentCore SDKにはメモリーを便利に使うためのMemoryClient
が用意されています。
from bedrock_agentcore.memory import MemoryClient
memory_client = MemoryClient(region_name="us-west-2")
例えば、指定した件数の会話を取得する機能があります。
recent_turns = memory_client.get_last_k_turns(
memory_id=memory_id, actor_id=user_id, session_id=session_id, k=3
)
取得したターンは、新しいもの順に並んでいるようなので、逆順にして整形します。
messages: Messages = []
for turn in reversed(recent_turns):
for message in turn:
messages.append(
{
"role": message["role"].lower(),
"content": [{"text": message["content"]["text"]}],
}
)
このメッセージをStrangs AgentsのAgentを初期化する際に渡します。
agent = Agent(model=model, messages=messages, callback_handler=None)
agent_stream = agent.stream_async(user_message)
Strands Agentsのストリーム処理は、event["result"]にまとまったレスポンスが格納されるので、こいつをメモリーに保存します。メモリへの保存はcreate_event
関数を使用します。
async for event in agent_stream:
if "event" in event:
yield event
if "result" in event:
# AgentCore Memoryに保存するため、アシスタントの最終回答を取得
agent_result = event["result"]
if isinstance(agent_result, AgentResult):
assistant_message = agent_result.message["content"][0]["text"]
# 会話をAgentCore Memoryに保存
memory_client.create_event(
memory_id=memory_id,
actor_id=user_id,
session_id=session_id,
messages=[(user_message, "USER"), (assistant_message, "ASSISTANT")],
)
最後に、クライアント側からmemory_id
、user_id
、session_id
を受け取ります。
memory_id = payload.get("memory_id")
user_id = payload.get("user_id")
session_id = payload.get("session_id")
payloadがJSONなので任意の値の受け渡しが可能です。
これでサーバー側の修正は完了です。
ソースはこちら。
クライアント側コードを作成
ユーザーIDをとりあえず固定で作成し、セッションIDはst.session_state
に保持するようにします。
# ユーザーID。とりあえず固定
user_id = "USER001"
# セッションID。33文字以上ないとエラーになる
if "session_id" not in st.session_state:
session_id = str(int(time.time())) + "_" + str(uuid.uuid4()).replace("-", "")
st.session_state["session_id"] = session_id
画面上に会話を表示するためにst.session_state["messages"]
を使用します。
if "messages" not in st.session_state:
st.session_state["messages"] = []
...
for message in st.session_state["messages"]:
with st.chat_message(message["role"]):
st.write(message["content"])
...
st.session_state["messages"].append({"role": "user", "content": prompt})
st.session_state["messages"].append(
{"role": "assistant", "content": assistant_message}
)
invoke_agent_runtime
のpayload
でメモリーID、ユーザーID、セッションIDを渡すようにしました。
今回は理解のため画面側から渡してますが、偽装されないように適切な方法を考慮ください。
response = agent_core_client.invoke_agent_runtime(
agentRuntimeArn=runtime_arn,
runtimeSessionId=st.session_state["session_id"],
payload=json.dumps(
{
"memory_id": memory_id,
"user_id": user_id,
"session_id": st.session_state["session_id"],
"prompt": prompt,
}
),
qualifier="DEFAULT",
)
ソースはこちら
AgentCore Runtimeにデプロイする際の注意点
マネジメントコンソールでAgentCore Runtimeを作成すると自動でIAMロールが生成されますが、AgentCoreへアクセス権限は付与されません。
テスト的に使用する際はBedrockAgentCoreFullAccessを付与すると良いでしょう。
Step4: スレッド機能を追加する
会話履歴をMemoryが永続化できるようになったので、スレッド管理を追加しましょう。
サーバー側に、スレッド一覧(セッションID一覧)を取得する機能と、セッションの過去メッセージ一覧を取得する機能を追加します。
これらの機能は、API Gateway + Lambdaで一般的なAPIとして作成してもいいのですが、せっかくAgentCore Runtimeにデプロイしている環境があるので、この環境を使うことにしました。
推奨される方法かどうかは微妙なので、あくまでできますレベルでご理解ください。
payloadでaction
を渡して、アクションごとに処理を切り分けます。
action = payload.get("action", "")
if action == "get_session_id_list":
"""スレッド一覧(セッションID一覧)を取得する処理"""
yield ...
return
if action == "get_message_list":
"""セッションの過去メッセージ一覧を取得する処理"""
yield ...
return
"""エージェント呼び出し処理"""
スレッド一覧(セッションID一覧)を取得する機能
この処理はBedrock AgentCore SDKでは用意されていないので、Boto3でbedrock-agentcore
を使います。
SDKに追加されることを期待しましょう!
def get_session_id_list(memory_id: str, user_id: str, max_results: int = 100) -> dict:
"""セッションIDの一覧会話履歴を取得する。"""
# Bedrock AgentCore SDKで機能が用意されていないので、list_sessions APIを直接利用
agentcore_client = boto3.client("bedrock-agentcore", region_name="us-west-2")
try:
response = agentcore_client.list_sessions(
memoryId=memory_id, actorId=user_id, maxResults=max_results
)
except Exception:
response = {"sessionSummaries": []}
sessions = response["sessionSummaries"]
return sessions
セッションの過去メッセージ一覧を取得する機能
こちらはMemoryClientを使います。
def get_message_list(
memory_id: str,
user_id: str,
session_id: str,
):
"""セッションID内の会話を取得する。"""
recent_turns = memory_client.get_last_k_turns(
memory_id=memory_id, actor_id=user_id, session_id=session_id, k=3
)
messages: Messages = []
for turn in reversed(recent_turns):
for message in turn:
messages.append(
{
"role": message["role"].lower(),
"content": [{"text": message["content"]["text"]}],
}
)
return messages
クライアント側コードを作成
愚直に実装しましょう。
Step5: Long-term memory
AgentCoreのMemoryには、Long-term memoryというものもあります。
Long-term memoryでは単純に会話のやり取りをそのまま保存するのではなく、要約したり抽出した必要な情報を保持します。
ビルトインのストラテジーとして2つ用意があります。
- Summarization: サマリーを保持。セッションID単位で作成する想定
- Semantic memory: 一般的な事実(general factual knowledge)を抽出して保持。アクターID単位で作成する想定
Memoryにeventを追加すると、自動でLong-term memoryが更新されます。
Long-term memoryを有効化
マネジメントコンソールで有効化します。
Long-term memoryを取得
Long-term memoryを取得を取得するコードはこんな感じです。
def get_summary_memory(memory_id: str, strategy_id: str, user_id: str, session_id: str):
"""Long-term memoryから`Summarization (built-in)`を取得する。"""
memory = memory_client.retrieve_memories(
memory_id=memory_id,
namespace=f"/strategies/{strategy_id}/actors/{user_id}/sessions/{session_id}",
actor_id=user_id,
query="summary",
top_k=3,
)
return list(
map(lambda x: {"text": x["content"]["text"], "score": x["score"]}, memory)
)
def get_semantic_memory(memory_id: str, strategy_id: str, user_id: str):
"""Long-term memoryから`Semantic (built-in)`を取得する。"""
memory = memory_client.retrieve_memories(
memory_id=memory_id,
namespace=f"/strategies/{strategy_id}/actors/{user_id}",
actor_id=user_id,
query="summary",
top_k=3,
)
return list(
map(lambda x: {"text": x["content"]["text"], "score": x["score"]}, memory)
)
画面に表示してみました。
左下がSemantic memory(これまでの複数の会話から抽出した一般的な事実)で、右側がSummarization(会話のサマリー)です。
まとめ
シンプルなチャットからスレッド管理するものまで一通り作成できました。
StreamlitもAgentCore Runtimeにホスティングしてくれたら最高!