0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

AIエージェントのメモリ設計 — RAG・CAG・構造化忘却の実装

0
Posted at

人間は忘れます。そして、忘れることで効率的に思考できます。しかし現在のAIエージェントは、全てを記憶するか全てを忘れるかの二択しかありません。セッションが終われば全て消える。コンテキストウィンドウに入れれば全て覚えている。この極端さが、AIエージェントの長期運用における最大のボトルネックです。

この記事では、24体のAIエージェントを運用するAEGISプロジェクトで設計・実装した4層メモリアーキテクチャを全て公開します。RAG(Retrieval-Augmented Generation)、CAG(Cache-Augmented Generation)、エピソディックストア、そして最も重要な構造化忘却(Structured Forgetting) ——「何を覚え、何を忘れるか」を戦略的に設計する方法を解説します。

Last updated: 2026-03-15

この記事でわかること

  • 4層メモリアーキテクチャとは何か?各層の役割は?
  • エピソディックストア(体験記憶)の実装方法
  • 蒸留パイプラインで記憶を圧縮する方法
  • 構造化忘却戦略——何を忘れるべきか、どう忘れるか
  • ChromaDBを使ったベクトル検索の統合
  • RAGとCAGの使い分け基準

目次

  1. なぜAIエージェントにメモリが必要なのか
  2. 4層メモリアーキテクチャの全体設計
  3. Layer 1: ワーキングメモリ(CAG)
  4. Layer 2: エピソディックストア
  5. Layer 3: セマンティックストア(RAG)
  6. Layer 4: 手続き記憶
  7. 蒸留パイプラインの実装
  8. 構造化忘却戦略
  9. ChromaDB統合の実装
  10. RAGとCAGの使い分け
  11. まとめ

1. なぜAIエージェントにメモリが必要なのか?

メモリなしのAIエージェントの限界とは?

メモリのないAIエージェントは、毎回ゼロからスタートします。

  • 昨日の会話内容を覚えていない
  • 同じミスを何度も繰り返す
  • ユーザーの好みを学習できない
  • 過去の判断の整合性を保てない

24体のエージェントが同時に動く環境では、これは致命的です。CEOエージェントが昨日承認した方針をCTOエージェントが知らない。デザイナーエージェントが先週却下されたデザインを再提案する。こうした非効率が指数関数的に増大します。

現在のAIメモリソリューションの問題点は何か?

アプローチ 問題
全てをコンテキストに入れる トークン上限がある。128Kトークンでも、1年分の会話は入らない
全てをベクトルDBに入れる 検索ノイズが増える。古い情報が新しい情報を汚染する
要約して保存する 要約の過程で重要な詳細が失われる
何も保存しない 毎回ゼロからで非効率

どれも「人間の記憶の仕組み」を模倣していません。人間は階層的に記憶し、戦略的に忘れるのです。


2. 4層メモリアーキテクチャの全体設計はどうなっているのか?

┌─────────────────────────────────────────────────────┐
│  Layer 1: ワーキングメモリ (CAG)                      │
│  ├── 現在のタスクコンテキスト                          │
│  ├── 直近の会話履歴 (最新20メッセージ)                  │
│  └── アクティブな指示・制約                            │
│  容量: ~32K tokens | 保持: セッション中 | 速度: 即座     │
├─────────────────────────────────────────────────────┤
│  Layer 2: エピソディックストア                         │
│  ├── 具体的な体験の記録 (成功・失敗・判断)              │
│  ├── タイムスタンプ付きイベントログ                     │
│  └── 感情バリアンス(重要度スコア)                     │
│  容量: ~10K entries | 保持: 90日 | 速度: <100ms        │
├─────────────────────────────────────────────────────┤
│  Layer 3: セマンティックストア (RAG)                   │
│  ├── ベクトル化された知識                              │
│  ├── ドキュメント・コードベースのインデックス             │
│  └── 外部情報の構造化キャッシュ                        │
│  容量: ~100K embeddings | 保持: 永続 | 速度: <500ms    │
├─────────────────────────────────────────────────────┤
│  Layer 4: 手続き記憶                                  │
│  ├── 学習したパターン・テンプレート                     │
│  ├── ワークフロー定義                                 │
│  └── 最適化されたプロンプト                            │
│  容量: ~500 patterns | 保持: 永続 | 速度: 即座          │
└─────────────────────────────────────────────────────┘
         ↑                    ↑
    ┌────┴────┐         ┌────┴────┐
    │ 蒸留     │         │ 忘却     │
    │Pipeline  │         │Strategy │
    └─────────┘         └─────────┘

この4層は人間の記憶システムに対応しています:

人間の記憶 AIの実装
Layer 1 短期記憶・作業記憶 コンテキストウィンドウ(CAG)
Layer 2 エピソード記憶 イベントログ + 重要度スコア
Layer 3 意味記憶 ベクトルDB(ChromaDB)
Layer 4 手続き記憶 プロンプト + ワークフロー定義

3. Layer 1: ワーキングメモリ(CAG)はどう設計するのか?

CAG(Cache-Augmented Generation)とは何か?

CAGとは、頻繁にアクセスする情報をコンテキストウィンドウ内にキャッシュとして保持する手法です。RAGが「必要な時に検索する」のに対し、CAGは「常にアクセス可能な状態にしておく」アプローチです。

# engine/memory/working_memory.py
class WorkingMemory:
    """Layer 1: ワーキングメモリ (CAG)"""

    def __init__(self, max_tokens: int = 32000):
        self.max_tokens = max_tokens
        self.context_slots = {
            "system":      None,   # システムプロンプト (~2K tokens)
            "active_task": None,   # 現在のタスク (~1K tokens)
            "constraints": None,   # アクティブな制約 (~1K tokens)
            "recent_msgs": [],     # 直近の会話 (~20K tokens)
            "pinned":      [],     # ピン留めされた重要情報 (~4K tokens)
            "scratchpad":  None,   # 一時的な作業領域 (~4K tokens)
        }

    def add_message(self, message: dict):
        """メッセージを追加し、容量超過時は古いものを削除"""
        self.context_slots["recent_msgs"].append(message)

        while self._total_tokens() > self.max_tokens:
            # 最も古い非ピン留めメッセージを削除
            removed = self.context_slots["recent_msgs"].pop(0)
            # 重要なメッセージはエピソディックストアに移動
            if removed.get("importance", 0) > 0.7:
                self._promote_to_episodic(removed)

    def pin(self, key: str, content: str, reason: str):
        """重要情報をピン留めして常にコンテキストに保持"""
        self.context_slots["pinned"].append({
            "key": key,
            "content": content,
            "reason": reason,
            "pinned_at": datetime.now(),
        })

    def build_context(self) -> str:
        """LLMに送信するコンテキストを構築"""
        parts = []
        for slot_name, slot_value in self.context_slots.items():
            if slot_value:
                parts.append(f"[{slot_name}]\n{self._serialize(slot_value)}")
        return "\n\n".join(parts)

なぜCAGがRAGより速いのか?

項目 CAG RAG
検索時間 0ms(既にコンテキスト内) 50-500ms(ベクトル検索)
精度 100%(完全一致) 70-95%(類似度ベース)
容量 制限あり(~32K tokens) 事実上無制限
コスト トークン消費量増加 API呼び出しコスト
用途 頻繁にアクセスする情報 大量の知識ベース

ここから先は有料エリアです。Layer 2(エピソディックストア)の完全な実装、Layer 3(セマンティックストア/RAG)のChromaDB統合、Layer 4(手続き記憶)のパターン学習、蒸留パイプライン、そして構造化忘却戦略の全設計を公開します。


4. Layer 2: エピソディックストアはどう実装するのか?

エピソディックストアとは何か?

エピソディックストアとは、具体的な体験(エピソード)を時系列で記録する記憶層です。「何が起きたか」「いつ起きたか」「どう感じたか(重要度)」を保存します。

# engine/memory/episodic_store.py
from dataclasses import dataclass, field
from datetime import datetime
from typing import Optional
import json

@dataclass
class Episode:
    """一つの体験記録"""
    id: str
    timestamp: datetime
    agent_name: str
    event_type: str          # "decision" | "error" | "success" | "feedback"
    description: str
    context: dict            # その時の状況
    outcome: Optional[str]   # 結果(判明している場合)
    importance: float        # 0.0 - 1.0
    tags: list[str] = field(default_factory=list)
    references: list[str] = field(default_factory=list)  # 関連エピソードID
    access_count: int = 0
    last_accessed: Optional[datetime] = None
    decay_rate: float = 0.01  # 日次減衰率

class EpisodicStore:
    """Layer 2: エピソディックストア"""

    def __init__(self, store_path: str, max_entries: int = 10000):
        self.store_path = store_path
        self.max_entries = max_entries
        self.episodes: list[Episode] = self._load()

    def record(self, episode: Episode):
        """エピソードを記録"""
        self.episodes.append(episode)
        self._enforce_capacity()
        self._persist()

    def recall(
        self,
        query: str,
        agent_name: Optional[str] = None,
        event_type: Optional[str] = None,
        time_range: Optional[tuple] = None,
        top_k: int = 10,
    ) -> list[Episode]:
        """関連するエピソードを想起"""
        candidates = self.episodes

        # フィルタリング
        if agent_name:
            candidates = [e for e in candidates if e.agent_name == agent_name]
        if event_type:
            candidates = [e for e in candidates if e.event_type == event_type]
        if time_range:
            start, end = time_range
            candidates = [
                e for e in candidates
                if start <= e.timestamp <= end
            ]

        # スコアリング: 関連性 × 重要度 × 新鮮度
        scored = []
        for ep in candidates:
            relevance = self._text_similarity(query, ep.description)
            freshness = self._calculate_freshness(ep)
            score = relevance * 0.4 + ep.importance * 0.35 + freshness * 0.25
            scored.append((score, ep))

        scored.sort(key=lambda x: x[0], reverse=True)

        # アクセスカウント更新
        results = [ep for _, ep in scored[:top_k]]
        for ep in results:
            ep.access_count += 1
            ep.last_accessed = datetime.now()

        return results

    def _calculate_freshness(self, episode: Episode) -> float:
        """時間減衰を計算(指数減衰)"""
        days_old = (datetime.now() - episode.timestamp).days
        return max(0.0, 1.0 - episode.decay_rate * days_old)

    def _enforce_capacity(self):
        """容量超過時に低スコアのエピソードを蒸留・削除"""
        if len(self.episodes) <= self.max_entries:
            return

        # スコアが最も低いエピソードを特定
        scored = [
            (self._retention_score(ep), ep)
            for ep in self.episodes
        ]
        scored.sort(key=lambda x: x[0])

        # 下位10%を蒸留候補に
        to_distill = scored[:len(scored) // 10]
        for _, ep in to_distill:
            self._promote_to_semantic(ep)  # Layer 3に蒸留
            self.episodes.remove(ep)

    def _retention_score(self, episode: Episode) -> float:
        """保持スコア: 高いほど残す価値がある"""
        freshness = self._calculate_freshness(episode)
        access_freq = min(episode.access_count / 10, 1.0)
        return (
            episode.importance * 0.4
            + freshness * 0.3
            + access_freq * 0.3
        )

どのようなイベントがエピソードとして記録されるのか?

# 記録される体験の例
EPISODE_TRIGGERS = {
    "decision": {
        "description": "エージェントが重要な判断を行った",
        "importance_base": 0.7,
        "example": "CEO: 新機能の優先順位をセキュリティ > パフォーマンスに決定",
    },
    "error": {
        "description": "エラーが発生し、回復した",
        "importance_base": 0.8,
    },
    "success": {
        "description": "タスクが正常に完了した",
        "importance_base": 0.5,
        "example": "ContentGen: 記事005がAEOスコア92で品質ゲート通過",
    },
    "feedback": {
        "description": "人間からフィードバックを受けた",
        "importance_base": 0.9,
        "example": "Operator: '価格設定は¥500からスタートして'",
    },
    "conflict": {
        "description": "エージェント間で意見が衝突した",
        "importance_base": 0.85,
        "example": "CTO vs Security: 新しい依存パッケージの採用是非",
    },
}

5. Layer 3: セマンティックストア(RAG)はどう構築するのか?

ChromaDBによるベクトル検索の実装

# engine/memory/semantic_store.py
import chromadb
from chromadb.config import Settings

class SemanticStore:
    """Layer 3: セマンティックストア (RAG)"""

    def __init__(self, persist_dir: str = "./workspace/memory/chromadb"):
        self.client = chromadb.PersistentClient(
            path=persist_dir,
            settings=Settings(anonymized_telemetry=False),
        )
        self.collections = {
            "knowledge":  self._get_or_create("knowledge"),
            "code":       self._get_or_create("code"),
            "decisions":  self._get_or_create("decisions"),
            "feedback":   self._get_or_create("feedback"),
        }

    def _get_or_create(self, name: str):
        return self.client.get_or_create_collection(
            name=name,
            metadata={"hnsw:space": "cosine"},
        )

    def store(
        self,
        collection: str,
        documents: list[str],
        metadatas: list[dict],
        ids: list[str],
    ):
        """ドキュメントをベクトル化して保存"""
        self.collections[collection].upsert(
            documents=documents,
            metadatas=metadatas,
            ids=ids,
        )

    def query(
        self,
        collection: str,
        query_text: str,
        n_results: int = 5,
        where: Optional[dict] = None,
        where_document: Optional[dict] = None,
    ) -> list[dict]:
        """類似度ベースの検索"""
        results = self.collections[collection].query(
            query_texts=[query_text],
            n_results=n_results,
            where=where,
            where_document=where_document,
        )

        return [
            {
                "document": doc,
                "metadata": meta,
                "distance": dist,
                "id": id_,
            }
            for doc, meta, dist, id_ in zip(
                results["documents"][0],
                results["metadatas"][0],
                results["distances"][0],
                results["ids"][0],
            )
        ]

    def count(self, collection: str) -> int:
        return self.collections[collection].count()

コレクション設計のベストプラクティス

# コレクション分割の基準
COLLECTION_DESIGN = {
    "knowledge": {
        "purpose": "一般的な技術知識、ドキュメント",
        "retention": "permanent",
        "update_freq": "weekly",
        "max_docs": 50000,
    },
    "code": {
        "purpose": "コードスニペット、パターン、アンチパターン",
        "retention": "permanent",
        "update_freq": "on_commit",
        "max_docs": 30000,
    },
    "decisions": {
        "purpose": "蒸留された意思決定の記録",
        "retention": "1_year",
        "update_freq": "on_distill",
        "max_docs": 10000,
    },
    "feedback": {
        "purpose": "人間からのフィードバック、学習事項",
        "retention": "permanent",
        "update_freq": "on_receive",
        "max_docs": 5000,
    },
}

6. Layer 4: 手続き記憶はどう実装するのか?

手続き記憶とは何か?

手続き記憶とは、「やり方」を記憶する層です。人間でいえば「自転車の乗り方」「タイピング」のような、意識せずに実行できるスキルです。AIエージェントでは、最適化されたプロンプトテンプレートとワークフロー定義がこれに該当します。

# engine/memory/procedural_memory.py
@dataclass
class Procedure:
    """手続き記憶の1エントリ"""
    name: str
    trigger: str              # いつ発動するか
    template: str             # プロンプトテンプレート
    success_rate: float       # 成功率
    avg_quality_score: float  # 平均品質スコア
    execution_count: int      # 実行回数
    last_updated: datetime
    version: int

class ProceduralMemory:
    """Layer 4: 手続き記憶"""

    def __init__(self, store_path: str):
        self.procedures: dict[str, Procedure] = self._load(store_path)

    def get_procedure(self, task_type: str) -> Optional[Procedure]:
        """タスクタイプに最適な手続きを取得"""
        candidates = [
            p for p in self.procedures.values()
            if self._matches_trigger(task_type, p.trigger)
        ]

        if not candidates:
            return None

        # 成功率 × 品質スコアで最適なものを選択
        return max(
            candidates,
            key=lambda p: p.success_rate * 0.6 + p.avg_quality_score / 10 * 0.4,
        )

    def update_from_outcome(self, name: str, quality_score: float, success: bool):
        """実行結果をフィードバック"""
        proc = self.procedures.get(name)
        if not proc:
            return

        # 指数移動平均で更新
        alpha = 0.1
        proc.success_rate = (1 - alpha) * proc.success_rate + alpha * (1.0 if success else 0.0)
        proc.avg_quality_score = (1 - alpha) * proc.avg_quality_score + alpha * quality_score
        proc.execution_count += 1
        proc.last_updated = datetime.now()

    def evolve(self, name: str, new_template: str):
        """手続きを進化(バージョンアップ)"""
        proc = self.procedures[name]
        proc.template = new_template
        proc.version += 1
        proc.last_updated = datetime.now()

7. 蒸留パイプラインはどう機能するのか?

蒸留とは何か?

**蒸留(Distillation)**とは、大量の生データから本質的な知識を抽出・圧縮するプロセスです。エピソディックストアの100件の体験から、3つの教訓を抽出する——これが蒸留です。

# engine/memory/distillation_pipeline.py
class DistillationPipeline:
    """エピソード → セマンティック知識への蒸留"""

    def __init__(self, episodic: EpisodicStore, semantic: SemanticStore):
        self.episodic = episodic
        self.semantic = semantic

    async def distill(self, batch_size: int = 50):
        """定期蒸留の実行"""
        # 1. 蒸留候補のエピソードを取得
        candidates = self._get_distillation_candidates(batch_size)
        if not candidates:
            return

        # 2. エピソードをクラスタリング
        clusters = self._cluster_episodes(candidates)

        # 3. 各クラスタから知識を抽出
        for cluster in clusters:
            knowledge = await self._extract_knowledge(cluster)

            # 4. セマンティックストアに保存
            self.semantic.store(
                collection="decisions",
                documents=[knowledge.summary],
                metadatas=[{
                    "source_episodes": [e.id for e in cluster],
                    "distilled_at": datetime.now().isoformat(),
                    "confidence": knowledge.confidence,
                    "category": knowledge.category,
                }],
                ids=[f"distilled-{uuid4().hex[:8]}"],
            )

        # 5. 蒸留済みエピソードにマーク
        for ep in candidates:
            ep.tags.append("distilled")

    def _get_distillation_candidates(self, batch_size: int) -> list[Episode]:
        """蒸留候補:古い+アクセス頻度低い+未蒸留"""
        candidates = [
            ep for ep in self.episodic.episodes
            if "distilled" not in ep.tags
            and self.episodic._calculate_freshness(ep) < 0.5
        ]
        candidates.sort(key=lambda e: e.importance, reverse=True)
        return candidates[:batch_size]

    def _cluster_episodes(self, episodes: list[Episode]) -> list[list[Episode]]:
        """類似エピソードをクラスタリング"""
        clusters = []
        used = set()

        for ep in episodes:
            if ep.id in used:
                continue

            cluster = [ep]
            used.add(ep.id)

            for other in episodes:
                if other.id not in used:
                    similarity = self._episode_similarity(ep, other)
                    if similarity > 0.7:
                        cluster.append(other)
                        used.add(other.id)

            clusters.append(cluster)

        return clusters

    async def _extract_knowledge(self, cluster: list[Episode]) -> Knowledge:
        """エピソード群から知識を抽出(LLM使用)"""
        descriptions = "\n".join(
            f"- [{ep.timestamp.date()}] {ep.description}{ep.outcome}"
            for ep in cluster
        )

        prompt = f"""以下の関連するエピソード群から、汎用的な教訓・パターンを1つ抽出してください。

エピソード:
{descriptions}

出力形式:
- summary: 1-2文の要約
- category: decision_pattern | error_pattern | optimization | preference
- confidence: 0.0-1.0
"""
        response = await self.llm.generate(prompt)
        return self._parse_knowledge(response)

蒸留の実行タイミング

distillation_schedule:
  trigger: "daily_at_03:00"       # 深夜3時に実行
  batch_size: 50                  # 1回あたり最大50エピソード
  min_episodes_for_cluster: 3     # クラスタの最小サイズ
  freshness_threshold: 0.5        # これ以下の鮮度のものが対象
  max_duration_minutes: 10        # 最大実行時間

8. 構造化忘却戦略はどう設計するのか?

なぜ「忘れる」ことが重要なのか?

全てを記憶するシステムには3つの問題があります:

  1. 検索ノイズの増大: 古い不要な情報が検索結果に混入する
  2. 矛盾の蓄積: 古い情報と新しい情報が矛盾し、判断が不安定になる
  3. コストの増大: ストレージとベクトル検索のコストが線形に増加する

構造化忘却は「何を忘れるか」をルールに基づいて決定する戦略です。

忘却ポリシーの設計

# engine/memory/forgetting_strategy.py
@dataclass
class ForgettingPolicy:
    """忘却ポリシー"""
    name: str
    target_layer: str         # "episodic" | "semantic"
    condition: str            # 忘却条件
    action: str               # "delete" | "archive" | "compress"
    grace_period_days: int    # 猶予期間

FORGETTING_POLICIES = [
    ForgettingPolicy(
        name="stale_episodes",
        target_layer="episodic",
        condition="freshness < 0.1 AND access_count < 2 AND importance < 0.5",
        action="delete",
        grace_period_days=90,
    ),
    ForgettingPolicy(
        name="superseded_decisions",
        target_layer="semantic",
        condition="newer_version_exists AND age > 30_days",
        action="archive",
        grace_period_days=30,
    ),
    ForgettingPolicy(
        name="low_value_knowledge",
        target_layer="semantic",
        condition="access_count == 0 AND age > 180_days",
        action="compress",
        grace_period_days=180,
    ),
    ForgettingPolicy(
        name="resolved_errors",
        target_layer="episodic",
        condition="event_type == 'error' AND has_resolution AND age > 60_days",
        action="compress",  # エラーパターンは圧縮して保持
        grace_period_days=60,
    ),
]

class ForgettingEngine:
    """構造化忘却エンジン"""

    def __init__(self, episodic: EpisodicStore, semantic: SemanticStore):
        self.episodic = episodic
        self.semantic = semantic

    def execute_forgetting(self):
        """忘却ポリシーを実行"""
        for policy in FORGETTING_POLICIES:
            candidates = self._find_candidates(policy)
            logger.info(
                f"忘却ポリシー '{policy.name}': "
                f"{len(candidates)}件が対象"
            )

            for item in candidates:
                if policy.action == "delete":
                    self._delete(item, policy.target_layer)
                elif policy.action == "archive":
                    self._archive(item, policy.target_layer)
                elif policy.action == "compress":
                    self._compress(item, policy.target_layer)

    def _compress(self, item, layer: str):
        """詳細を削除し、要約のみを保持"""
        compressed = {
            "id": item.id,
            "summary": self._summarize(item),
            "original_date": item.timestamp.isoformat(),
            "compressed_at": datetime.now().isoformat(),
        }
        # 圧縮版で置換
        self._replace(item, compressed, layer)

忘却してはいけないものは何か?

# 忘却禁止リスト(Never Forget)
NEVER_FORGET = {
    "human_feedback":     "人間からのフィードバックは永久保持",
    "security_incidents": "セキュリティインシデントは永久保持",
    "manifesto_rules":    "マニフェスト(不変ルール)は永久保持",
    "financial_records":  "金銭に関する記録は永久保持",
    "compliance_events":  "コンプライアンス関連は永久保持",
}

9. ChromaDB統合の実装における注意点は何か?

パフォーマンスチューニング

# 本番環境でのChromaDB設定
CHROMADB_CONFIG = {
    "persist_directory": "./workspace/memory/chromadb",
    "anonymized_telemetry": False,
    "allow_reset": False,  # 本番では無効化

    # HNSW(近似最近傍探索)のパラメータ
    "hnsw_space": "cosine",       # コサイン類似度
    "hnsw_construction_ef": 200,  # インデックス構築時の精度
    "hnsw_search_ef": 100,        # 検索時の精度
    "hnsw_M": 16,                 # グラフの接続数
}

エンベディング戦略

# 日本語対応のエンベディング
# multilingual-e5-large がコスト/精度のバランスが最適
EMBEDDING_CONFIG = {
    "model": "intfloat/multilingual-e5-large",
    "dimension": 1024,
    "max_tokens": 512,     # 長い文書は分割が必要
    "batch_size": 32,
    "normalize": True,
}

def chunk_document(text: str, chunk_size: int = 400, overlap: int = 50) -> list[str]:
    """ドキュメントをオーバーラップ付きでチャンク分割"""
    sentences = text.split("")
    chunks = []
    current = []
    current_len = 0

    for sent in sentences:
        sent_len = len(sent)
        if current_len + sent_len > chunk_size and current:
            chunks.append("".join(current) + "")
            # オーバーラップ: 最後のN文字分を保持
            overlap_sents = []
            overlap_len = 0
            for s in reversed(current):
                if overlap_len + len(s) > overlap:
                    break
                overlap_sents.insert(0, s)
                overlap_len += len(s)
            current = overlap_sents
            current_len = overlap_len

        current.append(sent)
        current_len += sent_len

    if current:
        chunks.append("".join(current) + "")

    return chunks

10. RAGとCAGはどう使い分けるのか?

判断フローチャート

情報が必要になった
    │
    ├── 過去5分以内に参照した?
    │   └── YES → ワーキングメモリ(Layer 1/CAG)から取得
    │
    ├── 毎回のタスクで必要?
    │   └── YES → CAGにピン留め
    │
    ├── 特定のキーワードで検索できる?
    │   └── YES → RAG(Layer 3)で検索
    │
    ├── 過去の体験から判断したい?
    │   └── YES → エピソディックストア(Layer 2)を参照
    │
    └── 定型的な処理方法?
        └── YES → 手続き記憶(Layer 4)を参照

実装上の使い分け

シナリオ 使用層 理由
現在の会話の文脈理解 CAG (Layer 1) 常にコンテキスト内に必要
「先週の同じエラーの対処法」 Episodic (Layer 2) 時間+状況での検索
「FastAPIのミドルウェア設計」 RAG (Layer 3) 知識ベースの類似度検索
「記事を書く手順」 Procedural (Layer 4) 最適化済みテンプレート

11. まとめ — メモリアーキテクチャの実装チェックリスト

  • 4層メモリ(Working / Episodic / Semantic / Procedural)が定義されている
  • ワーキングメモリに容量制限とオーバーフロー処理がある
  • エピソディックストアに重要度スコアと時間減衰がある
  • ChromaDBでセマンティック検索が実装されている
  • 手続き記憶が実行結果から自動更新される
  • 蒸留パイプラインが定期実行されている
  • 忘却ポリシーが定義され、自動実行されている
  • 忘却禁止リスト(Never Forget)が設定されている
  • 日本語対応のエンベディングモデルが選択されている
  • CAGとRAGの使い分け基準が明確

AIエージェントのメモリ設計は、「全てを覚える」より**「何を覚え、何を忘れるかを戦略的に決める」**方がはるかに重要です。人間の記憶の仕組みに学び、効率的で信頼性の高いメモリアーキテクチャを構築しましょう。


この記事はAEGISプロジェクトの4層メモリアーキテクチャ設計に基づいています。

価格: ¥1,200


他の記事は noteZenn でも公開しています。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?