51
44

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

StreamlitとStrands Agentsでチャットを作りながらBedrock AgentCoreに入門(Runtime、Observability、Memory)

Posted at

Bedrock AgentCoreのRuntime、Observability、Memoryをやってみました。

長くなったのでソースはこちらに格納しております。


過去のAgentCore関連の投稿

AgentCore Browser(Bedrock AgentCoreのビルトインツールの一つ)の記事はこちら

AgentCore GatewayとAgentCore Identityの記事はこちら

これで一通り、浅めに触ったことにします。😅


Step1: どシンプルなチャットを作る

ライブラリーの導入

ライブラリーをインストールします。uv使ってます。

uv add bedrock-agentcore strands-agents

アプリケーションコードを作成

Pythonのコードを作っていきます。
まずStrands Agentsでエージェントを最小のコードで作ります。

model = BedrockModel(model_id="us.amazon.nova-lite-v1:0", region_name="us-west-2")
agent = Agent(model=model, callback_handler=None)

result = agent(user_message)

そして、AgentCoreのSDKを使って、AgentCore Runtimeへデプロイできる形にします。

app.py
from bedrock_agentcore import RequestContext
from bedrock_agentcore.runtime import BedrockAgentCoreApp
from strands import Agent
from strands.models import BedrockModel

model = BedrockModel(model_id="us.amazon.nova-lite-v1:0", region_name="us-west-2")
agent = Agent(model=model, callback_handler=None)

app = BedrockAgentCoreApp()


@app.entrypoint
def invoke(payload: dict, context: RequestContext):
    """Process user input and return a response"""
    user_message = payload.get("prompt")
    result = agent(user_message)
    return {"result": result.message}


if __name__ == "__main__":
    app.run()

ソースはこちら

動作確認

AgentCore Runtimeへデプロイする前に、ローカル起動し動作確認します。

uv run app.py
INFO:     Started server process [235808]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:8080 (Press CTRL+C to quit)

cURLでリクエストを送信します。

curl -X POST http://localhost:8080/invocations \
  -H "Content-Type: application/json" \
  -d '{"prompt": "こんにちは!今日も暑いですね!10年後には50度ぐらいいっちゃうんじゃないですか!?"}'
{
  "result": {
    "role": "assistant",
    "content": [
      {
        "text": "こんにちは!確かに最近は暑い日が続いていますね。しかし、気温が10年後に50度になるというのは、現実的ではないとされています。気候変動の影響はありますが、気温が劇的に上昇する見込みはありません。とはいえ、気候変動対策には十分な注意が必要です。地球温暖化を抑制するために、各国が協力して取り組む必要があります。一人一人の行動も重要です。環境に配慮した生活習慣を取り入れることで、地球温暖化の進行を抑えることができます。また、気候変動の影響は地域によって異なります。例えば、北極圏では気温の上昇が顕著で、海氷の融解が進んでいます。一方、赤道付近では気温の上昇が緩やかで、気候変動の影響は比較的小さいとされています。したがって、気候変動対策は地域に応じて適切な対策を講じる必要があります。"
      }
    ]
  }
}

真面目な答えが返ってきましたね。とりあえず動作していることが確認できました。

AgentCore Runtimeへデプロイ

それではAgentCore Runtimeへデプロイしましょう。
AgentCore Runtimeは、ECRに登録したコンテナイメージを使用して起動する仕組みなので、コンテナイメージを作成します。

Dockerfile
FROM public.ecr.aws/docker/library/python:3.11-slim
WORKDIR /app

ENV AWS_REGION=us-west-2
ENV AWS_DEFAULT_REGION=us-west-2
ENV DOCKER_CONTAINER=1

COPY pyproject.toml .
RUN python -m pip install --no-cache-dir .
RUN python -m pip install aws-opentelemetry-distro>=0.10.0

COPY app.py .

RUN useradd -m -u 1000 bedrock_agentcore
USER bedrock_agentcore

EXPOSE 8080

CMD ["opentelemetry-instrument", "python", "-m", "app"]

しれっと、aws-opentelemetry-distroをインストールして、opentelemetry-instrumentというコマンドを使ってますが、これだけでAgentCore Observabilityが有効化されます。

コンテナイメージをビルドしてECRに登録します。

Shell
ACCOUNT_ID=$(aws sts get-caller-identity --query Account --output text)
AWS_REGION=us-west-2

AGENT_NAME=agentcore-basic

docker buildx create --use

aws ecr create-repository --repository-name ${AGENT_NAME} --region ${AWS_REGION}
aws ecr get-login-password --region ${AWS_REGION} | docker login --username AWS --password-stdin ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com

docker buildx build --platform linux/arm64 -t ${ACCOUNT_ID}.dkr.ecr.${AWS_REGION}.amazonaws.com/${AGENT_NAME}:latest --push .

Windows 11のWSL環境でビルドしました。理屈はわかりませんが、amd64環境でarm64のビルドができるんですね。

それでは、AgentCore Runtimeにデプロイします。
マネジメントコンソールで作業します。

  1. マネジメントコンソールで「Amazon Bedrock AgentCore」の管理画面を表示します。
    左メニューの「Agent Runtime」を選択します。
    「Host Agent」ボタンをクリックします。

  2. 「Brouse images」ボタンをクリックし、RCRに登録したコンテナイメージを選択します。
    「Host Agent」ボタンをクリックします。

これでデプロイが完了です。

実行ロールとしてbedrock:InvokeModelbedrock:InvokeModelWithResponseStreamが付与されているので同じリージョンのBedrockにはアクセス可能です。

画面中央のサンプルコードにある「agentRuntimeArn」を後で使うのでメモっておきます。

Streamlitでフロントエンドを作成

デプロイしたエージェントを呼び出すフロントエンドを作成します。必要なライブラリーはstreamlitboto3です。

uv add streamlit boto3

Bedrock AgentCoreを呼ぶクライアントはboto3.client("bedrock-agentcore", region_name="us-west-2")で作成します。サービス名は「bedrock-agentcore」です。

ちなみに「bedrock-agentcore-control」ってのもあります。もともと「bedrock-agent」「bedrock-agent-runtime」とかもありました。カオスですね!

AgentCore Runtimeを呼び出すAPIはinvoke_agent_runtimeです。

response = agent_core_client.invoke_agent_runtime(
    agentRuntimeArn=runtime_arn,
    runtimeSessionId=session_id,
    payload=json.dumps({"prompt": prompt}),
    qualifier="DEFAULT",
)

ドキュメントによると、セッションが重要っぽいです。多分こんな感じです。

  • runtimeSessionIdは明示的に指定可能。未指定の場合はサーバー側で自動生成
  • セッションIDごとに、マイクロVMが分離される
  • 同じセッションIDのリクエストは同じマイクロVMにルーティングされる
  • 最大8時間まで起動

Lambdaと違う特徴がありますね。AIエージェントを動作することを考えて作られれている雰囲気が感じ取れます

Streamlitと組み合わせたPythonコードはこんな感じです。

1_basic.py
import json
import time
import uuid

import boto3
import streamlit as st

# bedrock-agentcoreのクライアント
agent_core_client = boto3.client("bedrock-agentcore", region_name="us-west-2")

# セッションID。33文字以上ないとエラーになる
session_id = str(int(time.time())) + "_" + str(uuid.uuid4()).replace("-", "")


st.title("Basic Chat")
st.write("Streamlit + Bedrock AgentCore + Strands Agents")

runtime_arn = st.text_input(label="AgentRuntime ARN")


if prompt := st.chat_input():
    with st.chat_message("user"):
        st.write(prompt)

    with st.spinner():
        response = agent_core_client.invoke_agent_runtime(
            agentRuntimeArn=runtime_arn,
            runtimeSessionId=session_id,
            payload=json.dumps({"prompt": prompt}),
            qualifier="DEFAULT",
        )

        response_body = response["response"].read()
        response_data = json.loads(response_body)

        with st.chat_message("assistant"):
            for content in response_data["result"]["content"]:
                st.write(content["text"])

起動します。

uv run streamlit run 1_basic.py

いい感じです。

image.png

AgentCore Observabilityの内容を確認

CloudWatchの画面で色々見られます。

アプリとしての可観測性には申し分ないと思います。欲を言うと会話のやり取りがもうちょっと見やすいといいかもな、と思いました。

Step2: ストリームに対応する

チャットといえば、ストリーム。

まずはサーバー側(AgentCore Runtime)です。invoke関数でyield eventすることで、ストリームでレスポンスを返却できます。

@app.entrypoint
async def invoke(payload: dict, context: RequestContext):
    """Process user input and return a response"""
    user_message = payload.get("prompt", "Hello")
    agent_stream = agent.stream_async(user_message)
    async for event in agent_stream:
        if "event" in event:
            yield event

クライアント側(Streamlit)は、st.writeの代わりにst.write_streamを使います。ストリームを処理するstreaming関数も必要です。

def streaming(response):
    """invoke_agent_runtimeリクエストのストリームを処理"""

    for line in response["response"].iter_lines(chunk_size=10):
        if line:
            line = line.decode("utf-8")
            line = line[6:]  # 先頭の`data: `を除去
            line = json.loads(line)
            yield (
                line.get("event", {})
                .get("contentBlockDelta", {})
                .get("delta", {})
                .get("text", "")
            )

...

        with st.chat_message("assistant"):
            st.write_stream(streaming(response))

ソースはこちら

Step3: AgentCore Memoryで会話履歴を保持する

AgentCoreのMemory機能を使って、会話履歴を保持します。

AgentCore Memoryを作成

マネジメントコンソールでMemoryを作成します。

作成後、Memory IDをコピーします。後で使います。

サーバー側コードを作成

AgentCore Memoryは、

メモリー -> アクター -> セッション

という構造になっています。

メモリーIDは先程作成したメモリーのIDで、アクターIDとセッションIDは利用者が自由に指定できます。

Bedrock AgentCore SDKにはメモリーを便利に使うためのMemoryClientが用意されています。

from bedrock_agentcore.memory import MemoryClient

memory_client = MemoryClient(region_name="us-west-2")

例えば、指定した件数の会話を取得する機能があります。

recent_turns = memory_client.get_last_k_turns(
    memory_id=memory_id, actor_id=user_id, session_id=session_id, k=3
)

取得したターンは、新しいもの順に並んでいるようなので、逆順にして整形します。

messages: Messages = []

for turn in reversed(recent_turns):
    for message in turn:
        messages.append(
            {
                "role": message["role"].lower(),
                "content": [{"text": message["content"]["text"]}],
            }
        )

このメッセージをStrangs AgentsのAgentを初期化する際に渡します。

agent = Agent(model=model, messages=messages, callback_handler=None)

agent_stream = agent.stream_async(user_message)

Strands Agentsのストリーム処理は、event["result"]にまとまったレスポンスが格納されるので、こいつをメモリーに保存します。メモリへの保存はcreate_event関数を使用します。

async for event in agent_stream:
    if "event" in event:
        yield event
    if "result" in event:
        # AgentCore Memoryに保存するため、アシスタントの最終回答を取得
        agent_result = event["result"]
        if isinstance(agent_result, AgentResult):
            assistant_message = agent_result.message["content"][0]["text"]

# 会話をAgentCore Memoryに保存
memory_client.create_event(
    memory_id=memory_id,
    actor_id=user_id,
    session_id=session_id,
    messages=[(user_message, "USER"), (assistant_message, "ASSISTANT")],
)

最後に、クライアント側からmemory_iduser_idsession_idを受け取ります。

memory_id = payload.get("memory_id")
user_id = payload.get("user_id")
session_id = payload.get("session_id")

payloadがJSONなので任意の値の受け渡しが可能です。

これでサーバー側の修正は完了です。

ソースはこちら。

クライアント側コードを作成

ユーザーIDをとりあえず固定で作成し、セッションIDはst.session_stateに保持するようにします。

# ユーザーID。とりあえず固定
user_id = "USER001"

# セッションID。33文字以上ないとエラーになる
if "session_id" not in st.session_state:
    session_id = str(int(time.time())) + "_" + str(uuid.uuid4()).replace("-", "")
    st.session_state["session_id"] = session_id

画面上に会話を表示するためにst.session_state["messages"]を使用します。

if "messages" not in st.session_state:
    st.session_state["messages"] = []

...

for message in st.session_state["messages"]:
    with st.chat_message(message["role"]):
        st.write(message["content"])

...

st.session_state["messages"].append({"role": "user", "content": prompt})
st.session_state["messages"].append(
    {"role": "assistant", "content": assistant_message}
)

invoke_agent_runtimepayloadでメモリーID、ユーザーID、セッションIDを渡すようにしました。

今回は理解のため画面側から渡してますが、偽装されないように適切な方法を考慮ください。

response = agent_core_client.invoke_agent_runtime(
    agentRuntimeArn=runtime_arn,
    runtimeSessionId=st.session_state["session_id"],
    payload=json.dumps(
        {
            "memory_id": memory_id,
            "user_id": user_id,
            "session_id": st.session_state["session_id"],
            "prompt": prompt,
        }
    ),
    qualifier="DEFAULT",
)

ソースはこちら

AgentCore Runtimeにデプロイする際の注意点

マネジメントコンソールでAgentCore Runtimeを作成すると自動でIAMロールが生成されますが、AgentCoreへアクセス権限は付与されません。

テスト的に使用する際はBedrockAgentCoreFullAccessを付与すると良いでしょう。

Step4: スレッド機能を追加する

会話履歴をMemoryが永続化できるようになったので、スレッド管理を追加しましょう。

サーバー側に、スレッド一覧(セッションID一覧)を取得する機能と、セッションの過去メッセージ一覧を取得する機能を追加します。

これらの機能は、API Gateway + Lambdaで一般的なAPIとして作成してもいいのですが、せっかくAgentCore Runtimeにデプロイしている環境があるので、この環境を使うことにしました。

推奨される方法かどうかは微妙なので、あくまでできますレベルでご理解ください。

payloadでactionを渡して、アクションごとに処理を切り分けます。

action = payload.get("action", "")

if action == "get_session_id_list":
    """スレッド一覧(セッションID一覧)を取得する処理"""
    yield ...
    return
if action == "get_message_list":
    """セッションの過去メッセージ一覧を取得する処理"""
    yield ...
    return

"""エージェント呼び出し処理"""

スレッド一覧(セッションID一覧)を取得する機能

この処理はBedrock AgentCore SDKでは用意されていないので、Boto3でbedrock-agentcoreを使います。

SDKに追加されることを期待しましょう!

def get_session_id_list(memory_id: str, user_id: str, max_results: int = 100) -> dict:
    """セッションIDの一覧会話履歴を取得する。"""

    # Bedrock AgentCore SDKで機能が用意されていないので、list_sessions APIを直接利用
    agentcore_client = boto3.client("bedrock-agentcore", region_name="us-west-2")

    try:
        response = agentcore_client.list_sessions(
            memoryId=memory_id, actorId=user_id, maxResults=max_results
        )
    except Exception:
        response = {"sessionSummaries": []}

    sessions = response["sessionSummaries"]

    return sessions

セッションの過去メッセージ一覧を取得する機能

こちらはMemoryClientを使います。

def get_message_list(
    memory_id: str,
    user_id: str,
    session_id: str,
):
    """セッションID内の会話を取得する。"""

    recent_turns = memory_client.get_last_k_turns(
        memory_id=memory_id, actor_id=user_id, session_id=session_id, k=3
    )

    messages: Messages = []

    for turn in reversed(recent_turns):
        for message in turn:
            messages.append(
                {
                    "role": message["role"].lower(),
                    "content": [{"text": message["content"]["text"]}],
                }
            )

    return messages

クライアント側コードを作成

愚直に実装しましょう。

image.png

Step5: Long-term memory

AgentCoreのMemoryには、Long-term memoryというものもあります。

Long-term memoryでは単純に会話のやり取りをそのまま保存するのではなく、要約したり抽出した必要な情報を保持します。

ビルトインのストラテジーとして2つ用意があります。

  • Summarization: サマリーを保持。セッションID単位で作成する想定
  • Semantic memory: 一般的な事実(general factual knowledge)を抽出して保持。アクターID単位で作成する想定

Memoryにeventを追加すると、自動でLong-term memoryが更新されます。

Long-term memoryを有効化

マネジメントコンソールで有効化します。

Long-term memoryを取得

Long-term memoryを取得を取得するコードはこんな感じです。

def get_summary_memory(memory_id: str, strategy_id: str, user_id: str, session_id: str):
    """Long-term memoryから`Summarization (built-in)`を取得する。"""
    memory = memory_client.retrieve_memories(
        memory_id=memory_id,
        namespace=f"/strategies/{strategy_id}/actors/{user_id}/sessions/{session_id}",
        actor_id=user_id,
        query="summary",
        top_k=3,
    )

    return list(
        map(lambda x: {"text": x["content"]["text"], "score": x["score"]}, memory)
    )
def get_semantic_memory(memory_id: str, strategy_id: str, user_id: str):
    """Long-term memoryから`Semantic (built-in)`を取得する。"""
    memory = memory_client.retrieve_memories(
        memory_id=memory_id,
        namespace=f"/strategies/{strategy_id}/actors/{user_id}",
        actor_id=user_id,
        query="summary",
        top_k=3,
    )

    return list(
        map(lambda x: {"text": x["content"]["text"], "score": x["score"]}, memory)
    )

画面に表示してみました。
左下がSemantic memory(これまでの複数の会話から抽出した一般的な事実)で、右側がSummarization(会話のサマリー)です。

image.png

まとめ

シンプルなチャットからスレッド管理するものまで一通り作成できました。

StreamlitもAgentCore Runtimeにホスティングしてくれたら最高!

51
44
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
51
44

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?