5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

StrandsAgents + AG-UI の構成で SessionManagerを使ってAIエージェントに記憶を持たせたい

Posted at

はじめに

先日、「CopilotKit」 と 「StrandsAgents」を用いた構成で簡単なチャットボットアプリを作りました。アプリケーションとAIエージェント間は「AG-UI」というプロトコルで接続しています。
StrandsAgentsをAG-UIに対応させようとすると、以下のStrandsAgents向けAG-UIアダプタを使用する必要があります。

現状、このAG-UIアダプタを使いつつStrandsAgentsの「SessionManager」を使用しようとしましたが、以下の2点で苦戦しました...

  1. AG-UIアダプタがStrandsAgentsの「Session Manager」に未対応で機能を使えない
  2. AG-UIアダプタが提供するヘルパー関数を使うとリクエストヘッダーが取得できない

対策というほどではないですが、自分の行った実装を記載していきます。

わりと力技なので、あくまで参考程度に見ていただければと思います...

前提: 修正前のアプリのアーキテクチャ構成

構成の一番右にあるStrandsAgentsのコードのところで、StrandsAgentsをAG-UI化するために、「ag_ui_strands」を使用しています。

image.png

詳細はこちら

やりたいこと:AIエージェントに記憶を持たせたい

AIエージェントに記憶を持たせるために、AgentCoreのMemoryを使用します。

そして、StrandsAgentsの「Sesssion Manager」を使用することで、StrandsAgents上でMemoryとのやりとりを抽象化してくれるため、簡単にAIエージェントがMemoryを使用することができます。

つまるところ、こちらの機能を使ったで先ほど記載したAIエージェントに記憶を持たせたい。というのがやりたいことです。

Memoryには、短期記憶と長期記憶がありますが、今回の実装では短期記憶のみです。
長期記憶はまた別途で試します...

参考

AgentCore Memoryについては以下を参照

StrandsAgentsのSession Managerについては以下を参照

課題

1. AG-UIアダプタがStrandsAgentsの「Session Manager」に未対応で機能を使えない

AIエージェントに記憶を持たせるため「StrandsAgents」の「SessionManager」を使用しようとしたところ、アダプタである 「ag_ui_strands」のv0.1.0が「SessionManager」に対応していないことが分かりました。

どうやらGitHubのIssue上では対応が進んでいるようですが、今回はパッチを当てて使えるようにしました。

ag_ui_strandsのIssuesを見てみると、どうやらSessionManagerに対応していないようです。

ただし、既にプルリクは出ていました。2025/12/10に出ているレビューで止まっているようですので、いつ頃に取り込まれるかは不明です。

2. AG-UIアダプタが提供するヘルパー関数を使うとリクエストヘッダーが取得できない

前提として、AgentCore Runtimeを使用する際、エンドポイントとして「/invovations」「/ping」が必要になります。

AG-UIアダプタでは、以下の「create_strands_app」を呼びだすことで、AgentCore Runtimeに必要なエンドポイントをFastAPI で作成してくれます。

「create_strands_app」の内部では、以下の「add_strands_fastapi_endpoint」が呼び出されます。実際にはこの関数でエンドポイントが作成されます。この時にrequestからリクエストヘッダーを取得したい場合に、外からアクセスできません。

今回の対策

1. 「Session Manager」が使えるようにパッチを当てる

プルリクの修正コードをみると修正量的には多くなさそうです。
これなら自分でパッチを当てても、そこまで手間ではないと考えました。

スクリーンショット 2026-01-04 22.48.46.png

スクリーンショット 2026-01-04 22.49.28.png

というわけで対策として、プルリクを参考に下記の「agent.py」と「config.py」にパッチを当てます。

config.pyへのパッチ当て

# ~省略~

from typing import Callable, Optional
from strands.session import SessionManager

SessionManagerProvider = Callable[[RunAgentInput], SessionManager]

original_init = StrandsAgentConfig.__init__

def patched_config_init(self, tool_behaviors=None, state_context_builder=None, session_manager_provider=None):
    original_init(self, tool_behaviors or {}, state_context_builder)
    self.session_manager_provider: Optional[SessionManagerProvider] = session_manager_provider

StrandsAgentConfig.__init__ = patched_config_init

# ~省略~

agent.pyへのパッチ当て

# ~省略~

# パッチ適用前に元のrun()を保存
original_run = StrandsAgent.run

def patched_run(self, input_data):
    """session_manager対応のrun()メソッド"""
    
    thread_id = input_data.thread_id or "default"
    
    # エージェント作成時にsession_managerを追加
    if thread_id not in self._agents_by_thread:
        from strands import Agent as StrandsAgentCore

        agent_kwargs = {
            "model": self._model,
            "system_prompt": self._system_prompt,
            "tools": self._tools,
            **self._agent_kwargs,
        }

        # session_managerを追加
        if self.config and self.config.session_manager_provider:
            session_manager = self.config.session_manager_provider(input_data)
            agent_kwargs["session_manager"] = session_manager
        
        self._agents_by_thread[thread_id] = StrandsAgentCore(**agent_kwargs)
    
    # 保存済みの元のrun()を呼び出し
    return original_run(self, input_data)

# パッチを適用
StrandsAgent.run = patched_run

# ~省略~

2. ヘルパー関数は使わない

ヘルパーが関数内の実装を持ってきて、自力でFastAPIのエンドポイントを作成します。

# ~省略~

# FastAPIアプリを作成
app = FastAPI(title="AWS Strands Agent")

# CORS設定
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# ~省略~

@app.post("/invocations")
async def invocations(input_data: RunAgentInput, request: Request):
    
    # リクエストヘッダーから情報取得
    actor_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub", "")
    session_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id", "")

    def session_provider(input_data: RunAgentInput):
        """AgentCore Memoryのセッションマネージャ作成"""
        return create_session_manager(
            memory_id=MEMORY_ID,
            session_id=session_id,
            actor_id=actor_id
        )

    model = BedrockModel(
        model_id="jp.anthropic.claude-haiku-4-5-20251001-v1:0"
    )

    # エージェント作成
    agent = Agent(
        model=model,
        system_prompt=SYSTEM_PROMPT,
    )

    strands_agent = StrandsAgent(
        agent=agent,
        name="strands_agent",
        config=StrandsAgentConfig(
            session_manager_provider=session_provider
        )
    )

    # レスポンス処理
    accept_header = request.headers.get("accept")
    encoder = EventEncoder(accept=accept_header)

    async def event_generator():
        try:
            async for event in strands_agent.run(input_data):
                yield encoder.encode(event)
        except Exception as e:
            from ag_ui.core import RunErrorEvent, EventType
            error_event = RunErrorEvent(
                type=EventType.RUN_ERROR,
                message=f"Agent error: {str(e)}",
                code="AGENT_ERROR"
            )
            yield encoder.encode(error_event)
    
    return StreamingResponse(
        event_generator(),
        media_type=encoder.get_content_type()
    )

@app.get("/ping")
async def ping():
    """ヘルスチェック用のエンドポイント"""
    return {"status": "ok", "message": "Agent is running"}

# ~省略~

やってみる

実際にStrandsAgents + AG-UI でAgentCore Memoryを使ってAIエージェントに記憶を持たせてみます。

Step1. CDKでAgentCore Memoryを定義する

今回の構成ではAmplify内でCDKを使ってAgentCore Runtimeを定義しているため、同様にAgentCore Memoryも定義します。

const agentMemory = new agentcore.Memory(backend.stack, 'AgentMemory', {
  memoryName: 'ai_agent_memory',
  expirationDuration: cdk.Duration.days(90),
});

StrandsAgents内でSeesionManagerを定義するには、MemoryIDが必要となるため、Runtimeの環境変数MEMORY_IDとして渡します。

const agentRuntime = new agentcore.Runtime(backend.stack, 'StrandsAgentRuntime', {
  runtimeName: 'strandsAgentRuntime', 
  agentRuntimeArtifact: agentRuntimeArtifact,
  authorizerConfiguration:
    agentcore.RuntimeAuthorizerConfiguration.usingCognito(
      backend.auth.resources.userPool,
      [backend.auth.resources.userPoolClient]
    ),
  requestHeaderConfiguration: {
    allowlistedHeaders: [
      "X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub",
      "X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id"
    ]
  },
  environmentVariables: {
    MEMORY_ID: agentMemory.memoryId
  }
});

コードはこちら
backend.ts
import { defineBackend } from '@aws-amplify/backend';
import * as aws_lambda from 'aws-cdk-lib/aws-lambda';
import * as path from "path";
import { fileURLToPath } from "url";
import { auth } from './auth/resource';
import { copilotkitFunction } from './functions/copilotkit/resource';
import * as cdk from "aws-cdk-lib";
import * as agentcore from "@aws-cdk/aws-bedrock-agentcore-alpha";
import * as iam from "aws-cdk-lib/aws-iam";
import * as apigateway from "aws-cdk-lib/aws-apigateway";
import { ResponseTransferMode } from "aws-cdk-lib/aws-apigateway";

const __filename = fileURLToPath(import. meta.url);
const __dirname = path.dirname(__filename);

const backend = defineBackend({
  auth,
  copilotkitFunction
});

const agentRuntimeArtifact = agentcore.AgentRuntimeArtifact.fromAsset(
  path.join(__dirname, "../../agent")
);

const agentMemory = new agentcore.Memory(backend.stack, 'AgentMemory', {
  memoryName: 'ai_agent_memory',
  expirationDuration: cdk.Duration.days(90),
});

const agentRuntime = new agentcore.Runtime(backend.stack, 'StrandsAgentRuntime', {
  runtimeName: 'strandsAgentRuntime', 
  agentRuntimeArtifact: agentRuntimeArtifact,
  authorizerConfiguration:
    agentcore.RuntimeAuthorizerConfiguration.usingCognito(
      backend.auth.resources.userPool,
      [backend.auth.resources.userPoolClient]
    ),
  requestHeaderConfiguration: {
    allowlistedHeaders: [
      "X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub",
      "X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id"
    ]
  },
  environmentVariables: {
    MEMORY_ID: agentMemory.memoryId
  }
});

agentRuntime.role.addToPrincipalPolicy(
  new iam.PolicyStatement({
    actions: [
      "bedrock:InvokeModel",
      "bedrock:InvokeModelWithResponseStream",
      "bedrock-agentcore:CreateEvent",
      "bedrock-agentcore:ListEvents",
      "bedrock-agentcore:GetMemory",
      "bedrock-agentcore:CreateMemorySession",
      "bedrock-agentcore:GetMemorySession",
      "bedrock-agentcore:AddMemoryEvents",
      "bedrock-agentcore:GetMemoryStrategy",
      "bedrock-agentcore:ListMemoryStrategies",
    ],
    resources: ["*"],
  }),
);

backend.copilotkitFunction.addEnvironment('USER_POOL_ID', backend.auth.resources.userPool.userPoolId);
backend.copilotkitFunction.addEnvironment('USER_POOL_CLIENT_ID', backend.auth.resources.userPoolClient.userPoolClientId);
backend.copilotkitFunction.addEnvironment('AGENT_RUNTIME_ARN', agentRuntime.agentRuntimeArn);

new aws_lambda.CfnUrl(backend.stack, 'copilotkitFunctionUrl', {
  targetFunctionArn: backend.copilotkitFunction.resources.lambda.functionArn,
  authType: 'NONE',
  invokeMode: 'RESPONSE_STREAM'
});

const restApi = new apigateway.RestApi(
  backend.copilotkitFunction.stack,
  "StreamingApi",
  {
    restApiName: "Streaming API",
    deployOptions: {
      stageName: "dev",
    },
    defaultCorsPreflightOptions: {
      allowOrigins: apigateway.Cors.ALL_ORIGINS,
      allowMethods: ["POST", "OPTIONS"],
      allowHeaders: ["*"],
    },
  }
);

restApi.root.addResource("copilotkit").addMethod(
  "POST",
  new apigateway.LambdaIntegration(
    backend.copilotkitFunction.resources.lambda,
    {
      proxy: true,
      responseTransferMode: ResponseTransferMode.STREAM,
    }
  )
);

backend.addOutput({
  custom: {
    copilotkitFunctionUrl: restApi.url,
    agentRuntimeArn: agentRuntime.agentRuntimeArn
  }
});

Step2. 記憶管理のためリクエストヘッダーでユーザーIDを受け渡す

今回は、Memoryのactor_idsession_idには、以下を指定します。

  • actor_id:Amazon CognitoのログインユーザーのSub
  • session_id:画面を開いた際に1度生成するユニークな文字列

これらをフロントエンドからAIエージェントを呼び出す際にリクエストヘッダーへ設定します。

注意点としては、AgentCore Runtimeで任意のリクエストヘッダーを受け付けるためには、Runtimeの定義で以下の許可を設定する必要があります。

  requestHeaderConfiguration: {
    allowlistedHeaders: [
      "X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub",
      "X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id"
    ]

設定された各種IDは、FastAPIのrequestから引き出します。

@app.post("/invocations")
async def invocations(input_data: RunAgentInput, request: Request):
    
    # リクエストヘッダーから情報取得
    actor_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub", "")
    session_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id", "")

Step3.StrandsAgentsでSession Mangaerを実装する

最後にSession Mangaerを実装します。
ここのStrandsAgentsの実装は先ほどの対策の章で述べたとおりなので、コード全文は以下をご参照ください。(コードが汚くてごめんなさいもうわけないです。力が尽きました)

agent.py
import os
import jwt
from datetime import datetime
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import StreamingResponse

from ag_ui_strands import StrandsAgent, StrandsAgentConfig
from ag_ui.core import RunAgentInput
from ag_ui.encoder import EventEncoder
from strands import Agent
from strands.models import BedrockModel
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig, RetrievalConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager

# FastAPIアプリを作成
app = FastAPI(title="AWS Strands Agent")

# CORS設定
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

MEMORY_ID = os.environ.get("MEMORY_ID", "")

SYSTEM_PROMPT = f"""
あなたは 私の親友エージェントです。ユーザーからの質問に回答してください。

### 回答時のルール
- 日本語で応答してください
"""

from typing import Callable, Optional
from strands.session import SessionManager

SessionManagerProvider = Callable[[RunAgentInput], SessionManager]

original_init = StrandsAgentConfig.__init__

def patched_config_init(self, tool_behaviors=None, state_context_builder=None, session_manager_provider=None):
    original_init(self, tool_behaviors or {}, state_context_builder)
    self.session_manager_provider: Optional[SessionManagerProvider] = session_manager_provider

# パッチを適用
StrandsAgentConfig.__init__ = patched_config_init

# パッチ適用前に元のrun()を保存
original_run = StrandsAgent.run

def patched_run(self, input_data):
    """session_manager対応のrun()メソッド"""
    
    thread_id = input_data.thread_id or "default"
    
    # エージェント作成時にsession_managerを追加
    if thread_id not in self._agents_by_thread:
        from strands import Agent as StrandsAgentCore

        agent_kwargs = {
            "model": self._model,
            "system_prompt": self._system_prompt,
            "tools": self._tools,
            **self._agent_kwargs,
        }

        # session_managerを追加
        if self.config and self.config.session_manager_provider:
            session_manager = self.config.session_manager_provider(input_data)
            agent_kwargs["session_manager"] = session_manager
        
        self._agents_by_thread[thread_id] = StrandsAgentCore(**agent_kwargs)
    
    # 保存済みの元のrun()を呼び出し
    return original_run(self, input_data)

# パッチを適用
StrandsAgent.run = patched_run

def create_session_manager(
    memory_id: str,
    session_id: str,
    actor_id: str,
    region: str = "ap-northeast-1"
) -> AgentCoreMemorySessionManager:
    """AgentCore Memory用のセッションマネージャーを作成"""

    memory_config = AgentCoreMemoryConfig(
        memory_id=memory_id,
        session_id=session_id,
        actor_id=actor_id,

    )
    return AgentCoreMemorySessionManager(
        agentcore_memory_config=memory_config,
        region_name=region
    )

@app.post("/invocations")
async def invocations(input_data: RunAgentInput, request: Request):
    
    # リクエストヘッダーから情報取得
    actor_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-User-Sub", "")
    session_id = request.headers.get("X-Amzn-Bedrock-AgentCore-Runtime-Custom-Session-Id", "")

    def session_provider(input_data: RunAgentInput):
        """AgentCore Memoryのセッションマネージャ作成"""
        return create_session_manager(
            memory_id=MEMORY_ID,
            session_id=session_id,
            actor_id=actor_id
        )

    model = BedrockModel(
        model_id="jp.anthropic.claude-haiku-4-5-20251001-v1:0"
    )

    # エージェント作成
    agent = Agent(
        model=model,
        system_prompt=SYSTEM_PROMPT,
    )

    strands_agent = StrandsAgent(
        agent=agent,
        name="strands_agent",
        config=StrandsAgentConfig(
            session_manager_provider=session_provider
        )
    )

    # レスポンス処理
    accept_header = request.headers.get("accept")
    encoder = EventEncoder(accept=accept_header)

    async def event_generator():
        try:
            async for event in strands_agent.run(input_data):
                yield encoder.encode(event)
        except Exception as e:
            from ag_ui.core import RunErrorEvent, EventType
            error_event = RunErrorEvent(
                type=EventType.RUN_ERROR,
                message=f"Agent error: {str(e)}",
                code="AGENT_ERROR"
            )
            yield encoder.encode(error_event)
    
    return StreamingResponse(
        event_generator(),
        media_type=encoder.get_content_type()
    )

@app.get("/ping")
async def ping():
    """ヘルスチェック用のエンドポイント"""
    return {"status": "ok", "message": "Agent is running"}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

今回の実装では、AmplifyのSandboxでの動作確認までとなっており、実際にAmplify Hostingでデプロイしたわけではありません。
デプロイしても問題なく動作するとは思いますが...

まとめ

StrandsAgentsのTypeScript版のAG-UIアダプタもまだなので、本番で使うのはもう少し成熟してからの方が良さそうです。

ただCopilotKitを使うと、フロント側のコードやAIエージェントのやりとりのコードが楽になるので、現状でも恩恵はあると思います。

他の実装方法で既にAIエージェントを構築しているのであれば、急いでAG-UIに乗り換えなくてもいいかなという感想です。

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?