2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

ClickHouse×MCPで「学び続けるAIエージェント」を作る【前編】

Last updated at Posted at 2025-12-22

AIエージェントを本番運用していると、「なぜその回答になったのか」「どのツールをどう使ったのか」を後から追跡したくなる場面が必ず出てきます。本記事では、AIエージェントの動作ログをClickHouseに蓄積する仕組みを構築します。

前編ではログ基盤を作り、後編ではそのデータからナレッジを生成してRAG改善ループを回すところまでを扱います。

このシステムの目的と便益

目的

観測可能性(Observability)を作り、後編のRAG改善ループにつなげることです。

AIエージェントはブラックボックスになりがちです。何が起きているかを記録し、可視化し、改善に活かせる状態を作ることが本記事のゴールです。

便益

運用者視点

  • 原因究明が速い:障害時に「何が起きたか」を即座にトレースできます
  • 改善が属人化しない:ログがあれば誰でも分析・改善できます
  • コスト最適化できる:トークン消費やレイテンシを可視化し、ボトルネックを特定できます

利用者視点

  • 回答の安定性・再現性が上がる:同じ質問に対するブレを検知し、改善できます

前提とスコープ

前提

本記事は以下の環境を前提としています。

  1. 自社AIエージェント(オーケストレータ)が既にある
    • ユーザー入力を受け取り、LLMを呼び出し、必要に応じてツールを実行する自前のコード
  2. 業務ツール用MCPサーバも既にある
    • ファイル操作、DB検索、外部API呼び出し等を提供するMCPサーバ

この記事で作るもの

  • ログ用MCPサーバ(中継層)log_eventツールを提供し、ClickHouseへ書き込む
  • ClickHouseスキーマ:eventsテーブルのDDL

なぜ「モデルにログを任せない」のか

「毎ターン必ずlog_eventを呼べ」とLLMに指示する方法もありますが、保証が弱いという問題があります。モデルは指示を無視することがあり、確実性が担保できません。

確実にログを残したいなら、オーケストレータで強制記録するのが正解です。

オーケストレータの各フックポイントで記録:
├── ユーザー入力受領時
├── モデル呼び出し前
├── モデル応答受領時
├── ツール呼び出し前
├── ツール結果受領時
└── エラー発生時

モデルの気まぐれに依存せず、コードで強制的にログを残す設計にします。

アーキテクチャ(最小構成)

┌──────────┐
│  ユーザー │
└────┬─────┘
     ▼
┌──────────────────────────────────────────┐
│         オーケストレータ(自社エージェント)   │
│                                          │
│  ┌─────────────┐    ┌─────────────┐      │
│  │  LLM呼び出し │    │ 既存MCPサーバ  │     │
│  │  (Claude等) │    │ (ツール実行)   │     │
│  └─────────────┘    └─────────────┘      │
│          │                 │             │
│          └────────┬────────┘             │
│                   ▼                      │
│          ┌─────────────┐                 │
│          │ ログ用MCPサーバ│ ← この記事で作る │
│          │ (log_event)  │                │
│          └──────┬──────┘                 │
└─────────────────┼────────────────────────┘
                  ▼
           ┌───────────┐
           │ ClickHouse│
           │  (events) │
           └───────────┘

ポイント

  • 既存MCPサーバとログ用MCPサーバは分離(責務分離)
  • オーケストレータが各フックポイントでlog_eventを呼ぶ
  • ログ用MCPサーバはClickHouseへのInsertに専念

ログ設計の要件

後編でRAG改善ループを回すために、以下の3要件を満たすログ設計にします。

1. 相関(Correlation)

trace / turn / tool を紐付けられること。「この会話の中で、何番目のターンで、どのツールを呼んだか」が追えないと分析できません。

2. 再現性(Reproducibility)

モデル・プロンプト・設定・参照ドキュメントIDが追えること。「同じ条件で再実行したら同じ結果になるか」を検証できる状態を作ります。

3. 評価可能性(Evaluability)

検索・参照・結果のメタデータが残ること。後編で「どのドキュメントを参照したか」「ユーザーは満足したか」を評価するために必要です。

ClickHouseスキーマ(eventsのみで開始)

前編ではeventsテーブルのみで開始します。後編でknowledgeretrieval_metricsテーブルを追加します。

設計方針

  • MergeTreeエンジン(Append-onlyワークロードに最適)
  • ORDER BY (trace_id, ts, span_id) でトレース単位の検索を高速化
  • TTL 90日で古いデータを自動削除
  • 拡張用payloadでスキーマ進化に対応

DDL例:events

CREATE TABLE IF NOT EXISTS events
(
    -- 識別子
    event_id          UUID DEFAULT generateUUIDv4(),
    trace_id          String,
    span_id           String,
    parent_span_id    Nullable(String),

    -- タイムスタンプ
    ts                DateTime64(3) DEFAULT now64(3),

    -- イベント分類
    event_type        Enum8(
                        'user_input' = 1,
                        'llm_request' = 2,
                        'llm_response' = 3,
                        'tool_call' = 4,
                        'tool_result' = 5,
                        'error' = 6
                      ),
    role              Enum8(
                        'user' = 1,
                        'assistant' = 2,
                        'system' = 3,
                        'tool' = 4
                      ),

    -- コンテンツ(PIIや巨大データは避け、要約・メタ中心)
    content           String DEFAULT '',
    tool_name         Nullable(String),
    tool_args_masked  String DEFAULT '{}',
    tool_result_meta  String DEFAULT '{}',

    -- モデル情報
    model             String DEFAULT '',
    prompt_version    String DEFAULT '',

    -- メトリクス
    latency_ms        UInt32 DEFAULT 0,
    token_usage       String DEFAULT '{}',

    -- RAG関連(後編で活用)
    retrieved_doc_ids Array(String) DEFAULT [],

    -- 拡張用
    payload           String DEFAULT '{}',

    -- メタデータ
    created_at        DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (trace_id, ts, span_id)
TTL created_at + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;

補足

  • contentには生テキストではなく、要約やハッシュを入れることを推奨(PII対策、ストレージ削減)
  • tool_args_maskedはAPIキーや個人情報をマスクした状態で保存
  • retrieved_doc_idsは後編のRAG評価で使用

実装例:ログ用MCPサーバ(中継層)

この記事の中心となる実装です。log_eventツールを提供し、ClickHouseへInsertします。

log_event のツール定義(入力スキーマ)

LOG_EVENT_SCHEMA = {
    "name": "log_event",
    "description": "AIエージェントのイベントをClickHouseに記録する",
    "inputSchema": {
        "type": "object",
        "properties": {
            "event_id": {"type": "string", "description": "UUID(省略時は自動生成)"},
            "trace_id": {"type": "string", "description": "会話セッションID"},
            "span_id": {"type": "string", "description": "個別操作ID"},
            "parent_span_id": {"type": "string", "description": "親操作ID"},
            "event_type": {
                "type": "string",
                "enum": ["user_input", "llm_request", "llm_response",
                         "tool_call", "tool_result", "error"]
            },
            "role": {
                "type": "string",
                "enum": ["user", "assistant", "system", "tool"]
            },
            "content": {"type": "string", "description": "本文(最大10KB、超過時は切り詰め)"},
            "tool_name": {"type": "string"},
            "tool_args_masked": {"type": "string", "description": "マスク済みJSON"},
            "tool_result_meta": {"type": "string", "description": "結果メタJSON"},
            "model": {"type": "string"},
            "latency_ms": {"type": "integer"},
            "token_usage": {"type": "string"},
            "retrieved_doc_ids": {"type": "array", "items": {"type": "string"}},
            "payload": {"type": "string"}
        },
        "required": ["trace_id", "span_id", "event_type", "role"]
    }
}

MCPサーバ実装(コア部分)

"""ログ用MCPサーバ(コア部分のみ抜粋)"""

from mcp.server import Server
from mcp.types import Tool, TextContent
import httpx, json, asyncio
from uuid import uuid4
from datetime import datetime

server = Server("log-server")
event_buffer: list[dict] = []

@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
    if name != "log_event":
        raise ValueError(f"Unknown tool: {name}")

    # イベントを構築してバッファに追加
    event = {
        "event_id": arguments.get("event_id", str(uuid4())),
        "trace_id": arguments["trace_id"],
        "span_id": arguments["span_id"],
        "ts": datetime.now().isoformat(),
        "event_type": arguments["event_type"],
        "role": arguments["role"],
        # ... 他フィールド
    }
    event_buffer.append(event)

    # バッファが溜まったらClickHouseへ送信
    if len(event_buffer) >= 100:
        await flush_to_clickhouse()

    return [TextContent(type="text", text=json.dumps({"status": "ok"}))]


async def flush_to_clickhouse():
    """バッファをClickHouseへ書き込み"""
    global event_buffer
    if not event_buffer:
        return

    body = "\n".join(json.dumps(e) for e in event_buffer)
    event_buffer = []

    async with httpx.AsyncClient() as client:
        await client.post(
            "http://localhost:8123/?query=INSERT INTO events FORMAT JSONEachRow",
            content=body,
        )

実装のポイント

項目 説明
バッファリング 100件または一定時間でまとめてInsert(ClickHouseはバッチ書き込みが得意)
冪等性 event_idを事前生成し、リトライ時の重複を防止
マスキング tool_args_maskedでAPIキー等を除去してから保存
切り詰め contentが長すぎる場合は10KB程度で切り詰め

動作確認(クエリ例)

データが蓄積され始めたら、以下のクエリで確認できます。

トレース全体を取得

SELECT
    span_id,
    parent_span_id,
    event_type,
    role,
    tool_name,
    latency_ms,
    ts
FROM events
WHERE trace_id = 'trace_abc123'
ORDER BY ts ASC;

日別のイベント統計

SELECT
    toDate(ts) AS date,
    event_type,
    count() AS count,
    avg(latency_ms) AS avg_latency
FROM events
GROUP BY date, event_type
ORDER BY date DESC, count DESC;

ツール別のレイテンシ分析

SELECT
    tool_name,
    count() AS call_count,
    avg(latency_ms) AS avg_latency,
    quantile(0.95)(latency_ms) AS p95_latency
FROM events
WHERE event_type = 'tool_result' AND tool_name IS NOT NULL
GROUP BY tool_name
ORDER BY call_count DESC;

ここまでで何が嬉しいか

前編の内容だけで、以下のことが可能になります。

観点 できること
デバッグ 障害時にtrace_idで全操作をトレースし、原因を特定
コスト分析 トークン消費量をモデル別・日別に集計
パフォーマンス ツール別のレイテンシを継続的に計測、遅いツールを特定
監査 誰が、いつ、何を実行したかの記録が残る

次回予告

前編では「データを貯める」仕組みを構築しました。

後編では、蓄積したログからナレッジを自動生成し、RAGで検索可能にし、評価指標に基づいて改善ループを回す仕組みを構築します。

  • knowledgeテーブルとretrieval_metricsテーブルの追加
  • イベントログからナレッジを抽出するワーカー実装
  • ClickHouseからのRAG検索
  • 評価指標の自動記録(accept率、再回答率、引用カバレッジ)
  • 評価に基づくRAG設定の自動更新

前編で作ったeventsテーブルが、後編のすべての基盤になります。

後編に続く

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?