はじめに
先日、「CopilotKit」 と 「StrandsAgents」を用いた構成で簡単なチャットボットアプリを作りました。アプリケーションとAIエージェント間は「AG-UI」というプロトコルで接続しています。
StrandsAgentsをAG-UIに対応させようとすると、以下のStrandsAgents向けAG-UIアダプタを使用する必要があります。
現状、このAG-UIアダプタを使いつつStrandsAgentsの「SessionManager」を使用しようとしましたが、以下の2点で苦戦しました...
- AG-UIアダプタがStrandsAgentsの「Session Manager」に未対応で機能を使えない
- AG-UIアダプタが提供するヘルパー関数を使うとリクエストヘッダーが取得できない
対策というほどではないですが、自分の行った実装を記載していきます。
わりと力技なので、あくまで参考程度に見ていただければと思います...
前提: 修正前のアプリのアーキテクチャ構成
構成の一番右にあるStrandsAgentsのコードのところで、StrandsAgentsをAG-UI化するために、「ag_ui_strands」を使用しています。
詳細はこちら
やりたいこと:AIエージェントに記憶を持たせたい
AIエージェントに記憶を持たせるために、AgentCoreのMemoryを使用します。
そして、StrandsAgentsの「Sesssion Manager」を使用することで、StrandsAgents上でMemoryとのやりとりを抽象化してくれるため、簡単にAIエージェントがMemoryを使用することができます。
つまるところ、こちらの機能を使ったで先ほど記載したAIエージェントに記憶を持たせたい。というのがやりたいことです。
Memoryには、短期記憶と長期記憶がありますが、今回の実装では短期記憶のみです。
長期記憶はまた別途で試します...
参考
AgentCore Memoryについては以下を参照
StrandsAgentsのSession Managerについては以下を参照
課題
1. AG-UIアダプタがStrandsAgentsの「Session Manager」に未対応で機能を使えない
AIエージェントに記憶を持たせるため「StrandsAgents」の「SessionManager」を使用しようとしたところ、アダプタである 「ag_ui_strands」のv0.1.0が「SessionManager」に対応していないことが分かりました。
どうやらGitHubのIssue上では対応が進んでいるようですが、今回はパッチを当てて使えるようにしました。
ag_ui_strandsのIssuesを見てみると、どうやらSessionManagerに対応していないようです。
ただし、既にプルリクは出ていました。2025/12/10に出ているレビューで止まっているようですので、いつ頃に取り込まれるかは不明です。
2. AG-UIアダプタが提供するヘルパー関数を使うとリクエストヘッダーが取得できない
前提として、AgentCore Runtimeを使用する際、エンドポイントとして「/invovations」「/ping」が必要になります。
AG-UIアダプタでは、以下の「create_strands_app」を呼びだすことで、AgentCore Runtimeに必要なエンドポイントをFastAPI で作成してくれます。
「create_strands_app」の内部では、以下の「add_strands_fastapi_endpoint」が呼び出されます。実際にはこの関数でエンドポイントが作成されます。この時にrequestからリクエストヘッダーを取得したい場合に、外からアクセスできません。
今回の対策
1. 「Session Manager」が使えるようにパッチを当てる
プルリクの修正コードをみると修正量的には多くなさそうです。
これなら自分でパッチを当てても、そこまで手間ではないと考えました。
というわけで対策として、プルリクを参考に下記の「agent.py」と「config.py」にパッチを当てます。
config.pyへのパッチ当て
# ~省略~
from typing import Callable, Optional
from strands.session import SessionManager
SessionManagerProvider = Callable[[RunAgentInput], SessionManager]
original_init = StrandsAgentConfig.__init__
def patched_config_init(self, tool_behaviors=None, state_context_builder=None, session_manager_provider=None):
original_init(self, tool_behaviors or {}, state_context_builder)
self.session_manager_provider: Optional[SessionManagerProvider] = session_manager_provider
StrandsAgentConfig.__init__ = patched_config_init
# ~省略~
agent.pyへのパッチ当て
# ~省略~
# パッチ適用前に元のrun()を保存
original_run = StrandsAgent.run
def patched_run(self, input_data):
"""session_manager対応のrun()メソッド"""
thread_id = input_data.thread_id or "default"
# エージェント作成時にsession_managerを追加
if thread_id not in self._agents_by_thread:
from strands import Agent as StrandsAgentCore
agent_kwargs = {
"model": self._model,
"system_prompt": self._system_prompt,
"tools": self._tools,
**self._agent_kwargs,
}
# session_managerを追加
if self.config and self.config.session_manager_provider:
session_manager = self.config.session_manager_provider(input_data)
agent_kwargs["session_manager"] = session_manager
self._agents_by_thread[thread_id] = StrandsAgentCore(**agent_kwargs)
# 保存済みの元のrun()を呼び出し
return original_run(self, input_data)
# パッチを適用
StrandsAgent.run = patched_run
# ~省略~
2. ヘルパー関数は使わない
ヘルパーが関数内の実装を持ってきて、自力でFastAPIのエンドポイントを作成します。
# ~省略~
# FastAPIアプリを作成
app = FastAPI(title="AWS Strands Agent")
# CORS設定
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# ~省略~
@app.post("/invocations")
async def invocations(input_data: RunAgentInput, request: Request):
# リクエストヘッダーから情報取得
actor_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub", "")
session_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id", "")
def session_provider(input_data: RunAgentInput):
"""AgentCore Memoryのセッションマネージャ作成"""
return create_session_manager(
memory_id=MEMORY_ID,
session_id=session_id,
actor_id=actor_id
)
model = BedrockModel(
model_id="jp.anthropic.claude-haiku-4-5-20251001-v1:0"
)
# エージェント作成
agent = Agent(
model=model,
system_prompt=SYSTEM_PROMPT,
)
strands_agent = StrandsAgent(
agent=agent,
name="strands_agent",
config=StrandsAgentConfig(
session_manager_provider=session_provider
)
)
# レスポンス処理
accept_header = request.headers.get("accept")
encoder = EventEncoder(accept=accept_header)
async def event_generator():
try:
async for event in strands_agent.run(input_data):
yield encoder.encode(event)
except Exception as e:
from ag_ui.core import RunErrorEvent, EventType
error_event = RunErrorEvent(
type=EventType.RUN_ERROR,
message=f"Agent error: {str(e)}",
code="AGENT_ERROR"
)
yield encoder.encode(error_event)
return StreamingResponse(
event_generator(),
media_type=encoder.get_content_type()
)
@app.get("/ping")
async def ping():
"""ヘルスチェック用のエンドポイント"""
return {"status": "ok", "message": "Agent is running"}
# ~省略~
やってみる
実際にStrandsAgents + AG-UI でAgentCore Memoryを使ってAIエージェントに記憶を持たせてみます。
Step1. CDKでAgentCore Memoryを定義する
今回の構成ではAmplify内でCDKを使ってAgentCore Runtimeを定義しているため、同様にAgentCore Memoryも定義します。
const agentMemory = new agentcore.Memory(backend.stack, 'AgentMemory', {
memoryName: 'ai_agent_memory',
expirationDuration: cdk.Duration.days(90),
});
StrandsAgents内でSeesionManagerを定義するには、MemoryIDが必要となるため、Runtimeの環境変数MEMORY_IDとして渡します。
const agentRuntime = new agentcore.Runtime(backend.stack, 'StrandsAgentRuntime', {
runtimeName: 'strandsAgentRuntime',
agentRuntimeArtifact: agentRuntimeArtifact,
authorizerConfiguration:
agentcore.RuntimeAuthorizerConfiguration.usingCognito(
backend.auth.resources.userPool,
[backend.auth.resources.userPoolClient]
),
requestHeaderConfiguration: {
allowlistedHeaders: [
"X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub",
"X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id"
]
},
environmentVariables: {
MEMORY_ID: agentMemory.memoryId
}
});
コードはこちら
import { defineBackend } from '@aws-amplify/backend';
import * as aws_lambda from 'aws-cdk-lib/aws-lambda';
import * as path from "path";
import { fileURLToPath } from "url";
import { auth } from './auth/resource';
import { copilotkitFunction } from './functions/copilotkit/resource';
import * as cdk from "aws-cdk-lib";
import * as agentcore from "@aws-cdk/aws-bedrock-agentcore-alpha";
import * as iam from "aws-cdk-lib/aws-iam";
import * as apigateway from "aws-cdk-lib/aws-apigateway";
import { ResponseTransferMode } from "aws-cdk-lib/aws-apigateway";
const __filename = fileURLToPath(import. meta.url);
const __dirname = path.dirname(__filename);
const backend = defineBackend({
auth,
copilotkitFunction
});
const agentRuntimeArtifact = agentcore.AgentRuntimeArtifact.fromAsset(
path.join(__dirname, "../../agent")
);
const agentMemory = new agentcore.Memory(backend.stack, 'AgentMemory', {
memoryName: 'ai_agent_memory',
expirationDuration: cdk.Duration.days(90),
});
const agentRuntime = new agentcore.Runtime(backend.stack, 'StrandsAgentRuntime', {
runtimeName: 'strandsAgentRuntime',
agentRuntimeArtifact: agentRuntimeArtifact,
authorizerConfiguration:
agentcore.RuntimeAuthorizerConfiguration.usingCognito(
backend.auth.resources.userPool,
[backend.auth.resources.userPoolClient]
),
requestHeaderConfiguration: {
allowlistedHeaders: [
"X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub",
"X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id"
]
},
environmentVariables: {
MEMORY_ID: agentMemory.memoryId
}
});
agentRuntime.role.addToPrincipalPolicy(
new iam.PolicyStatement({
actions: [
"bedrock:InvokeModel",
"bedrock:InvokeModelWithResponseStream",
"bedrock-agentcore:CreateEvent",
"bedrock-agentcore:ListEvents",
"bedrock-agentcore:GetMemory",
"bedrock-agentcore:CreateMemorySession",
"bedrock-agentcore:GetMemorySession",
"bedrock-agentcore:AddMemoryEvents",
"bedrock-agentcore:GetMemoryStrategy",
"bedrock-agentcore:ListMemoryStrategies",
],
resources: ["*"],
}),
);
backend.copilotkitFunction.addEnvironment('USER_POOL_ID', backend.auth.resources.userPool.userPoolId);
backend.copilotkitFunction.addEnvironment('USER_POOL_CLIENT_ID', backend.auth.resources.userPoolClient.userPoolClientId);
backend.copilotkitFunction.addEnvironment('AGENT_RUNTIME_ARN', agentRuntime.agentRuntimeArn);
new aws_lambda.CfnUrl(backend.stack, 'copilotkitFunctionUrl', {
targetFunctionArn: backend.copilotkitFunction.resources.lambda.functionArn,
authType: 'NONE',
invokeMode: 'RESPONSE_STREAM'
});
const restApi = new apigateway.RestApi(
backend.copilotkitFunction.stack,
"StreamingApi",
{
restApiName: "Streaming API",
deployOptions: {
stageName: "dev",
},
defaultCorsPreflightOptions: {
allowOrigins: apigateway.Cors.ALL_ORIGINS,
allowMethods: ["POST", "OPTIONS"],
allowHeaders: ["*"],
},
}
);
restApi.root.addResource("copilotkit").addMethod(
"POST",
new apigateway.LambdaIntegration(
backend.copilotkitFunction.resources.lambda,
{
proxy: true,
responseTransferMode: ResponseTransferMode.STREAM,
}
)
);
backend.addOutput({
custom: {
copilotkitFunctionUrl: restApi.url,
agentRuntimeArn: agentRuntime.agentRuntimeArn
}
});
Step2. 記憶管理のためリクエストヘッダーでユーザーIDを受け渡す
今回は、Memoryのactor_idとsession_idには、以下を指定します。
- actor_id:Amazon CognitoのログインユーザーのSub
- session_id:画面を開いた際に1度生成するユニークな文字列
これらをフロントエンドからAIエージェントを呼び出す際にリクエストヘッダーへ設定します。
注意点としては、AgentCore Runtimeで任意のリクエストヘッダーを受け付けるためには、Runtimeの定義で以下の許可を設定する必要があります。
requestHeaderConfiguration: {
allowlistedHeaders: [
"X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub",
"X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id"
]
設定された各種IDは、FastAPIのrequestから引き出します。
@app.post("/invocations")
async def invocations(input_data: RunAgentInput, request: Request):
# リクエストヘッダーから情報取得
actor_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub", "")
session_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id", "")
Step3.StrandsAgentsでSession Mangaerを実装する
最後にSession Mangaerを実装します。
ここのStrandsAgentsの実装は先ほどの対策の章で述べたとおりなので、コード全文は以下をご参照ください。(コードが汚くてごめんなさいもうわけないです。力が尽きました)
import os
import jwt
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse
from ag_ui_strands import StrandsAgent, StrandsAgentConfig
from ag_ui.core import RunAgentInput
from ag_ui.encoder import EventEncoder
from strands import Agent
from strands.models import BedrockModel
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig, RetrievalConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager
# FastAPIアプリを作成
app = FastAPI(title="AWS Strands Agent")
# CORS設定
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
MEMORY_ID = os.environ.get("MEMORY_ID", "")
SYSTEM_PROMPT = f"""
あなたは 私の親友エージェントです。ユーザーからの質問に回答してください。
### 回答時のルール
- 日本語で応答してください
"""
from typing import Callable, Optional
from strands.session import SessionManager
SessionManagerProvider = Callable[[RunAgentInput], SessionManager]
original_init = StrandsAgentConfig.__init__
def patched_config_init(self, tool_behaviors=None, state_context_builder=None, session_manager_provider=None):
original_init(self, tool_behaviors or {}, state_context_builder)
self.session_manager_provider: Optional[SessionManagerProvider] = session_manager_provider
# パッチを適用
StrandsAgentConfig.__init__ = patched_config_init
# パッチ適用前に元のrun()を保存
original_run = StrandsAgent.run
def patched_run(self, input_data):
"""session_manager対応のrun()メソッド"""
thread_id = input_data.thread_id or "default"
# エージェント作成時にsession_managerを追加
if thread_id not in self._agents_by_thread:
from strands import Agent as StrandsAgentCore
agent_kwargs = {
"model": self._model,
"system_prompt": self._system_prompt,
"tools": self._tools,
**self._agent_kwargs,
}
# session_managerを追加
if self.config and self.config.session_manager_provider:
session_manager = self.config.session_manager_provider(input_data)
agent_kwargs["session_manager"] = session_manager
self._agents_by_thread[thread_id] = StrandsAgentCore(**agent_kwargs)
# 保存済みの元のrun()を呼び出し
return original_run(self, input_data)
# パッチを適用
StrandsAgent.run = patched_run
def create_session_manager(
memory_id: str,
session_id: str,
actor_id: str,
region: str = "ap-northeast-1"
) -> AgentCoreMemorySessionManager:
"""AgentCore Memory用のセッションマネージャーを作成"""
memory_config = AgentCoreMemoryConfig(
memory_id=memory_id,
session_id=session_id,
actor_id=actor_id,
)
return AgentCoreMemorySessionManager(
agentcore_memory_config=memory_config,
region_name=region
)
@app.post("/invocations")
async def invocations(input_data: RunAgentInput, request: Request):
# リクエストヘッダーから情報取得
actor_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub", "")
session_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id", "")
def session_provider(input_data: RunAgentInput):
"""AgentCore Memoryのセッションマネージャ作成"""
return create_session_manager(
memory_id=MEMORY_ID,
session_id=session_id,
actor_id=actor_id
)
model = BedrockModel(
model_id="jp.anthropic.claude-haiku-4-5-20251001-v1:0"
)
# エージェント作成
agent = Agent(
model=model,
system_prompt=SYSTEM_PROMPT,
)
strands_agent = StrandsAgent(
agent=agent,
name="strands_agent",
config=StrandsAgentConfig(
session_manager_provider=session_provider
)
)
# レスポンス処理
accept_header = request.headers.get("accept")
encoder = EventEncoder(accept=accept_header)
async def event_generator():
try:
async for event in strands_agent.run(input_data):
yield encoder.encode(event)
except Exception as e:
from ag_ui.core import RunErrorEvent, EventType
error_event = RunErrorEvent(
type=EventType.RUN_ERROR,
message=f"Agent error: {str(e)}",
code="AGENT_ERROR"
)
yield encoder.encode(error_event)
return StreamingResponse(
event_generator(),
media_type=encoder.get_content_type()
)
@app.get("/ping")
async def ping():
"""ヘルスチェック用のエンドポイント"""
return {"status": "ok", "message": "Agent is running"}
if __name__ == "__main__":
import uvicorn
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
今回の実装では、AmplifyのSandboxでの動作確認までとなっており、実際にAmplify Hostingでデプロイしたわけではありません。
デプロイしても問題なく動作するとは思いますが...
まとめ
StrandsAgentsのTypeScript版のAG-UIアダプタもまだなので、本番で使うのはもう少し成熟してからの方が良さそうです。
ただCopilotKitを使うと、フロント側のコードやAIエージェントのやりとりのコードが楽になるので、現状でも恩恵はあると思います。
他の実装方法で既にAIエージェントを構築しているのであれば、急いでAG-UIに乗り換えなくてもいいかなという感想です。


