2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

「さっき話したやつ」を理解するAI秘書LINE Bot:Strands Agents + AgentCore Memoryで会話の文脈を覚えさせる

Last updated at Posted at 2025-12-31

「さっき話したやつ」を理解するAI秘書LINE Bot:Strands Agents + AgentCore Memoryで会話の文脈を覚えさせる

はじめに

久しぶりのアウトプットになります。AWS Bedrock AgentCoreにはずっと興味がありました。プレビュー時から触りたいと思いつつも、特に作りたいものがなかったり、本業が死に体でなかなかアウトプットせずに2025年が終わろうとしていました。

しかし、年末に入って時間ができ、少し作りたいものもできたので、2025年最後のアウトプットをすることにしました。それが、AgentCoreを使ったLINE秘書Botです。

なぜLINE秘書Botを作ろうと思ったか

人とのLINEで横に控えておいて、勝手に予定登録してくれたり、自分の予定を大体把握してくれている秘書がLINEにいたら嬉しいなぁ〜とふと思いました。

例えば、こんな会話ができる秘書がいたら便利です:

私: 「来週の月曜日、午後2時から会議があるよ」
秘書: 「了解しました。来週の月曜日(12月16日)午後2時からの会議を登録しました。」

(数日後)

私: 「月曜日の会議、何時からだっけ?」
秘書: 「来週の月曜日(12月16日)午後2時からの会議ですね。」

正直、短期記憶のないAI Botは一問一答以外、全く使い物になりません。
スクリーンショット 2025-12-31 22.30.36.png

本当は高機能にしたいですが、まずは短期記憶を持ったエージェントの実装からやっていきます。
会話の内容を覚えてくれるだけでも、かなり秘書に近づきます。
スクリーンショット 2025-12-31 22.34.53.png

本記事で解説すること

本記事では、AgentCore Memoryによる短期記憶機能を実装し、会話の文脈を理解できるAIアシスタントを実現する方法を、以下の3つの要点に絞って、コードレベルで詳細に解説します:

  1. CDKによるAgentCore実装 - インフラストラクチャの構築
  2. Strands Agentsによるメモリ実装 - エージェントとメモリ管理の実装
  3. API Gateway/Lambdaによる呼び出し - LINE Webhookの処理

AgentCoreは比較的新しいサービスで、日本語の実装例もまだ少ないため、実際のコードとともに解説することで、同じようにAgentCoreに興味を持っている方の参考になればと思います。

アーキテクチャ概要

システムは以下のコンポーネントで構成されています:

LINE → API Gateway → Lambda (Webhook Handler) → AgentCore Runtime → Strands Agent → Bedrock (Nova 2 Lite)
                                                      ↓
                                                AgentCore Memory
  • AgentCore Runtime: AIエージェントの実行環境(コンテナベース)
  • AgentCore Memory: 会話履歴を保存するマネージドな短期記憶ストレージ
  • Strands Agents: エージェントフレームワーク(Bedrock統合)
  • API Gateway + Lambda: LINE Webhookの受信とAgentCore Runtimeの呼び出し

1. CDKによるAgentCore実装

まず、AgentCore MemoryとRuntimeをCDKで定義します。

AgentCoreとは

2025年07月17日、New Yorkで開催されたAWS Summitにおいて発表されたAWS サービスになります。

Amazon Bedrock AgentCore

Amazon Bedrock AgentCore は、効果的なエージェントを大規模かつ安全に構築、デプロイ、運用するためのエージェントプラットフォームです。インフラストラクチャの管理は不要です。インテリジェントなメモリとゲートウェイを使用してエージェントの実稼働を加速し、ツールとデータへの安全で制御されたアクセスを実現します。エンタープライズグレードのセキュリティと動的なスケーリングでエージェントを実行します。

なんて説明が書いてありますが、要するにAIエージェントのデプロイだけでなく、メモリやツール実行、監視などの機能実装を広範にサポートしてくれる機能群です。

登場当時は、マネジメントコンソールからかCLIでの実装しかサポートされていませんでした。CDK L1 Construct は早々にサポートされましたが、最近L2 Construct もサポートするようになりました。

AWS CDK ver 2.221.0 リリース

実装が楽になるので、これは大変嬉しいアップデートです。

@aws-cdk/aws-bedrock-agentcore-alphaパッケージを使用します。

AgentCore CDKドキュメント

AgentCore MemoryとRuntimeの定義

import * as agentcore from '@aws-cdk/aws-bedrock-agentcore-alpha';

...


    this.memory = new agentcore.Memory(this, 'Memory', {
      memoryName: `PersonalSecretaryMemory_${environment}`,
      description: `Short-term memory for Personal Secretary Agent (${environment}) - Application enforces 48h TTL`,
      // expirationDuration defaults to 90 days (AgentCore minimum is 7 days)
    });

    // Create AgentRuntimeArtifact from local source code
    const agentRuntimeArtifact = agentcore.AgentRuntimeArtifact.fromAsset(
      path.join(__dirname, '../agent-code')
    );

    // AgentCore Runtime using L2 Construct
    this.agentRuntime = new agentcore.Runtime(this, 'AgentRuntime', {
      runtimeName: `PersonalSecretaryAgent_${environment}`,
      agentRuntimeArtifact: agentRuntimeArtifact,
      description: `Personal Secretary Agent Runtime for ${environment} with Nova2 Lite model`,
      environmentVariables: {
        MEMORY_ID: this.memory.memoryId,
      },
    });

ポイント:

  • AgentRuntimeArtifact.fromAsset()でディレクトリのコードをコンテナイメージとしてビルド
  • MEMORY_IDを環境変数として渡し、エージェントコードから参照可能にする

IAM権限の設定

AgentCore RuntimeがBedrockモデルを呼び出し、Memoryにアクセスできるよう権限を付与します:

lib/minimal-agentcore-stack.ts
    // Add Bedrock model invocation permissions for foundation models and inference profiles
    // Include all regions that the inference profile might route to
    this.agentRuntime.addToRolePolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: [
          'bedrock:InvokeModel',
          'bedrock:InvokeModelWithResponseStream',
        ],
        resources: [
          // Foundation models in all APAC regions for Nova 2 Lite inference profile
          `arn:aws:bedrock:ap-northeast-1::foundation-model/*`,
          // Inference profiles in current account/region
          `arn:aws:bedrock:${this.region}:${this.account}:inference-profile/*`,
        ],
      })
    );

    // Add AgentCore Memory permissions for the runtime
    this.agentRuntime.addToRolePolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: [
          'bedrock-agentcore:CreateEvent',
          'bedrock-agentcore:ListEvents',
          'bedrock-agentcore:GetEvent',
          'bedrock-agentcore:DeleteEvent',
          'bedrock-agentcore:UpdateEvent',
        ],
        resources: [this.memory.memoryArn, `${this.memory.memoryArn}/*`],
      })
    );

ポイント:

  • Memory操作にはCreateEventListEventsなどが必要

スタックの統合

AgentCore StackとLINE Bot Stackを統合します。LINE Bot Stackは後述します。:

lib/personal-secretary-stack.ts

import * as cdk from 'aws-cdk-lib';
import { Construct } from 'constructs';
import { MinimalAgentCoreStack } from './minimal-agentcore-stack';
import { LineBotStack } from './line-bot-stack';

export interface PersonalSecretaryStackProps extends cdk.StackProps {
  readonly environment?: string;
  readonly lineChannelSecret: string;
  readonly lineChannelAccessToken: string;
}

export class PersonalSecretaryStack extends cdk.Stack {
  constructor(scope: Construct, id: string, props: PersonalSecretaryStackProps) {
    super(scope, id, props);

    const environment = props.environment || 'dev';

    // Deploy AgentCore stack
    const agentCoreStack = new MinimalAgentCoreStack(this, 'AgentCore', {
      environment,
      env: props.env,
    });

    // Deploy LINE Bot stack
    const lineBotStack = new LineBotStack(this, 'LineBot', {
      environment,
      agentRuntimeArn: agentCoreStack.agentRuntime.agentRuntimeArn,
      memoryId: agentCoreStack.memory.memoryId,
      memoryArn: agentCoreStack.memory.memoryArn,
      lineChannelSecret: props.lineChannelSecret,
      lineChannelAccessToken: props.lineChannelAccessToken,
      env: props.env,
    });

    // Add dependency
    lineBotStack.addDependency(agentCoreStack);
  }
}

ポイント:

  • AgentCore Stackの出力(ARN、ID)をLINE Bot Stackに渡す
  • addDependency()でデプロイ順序を制御

2. Strands Agentsによるメモリ実装

Strands Agentsは、Bedrockと統合されたエージェントフレームワークです。AgentCore Memoryとの統合もサポートしています。

Strans Agentとは

AWSが開発したOSSのAIエージェントフレームワークで、非常にシンプルにコードを記述できる、かつ、AWSサービスとの統合が容易です。

今回のAgent coreへのデプロイや、メモリ統合についても公式ドキュメントに記載されています。
AgentCore Runtimeへのデプロイ

AgentCore Memoryの利用

Strands Agentの初期化

agent-code/personal_secretary_agent.py
from bedrock_agentcore.runtime import BedrockAgentCoreApp
import boto3

# Strands Agents imports
from strands import Agent
from strands.models import BedrockModel
from bedrock_agentcore.memory.integrations.strands.config import AgentCoreMemoryConfig
from bedrock_agentcore.memory.integrations.strands.session_manager import AgentCoreMemorySessionManager

...

class StrandsPersonalSecretaryAgent:
    """Strands Agents wrapper for Personal Secretary functionality"""
    
    def __init__(self):
        """Initialize the Strands Agent wrapper"""
        # Configure BedrockModel to use Nova 2 Lite explicitly
        self.model = BedrockModel(
            model_id="jp.amazon.nova-2-lite-v1:0"
        )
        # Initialize fallback memory manager
        self.memory_manager = SimpleMemoryManager()
        logger.info("StrandsPersonalSecretaryAgent initialized with Nova 2 Lite and fallback memory")
    
    def generate_session_id(self, source_data: Dict[str, Any]) -> str:
        """Generate session ID based on source information (preserved from current implementation)
        
        Args:
            source_data: Source information from LINE webhook
            
        Returns:
            str: Session ID for memory storage
        """
        user_id = source_data.get("userId", "")
        source_type = source_data.get("sourceType", "user")
        
        if source_type == "group":
            group_id = source_data.get("groupId", "")
            if group_id:
                return f"session_group_{group_id}"
        elif source_type == "room":
            room_id = source_data.get("roomId", "")
            if room_id:
                return f"session_room_{room_id}"
        
        # Default to user session for 1:1 chat or fallback
        return f"session_user_{user_id}"
    
    def get_system_prompt(self) -> str:
        """Get the system prompt for the agent (preserved from current implementation)"""
        return """あなたは日本語で応答するパーソナルセクレタリーです。会話の文脈を理解し、過去のやり取りを参考にして、簡潔で実用的な回答をしてください。マークダウン形式は禁止。応答は必ず通常のテキスト形式。"""
    
    def create_agent(self, source_data: Dict[str, Any]) -> Agent:
        """Create a Strands Agent with AgentCore Memory integration
        
        Args:
            source_data: Source information from LINE webhook
            
        Returns:
            Agent: Configured Strands Agent instance
        """
        session_id = self.generate_session_id(source_data)
        memory_id = os.getenv('MEMORY_ID')
        actor_id = source_data.get('userId')
        
        logger.info("Creating agent with memory config", 
                   memory_id=memory_id,
                   session_id=session_id,
                   actor_id=actor_id)
        
        # Always create agent without Strands memory for now due to integration issues
        # TODO: Re-enable Strands memory integration once the JSON parsing issue is resolved
        logger.info("Creating agent without Strands memory (using fallback memory)")
        return Agent(
            system_prompt=self.get_system_prompt(),
            model=self.model
        )
        
    
    async def process_message(self, message: str, source_data: Dict[str, Any]) -> str:
        """Process message using Strands Agent with fallback memory
        
        Args:
            message: User message text
            source_data: Source information from LINE webhook
            
        Returns:
            str: Agent response text
        """
        try:
            session_id = self.generate_session_id(source_data)
            user_id = source_data.get('userId', 'unknown')
            
            logger.info("Processing message with Strands Agent", 
                       user_id=user_id,
                       session_id=session_id,
                       message_length=len(message))
            
            # Create agent (currently without Strands memory due to integration issues)
            agent = self.create_agent(source_data)
            
            # Use fallback memory manager for conversation history
            logger.info("Using fallback memory manager", user_id=user_id)
            
            # Get conversation history
            conversation_history = await self.memory_manager.get_conversation_history(user_id)
            context = self.memory_manager.format_conversation_context(conversation_history)
            
            # Create enhanced prompt with context
            enhanced_message = f"{context}\n\n現在のメッセージ: {message}" if context else message
            
            # Save user message
            await self.memory_manager.save_message(user_id, message, "USER")
            
            # Process with agent
            logger.info("Invoking agent with context", 
                       user_id=user_id,
                       has_context=bool(context))
            
            agent_result = await agent.invoke_async(enhanced_message)
            response = agent_result.text if hasattr(agent_result, 'text') else str(agent_result)
            
            # Save assistant response
            await self.memory_manager.save_message(user_id, response, "ASSISTANT")
            
            logger.info("Successfully processed message with fallback memory", 
                       user_id=user_id,
                       response_length=len(response))
            
            return response
            
        except Exception as e:
            # Preserve existing error handling format for LINE compatibility
            logger.error("Error in process_message", 
                        user_id=source_data.get('userId', 'unknown'),
                        error=str(e),
                        error_type=type(e).__name__)
            return self.format_error_response(e, source_data)

ポイント:

  • BedrockModelでNova 2 Liteを明示的に指定
  • SimpleMemoryManagerを初期化(後述)

AgentCore Memoryへのアクセス

今回は独自のメモリアクセス用クラスを定義しています。のAgentCoreMemorySessionManagerを使えばもっと簡単に実装できるはずですが、正直使いこなせず、独自実装にしています。:

agent-code/personal_secretary_agent.py
class SimpleMemoryManager:
    """Simple memory manager for short-term conversation history using direct AgentCore Memory API"""
    
    def __init__(self):
        self.bedrock_client = boto3.client('bedrock-agentcore', region_name='ap-northeast-1')
        self.memory_id = os.getenv('MEMORY_ID')
        self.max_history_messages = 20
        logger.info("SimpleMemoryManager initialized", memory_id=self.memory_id)
    
    async def save_message(self, user_id: str, message: str, role: str) -> bool:
        """Save a message to AgentCore Memory
        
        Args:
            user_id: Unique identifier for the user
            message: Message content to save
            role: Role of the message sender (USER/ASSISTANT)
            
        Returns:
            bool: True if successful, False otherwise
        """
        try:
            session_id = f"session_{user_id}"
            
            params = {
                "memoryId": self.memory_id,
                "actorId": user_id,
                "sessionId": session_id,
                "eventTimestamp": datetime.utcnow(),
                "payload": [
                    {
                        "conversational": {
                            "role": role,
                            "content": {
                                "text": message
                            }
                        }
                    }
                ],
                "clientToken": str(uuid.uuid4())
            }
            
            response = self.bedrock_client.create_event(**params)
            logger.info("Message saved to memory", 
                       user_id=user_id, 
                       role=role,
                       event_id=response.get("eventId"))
            return True
            
        except Exception as e:
            logger.error("Failed to save message to memory", 
                        user_id=user_id, 
                        error=str(e),
                        error_type=type(e).__name__)
            return False

ポイント:

  • create_eventで会話イベントを保存
  • payloadconversational形式でメッセージを格納
  • actorIdsessionIdでユーザーとセッションを識別

会話履歴の取得

agent-code/personal_secretary_agent.py
    async def get_conversation_history(self, user_id: str) -> List[Dict[str, Any]]:
        """Get conversation history from AgentCore Memory
        
        Args:
            user_id: Unique identifier for the user
            
        Returns:
            List[Dict]: List of conversation messages
        """
        try:
            session_id = f"session_{user_id}"
            
            params = {
                "memoryId": self.memory_id,
                "actorId": user_id,
                "sessionId": session_id,
                "maxResults": self.max_history_messages  # Use maxResults instead of maxItems
            }
            
            response = self.bedrock_client.list_events(**params)
            events = response.get("events", [])
            
            conversation = []
            for event in events:
                payload = event.get("payload", [])
                for item in payload:
                    if "conversational" in item:
                        conv = item["conversational"]
                        conversation.append({
                            "role": conv.get("role", "USER"),
                            "content": conv.get("content", {}).get("text", ""),
                            "timestamp": event.get("eventTimestamp")
                        })
            
            # Sort by timestamp (oldest first)
            conversation.sort(key=lambda x: x.get("timestamp", ""))
            
            logger.info("Retrieved conversation history", 
                       user_id=user_id, 
                       message_count=len(conversation))
            return conversation
            
        except Exception as e:
            logger.error("Failed to retrieve conversation history", 
                        user_id=user_id, 
                        error=str(e),
                        error_type=type(e).__name__)
            return []

ポイント:

  • list_eventsで会話履歴を取得
  • maxResultsで取得件数を制限(デフォルト20件)
  • タイムスタンプでソートして時系列順に並べ替え

Strands Agentによるメッセージ処理

Strands Agentを使ってメッセージを処理し、会話履歴をプロンプトに組み込みます:

agent-code/personal_secretary_agent.py
    async def process_message(self, message: str, source_data: Dict[str, Any]) -> str:
        """Process message using Strands Agent with fallback memory
        
        Args:
            message: User message text
            source_data: Source information from LINE webhook
            
        Returns:
            str: Agent response text
        """
        try:
            session_id = self.generate_session_id(source_data)
            user_id = source_data.get('userId', 'unknown')
            
            logger.info("Processing message with Strands Agent", 
                       user_id=user_id,
                       session_id=session_id,
                       message_length=len(message))
            
            # Create agent (currently without Strands memory due to integration issues)
            agent = self.create_agent(source_data)
            
            # Use fallback memory manager for conversation history
            logger.info("Using fallback memory manager", user_id=user_id)
            
            # Get conversation history
            conversation_history = await self.memory_manager.get_conversation_history(user_id)
            context = self.memory_manager.format_conversation_context(conversation_history)
            
            # Create enhanced prompt with context
            enhanced_message = f"{context}\n\n現在のメッセージ: {message}" if context else message
            
            # Save user message
            await self.memory_manager.save_message(user_id, message, "USER")
            
            # Process with agent
            logger.info("Invoking agent with context", 
                       user_id=user_id,
                       has_context=bool(context))
            
            agent_result = await agent.invoke_async(enhanced_message)
            response = agent_result.text if hasattr(agent_result, 'text') else str(agent_result)
            
            # Save assistant response
            await self.memory_manager.save_message(user_id, response, "ASSISTANT")
            
            logger.info("Successfully processed message with fallback memory", 
                       user_id=user_id,
                       response_length=len(response))
            
            return response

ポイント:

  • 会話履歴を取得してプロンプトに組み込み
  • agent.invoke_async()でStrands Agentを非同期実行
  • ユーザーメッセージとアシスタント応答の両方をMemoryに保存

AgentCoreエントリーポイント

AgentCore Runtimeから呼び出されるエントリーポイントを実装します:

agent-code/personal_secretary_agent.py
@app.entrypoint
async def entrypoint(payload: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Simplified entrypoint using Strands Agents"""
    try:
        logger.info("Processing message with Strands Agents", payload=payload)
        
        # Validate payload
        if not payload:
            return {"status": "error", "error": "No payload provided"}
        
        # Extract message content
        message_data = payload.get("message", {})
        source_data = payload.get("source", {})
        
        message_content = message_data.get("text", "")
        user_id = source_data.get("userId", "")
        
        if not message_content or not user_id:
            return {"status": "error", "error": "Missing message content or user ID"}
        
        # Process message using Strands Agents (handles memory automatically)
        agent_result = await strands_agent.process_message(message_content, source_data)
        
        logger.info("Generated response with Strands Agents", 
                   user_id=user_id, 
                   response_length=len(agent_result),
                   memory_enabled=True)  # Strands Agents handles memory automatically
        
        return {
            "status": "success",
            "response": agent_result,
            "memory_enabled": True,  # Memory is enabled via fallback implementation
            "debug_info": {
                "memory_available": True,  # Memory is available via fallback
                "memory_id": os.getenv('MEMORY_ID'),
                "framework": "strands-agents-with-fallback-memory",
                "memory_type": "fallback_agentcore_direct"
            }
        }

ポイント:

  • @app.entrypointデコレータでAgentCore Runtimeのエントリーポイントを定義
  • strands_agent.process_message()でメッセージを処理
  • レスポンスにメモリの状態を含める

3. API Gateway/Lambdaによる呼び出し

LINE Webhookを受信し、AgentCore Runtimeを呼び出すLambda関数とAPI Gatewayを実装します。

Lambda関数の定義(CDK)

lib/line-bot-stack.ts
    // Lambda function for LINE webhook
    const webhookFunction = new lambda.Function(this, 'LineWebhookFunction', {
      runtime: lambda.Runtime.PYTHON_3_11,
      handler: 'line_webhook.lambda_handler',
      code: lambda.Code.fromAsset(path.join(__dirname, '../lambda/line-webhook')),
      timeout: cdk.Duration.seconds(30),
      memorySize: 256,
      environment: {
        LINE_CHANNEL_SECRET: props.lineChannelSecret,
        LINE_CHANNEL_ACCESS_TOKEN: props.lineChannelAccessToken,
        AGENT_RUNTIME_ARN: agentRuntimeArn,
        MEMORY_ID: memoryId,
        MEMORY_ARN: memoryArn,
        ENABLE_SHORT_TERM_MEMORY: 'true',
        MEMORY_TTL_HOURS: '48',
        MAX_CONTEXT_MESSAGES: '20',
        LOG_LEVEL: 'INFO',
      },
      logRetention: logs.RetentionDays.ONE_WEEK,
    });

ポイント:

  • AgentCore Runtime ARNとMemory IDを環境変数として設定
  • タイムアウト30秒、メモリ256MB
  • LINEのチャネルシークレットや、トークンも取得して環境変数に設定しておこう。

IAM権限の設定

Lambda関数がAgentCore RuntimeとMemoryにアクセスできるよう権限を付与します:

lib/line-bot-stack.ts
    // Add permissions to invoke AgentCore and access Memory
    webhookFunction.addToRolePolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: [
          'bedrock-agentcore:InvokeAgentRuntime',
          'bedrock-agentcore:GetAgent',
        ],
        resources: [agentRuntimeArn, `${agentRuntimeArn}/*`],
      })
    );

    // Add permissions to access AgentCore Memory
    webhookFunction.addToRolePolicy(
      new iam.PolicyStatement({
        effect: iam.Effect.ALLOW,
        actions: [
          'bedrock-agentcore:CreateEvent',
          'bedrock-agentcore:ListEvents',
          'bedrock-agentcore:GetEvent',
          'bedrock-agentcore:DeleteEvent',
          'bedrock-agentcore:UpdateEvent',
        ],
        resources: [memoryArn, `${memoryArn}/*`],
      })
    );

ポイント:

  • InvokeAgentRuntimeでAgentCore Runtimeを呼び出し
  • Memory操作の権限も一応付与。※デバックように入れただけなので、AgentからしかMemory操作しなければ不要。

API Gatewayの設定

lib/line-bot-stack.ts
    // API Gateway for LINE webhook
    const api = new apigateway.RestApi(this, 'LineWebhookApi', {
      restApiName: `PersonalSecretaryLineBot-${environment}`,
      description: `LINE Bot API for Personal Secretary Agent (${environment})`,
      deployOptions: {
        stageName: 'prod',
        loggingLevel: apigateway.MethodLoggingLevel.INFO,
        dataTraceEnabled: true,
        metricsEnabled: true,
      },
      defaultCorsPreflightOptions: {
        allowOrigins: apigateway.Cors.ALL_ORIGINS,
        allowMethods: apigateway.Cors.ALL_METHODS,
        allowHeaders: ['Content-Type', 'X-Amz-Date', 'Authorization', 'X-Api-Key'],
      },
    });

    // Webhook endpoint
    const webhookIntegration = new apigateway.LambdaIntegration(webhookFunction, {
      requestTemplates: { 'application/json': '{ "statusCode": "200" }' },
    });

    api.root.addResource('webhook').addMethod('POST', webhookIntegration);

ポイント:

  • REST APIで/webhookエンドポイントを作成
  • Lambda統合でLambda関数を呼び出し
  • CORS設定とロギングを有効化

Lambda関数の実装

LINE Webhookを受信し、AgentCore Runtimeを呼び出します:

lambda/line-webhook/line_webhook.py
def invoke_agent(user_message: str, source_info: Dict[str, Any]) -> Optional[str]:
    """Invoke AgentCore with user message and source information"""
    try:
        user_id = source_info.get('userId', '')
        source_type = source_info.get('type', 'user')
        
        logger.info(f"Invoking agent for user: {user_id}, source_type: {source_type}")
        
        payload = {
            "message": {"text": user_message},
            "source": {
                "userId": user_id,
                "type": "LINE",
                "sourceType": source_type,
                "groupId": source_info.get('groupId'),
                "roomId": source_info.get('roomId')
            },
            "context": {
                "platform": "LINE",
                "timestamp": str(int(time.time() * 1000)),
                "request_memory": ENABLE_MEMORY and bool(MEMORY_ID),
                "memory_limit": 20
            }
        }
        
        response = bedrock_agentcore.invoke_agent_runtime(
            agentRuntimeArn=AGENT_RUNTIME_ARN,
            payload=json.dumps(payload, ensure_ascii=False).encode('utf-8'),
            contentType='application/json',
            accept='application/json'
        )

ポイント:

  • invoke_agent_runtimeでAgentCore Runtimeを呼び出し
  • ペイロードにメッセージ、ソース情報、コンテキストを含める
  • request_memory: trueでメモリ機能を有効化

LINE Webhookの処理

lambda/line-webhook/line_webhook.py
def lambda_handler(event: Dict[str, Any], context: Any) -> Dict[str, Any]:
    """Lambda handler for LINE webhook"""
    try:
        logger.info(f"Received webhook event")
        
        # Get request body and signature
        body = event.get('body', '')
        signature = event.get('headers', {}).get('x-line-signature', '')
        
        # Verify signature
        if not verify_signature(body, signature):
            logger.warning("Invalid signature")
            return {
                'statusCode': 401,
                'body': json.dumps({'error': 'Invalid signature'})
            }
        
        # Parse webhook data
        webhook_data = json.loads(body)
        events = webhook_data.get('events', [])
        
        if not events:
            logger.info("No events to process")
            return {
                'statusCode': 200,
                'body': json.dumps({'message': 'No events'})
            }
        
        # Process events
        process_line_events(events)

ポイント:

  • LINE署名を検証してセキュリティを確保
  • イベントをパースして処理

動作の流れ

  1. ユーザーがLINEでメッセージを送信
  2. API GatewayがWebhookを受信
  3. Lambda関数が署名を検証し、イベントを処理
  4. AgentCore Runtimeを呼び出し、メッセージとソース情報を渡す
  5. Strands Agentが会話履歴を取得(AgentCore Memoryから)
  6. 会話履歴をプロンプトに組み込み、Bedrockで応答生成
  7. ユーザーメッセージとアシスタント応答をMemoryに保存
  8. LINEに応答を返送

LINE向け実装のポイント

セッション管理

LINE Botでは、1対1チャット、グループチャット、ルームチャットなど、異なるソースタイプがあります。それぞれに適切なセッションIDを生成します。※実際の稼働確認は個人でしかしていないので、グループやルームで想定通りに動くかは試せていません。グループやルームなんかでは、メンションされた時だけ返信する。それ以外が会話を記憶に貯めるだけできると良いなと思っています。:

agent-code/personal_secretary_agent.py
    def generate_session_id(self, source_data: Dict[str, Any]) -> str:
        """Generate session ID based on source information (preserved from current implementation)
        
        Args:
            source_data: Source information from LINE webhook
            
        Returns:
            str: Session ID for memory storage
        """
        user_id = source_data.get("userId", "")
        source_type = source_data.get("sourceType", "user")
        
        if source_type == "group":
            group_id = source_data.get("groupId", "")
            if group_id:
                return f"session_group_{group_id}"
        elif source_type == "room":
            room_id = source_data.get("roomId", "")
            if room_id:
                return f"session_room_{room_id}"
        
        # Default to user session for 1:1 chat or fallback
        return f"session_user_{user_id}"

まとめ

本記事では、以下の3つの要点に絞って、コードレベルで紹介しました:

  1. CDKによるAgentCore実装 - MemoryとRuntimeの定義、IAM権限の設定
  2. Strands Agentsによるメモリ実装 - AgentCore Memory APIの直接使用、会話履歴の管理
  3. API Gateway/Lambdaによる呼び出し - LINE Webhookの処理、AgentCore Runtimeの呼び出し

この実装により、会話の文脈を理解できるLINE Botを構築できました。Strands AgentsとAgentCore Memoryを組み合わせることで、シンプルな実装を実現できました。しかし、まだまだ、AgentCoreMemorySessionManagerのライブラリを使えていなかったり、ツール実装など改善、拡張の余地は多分にあります。

来年はより、高機能は自分だけの秘書を作れたらいいな〜なんて思います。インターフェースはLINEに限らず、メールでもSlackでも、色々使えたら嬉しいですよね。

そんな拡張を夢見ながら、今年は終わりにしようと思います。
皆さま、良いお年を。。。

参考資料


技術スタック

  • AWS Bedrock AgentCore
  • Strands Agents
  • AWS Bedrock (Nova 2 Lite)
  • AWS Lambda
  • API Gateway
  • AWS CDK
  • LINE Messaging API
  • Python 3.11
  • TypeScript
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?