AIエージェントを本番運用していると、「なぜその回答になったのか」「どのツールをどう使ったのか」を後から追跡したくなる場面が必ず出てきます。本記事では、AIエージェントの動作ログをClickHouseに蓄積する仕組みを構築します。
前編ではログ基盤を作り、後編ではそのデータからナレッジを生成してRAG改善ループを回すところまでを扱います。
このシステムの目的と便益
目的
観測可能性(Observability)を作り、後編のRAG改善ループにつなげることです。
AIエージェントはブラックボックスになりがちです。何が起きているかを記録し、可視化し、改善に活かせる状態を作ることが本記事のゴールです。
便益
運用者視点
- 原因究明が速い:障害時に「何が起きたか」を即座にトレースできます
- 改善が属人化しない:ログがあれば誰でも分析・改善できます
- コスト最適化できる:トークン消費やレイテンシを可視化し、ボトルネックを特定できます
利用者視点
- 回答の安定性・再現性が上がる:同じ質問に対するブレを検知し、改善できます
前提とスコープ
前提
本記事は以下の環境を前提としています。
-
自社AIエージェント(オーケストレータ)が既にある
- ユーザー入力を受け取り、LLMを呼び出し、必要に応じてツールを実行する自前のコード
-
業務ツール用MCPサーバも既にある
- ファイル操作、DB検索、外部API呼び出し等を提供するMCPサーバ
この記事で作るもの
-
ログ用MCPサーバ(中継層):
log_eventツールを提供し、ClickHouseへ書き込む - ClickHouseスキーマ:eventsテーブルのDDL
なぜ「モデルにログを任せない」のか
「毎ターン必ずlog_eventを呼べ」とLLMに指示する方法もありますが、保証が弱いという問題があります。モデルは指示を無視することがあり、確実性が担保できません。
確実にログを残したいなら、オーケストレータで強制記録するのが正解です。
オーケストレータの各フックポイントで記録:
├── ユーザー入力受領時
├── モデル呼び出し前
├── モデル応答受領時
├── ツール呼び出し前
├── ツール結果受領時
└── エラー発生時
モデルの気まぐれに依存せず、コードで強制的にログを残す設計にします。
アーキテクチャ(最小構成)
┌──────────┐
│ ユーザー │
└────┬─────┘
▼
┌──────────────────────────────────────────┐
│ オーケストレータ(自社エージェント) │
│ │
│ ┌─────────────┐ ┌─────────────┐ │
│ │ LLM呼び出し │ │ 既存MCPサーバ │ │
│ │ (Claude等) │ │ (ツール実行) │ │
│ └─────────────┘ └─────────────┘ │
│ │ │ │
│ └────────┬────────┘ │
│ ▼ │
│ ┌─────────────┐ │
│ │ ログ用MCPサーバ│ ← この記事で作る │
│ │ (log_event) │ │
│ └──────┬──────┘ │
└─────────────────┼────────────────────────┘
▼
┌───────────┐
│ ClickHouse│
│ (events) │
└───────────┘
ポイント
- 既存MCPサーバとログ用MCPサーバは分離(責務分離)
- オーケストレータが各フックポイントで
log_eventを呼ぶ - ログ用MCPサーバはClickHouseへのInsertに専念
ログ設計の要件
後編でRAG改善ループを回すために、以下の3要件を満たすログ設計にします。
1. 相関(Correlation)
trace / turn / tool を紐付けられること。「この会話の中で、何番目のターンで、どのツールを呼んだか」が追えないと分析できません。
2. 再現性(Reproducibility)
モデル・プロンプト・設定・参照ドキュメントIDが追えること。「同じ条件で再実行したら同じ結果になるか」を検証できる状態を作ります。
3. 評価可能性(Evaluability)
検索・参照・結果のメタデータが残ること。後編で「どのドキュメントを参照したか」「ユーザーは満足したか」を評価するために必要です。
ClickHouseスキーマ(eventsのみで開始)
前編ではeventsテーブルのみで開始します。後編でknowledgeとretrieval_metricsテーブルを追加します。
設計方針
- MergeTreeエンジン(Append-onlyワークロードに最適)
-
ORDER BY
(trace_id, ts, span_id)でトレース単位の検索を高速化 - TTL 90日で古いデータを自動削除
- 拡張用payloadでスキーマ進化に対応
DDL例:events
CREATE TABLE IF NOT EXISTS events
(
-- 識別子
event_id UUID DEFAULT generateUUIDv4(),
trace_id String,
span_id String,
parent_span_id Nullable(String),
-- タイムスタンプ
ts DateTime64(3) DEFAULT now64(3),
-- イベント分類
event_type Enum8(
'user_input' = 1,
'llm_request' = 2,
'llm_response' = 3,
'tool_call' = 4,
'tool_result' = 5,
'error' = 6
),
role Enum8(
'user' = 1,
'assistant' = 2,
'system' = 3,
'tool' = 4
),
-- コンテンツ(PIIや巨大データは避け、要約・メタ中心)
content String DEFAULT '',
tool_name Nullable(String),
tool_args_masked String DEFAULT '{}',
tool_result_meta String DEFAULT '{}',
-- モデル情報
model String DEFAULT '',
prompt_version String DEFAULT '',
-- メトリクス
latency_ms UInt32 DEFAULT 0,
token_usage String DEFAULT '{}',
-- RAG関連(後編で活用)
retrieved_doc_ids Array(String) DEFAULT [],
-- 拡張用
payload String DEFAULT '{}',
-- メタデータ
created_at DateTime64(3) DEFAULT now64(3)
)
ENGINE = MergeTree()
ORDER BY (trace_id, ts, span_id)
TTL created_at + INTERVAL 90 DAY
SETTINGS index_granularity = 8192;
補足
-
contentには生テキストではなく、要約やハッシュを入れることを推奨(PII対策、ストレージ削減) -
tool_args_maskedはAPIキーや個人情報をマスクした状態で保存 -
retrieved_doc_idsは後編のRAG評価で使用
実装例:ログ用MCPサーバ(中継層)
この記事の中心となる実装です。log_eventツールを提供し、ClickHouseへInsertします。
log_event のツール定義(入力スキーマ)
LOG_EVENT_SCHEMA = {
"name": "log_event",
"description": "AIエージェントのイベントをClickHouseに記録する",
"inputSchema": {
"type": "object",
"properties": {
"event_id": {"type": "string", "description": "UUID(省略時は自動生成)"},
"trace_id": {"type": "string", "description": "会話セッションID"},
"span_id": {"type": "string", "description": "個別操作ID"},
"parent_span_id": {"type": "string", "description": "親操作ID"},
"event_type": {
"type": "string",
"enum": ["user_input", "llm_request", "llm_response",
"tool_call", "tool_result", "error"]
},
"role": {
"type": "string",
"enum": ["user", "assistant", "system", "tool"]
},
"content": {"type": "string", "description": "本文(最大10KB、超過時は切り詰め)"},
"tool_name": {"type": "string"},
"tool_args_masked": {"type": "string", "description": "マスク済みJSON"},
"tool_result_meta": {"type": "string", "description": "結果メタJSON"},
"model": {"type": "string"},
"latency_ms": {"type": "integer"},
"token_usage": {"type": "string"},
"retrieved_doc_ids": {"type": "array", "items": {"type": "string"}},
"payload": {"type": "string"}
},
"required": ["trace_id", "span_id", "event_type", "role"]
}
}
MCPサーバ実装(コア部分)
"""ログ用MCPサーバ(コア部分のみ抜粋)"""
from mcp.server import Server
from mcp.types import Tool, TextContent
import httpx, json, asyncio
from uuid import uuid4
from datetime import datetime
server = Server("log-server")
event_buffer: list[dict] = []
@server.call_tool()
async def call_tool(name: str, arguments: dict) -> list[TextContent]:
if name != "log_event":
raise ValueError(f"Unknown tool: {name}")
# イベントを構築してバッファに追加
event = {
"event_id": arguments.get("event_id", str(uuid4())),
"trace_id": arguments["trace_id"],
"span_id": arguments["span_id"],
"ts": datetime.now().isoformat(),
"event_type": arguments["event_type"],
"role": arguments["role"],
# ... 他フィールド
}
event_buffer.append(event)
# バッファが溜まったらClickHouseへ送信
if len(event_buffer) >= 100:
await flush_to_clickhouse()
return [TextContent(type="text", text=json.dumps({"status": "ok"}))]
async def flush_to_clickhouse():
"""バッファをClickHouseへ書き込み"""
global event_buffer
if not event_buffer:
return
body = "\n".join(json.dumps(e) for e in event_buffer)
event_buffer = []
async with httpx.AsyncClient() as client:
await client.post(
"http://localhost:8123/?query=INSERT INTO events FORMAT JSONEachRow",
content=body,
)
実装のポイント
| 項目 | 説明 |
|---|---|
| バッファリング | 100件または一定時間でまとめてInsert(ClickHouseはバッチ書き込みが得意) |
| 冪等性 |
event_idを事前生成し、リトライ時の重複を防止 |
| マスキング |
tool_args_maskedでAPIキー等を除去してから保存 |
| 切り詰め |
contentが長すぎる場合は10KB程度で切り詰め |
動作確認(クエリ例)
データが蓄積され始めたら、以下のクエリで確認できます。
トレース全体を取得
SELECT
span_id,
parent_span_id,
event_type,
role,
tool_name,
latency_ms,
ts
FROM events
WHERE trace_id = 'trace_abc123'
ORDER BY ts ASC;
日別のイベント統計
SELECT
toDate(ts) AS date,
event_type,
count() AS count,
avg(latency_ms) AS avg_latency
FROM events
GROUP BY date, event_type
ORDER BY date DESC, count DESC;
ツール別のレイテンシ分析
SELECT
tool_name,
count() AS call_count,
avg(latency_ms) AS avg_latency,
quantile(0.95)(latency_ms) AS p95_latency
FROM events
WHERE event_type = 'tool_result' AND tool_name IS NOT NULL
GROUP BY tool_name
ORDER BY call_count DESC;
ここまでで何が嬉しいか
前編の内容だけで、以下のことが可能になります。
| 観点 | できること |
|---|---|
| デバッグ | 障害時にtrace_idで全操作をトレースし、原因を特定 |
| コスト分析 | トークン消費量をモデル別・日別に集計 |
| パフォーマンス | ツール別のレイテンシを継続的に計測、遅いツールを特定 |
| 監査 | 誰が、いつ、何を実行したかの記録が残る |
次回予告
前編では「データを貯める」仕組みを構築しました。
後編では、蓄積したログからナレッジを自動生成し、RAGで検索可能にし、評価指標に基づいて改善ループを回す仕組みを構築します。
-
knowledgeテーブルとretrieval_metricsテーブルの追加 - イベントログからナレッジを抽出するワーカー実装
- ClickHouseからのRAG検索
- 評価指標の自動記録(accept率、再回答率、引用カバレッジ)
- 評価に基づくRAG設定の自動更新
前編で作ったeventsテーブルが、後編のすべての基盤になります。
後編に続く