なぜ作ったか
趣味で作っているアプリで、あるSaaSのAPIを利用しています。開発中、APIドキュメントを何度も参照することになります。「このエンドポイントのパラメータは?」「このエラーコードの意味は?」
これまではClaude CodeのWeb検索を使っていました。自律的にドキュメントを探してくれるので便利なのですが、以下の不満がありました。
- 検索結果が不安定(関連性の低いページを参照してしまう)
- API仕様の細かい部分を拾いきれない
調べてみると、このSaaSのAPIドキュメントはMarkdown形式でも公開されていました。「RAGベースのMCPサーバーを立てれば、ローカルで高速に検索できるのでは?」と考え、運営者に問い合わせたところ、個人の開発用途なら問題ないとのこと。実装することにしました。
作ったもの
RAG MCPサーバーの全体像です。
前提
このシステムはローカル環境での利用のみを想定しています。外部公開は想定していません。
※ EmbeddingとベクトルDBはローカル、回答生成のみGemini APIを利用
主要機能
-
answer_question: RAG質問応答ツール -
search_docs: ベクトル検索のみツール -
mcp(MCP Python SDK) でstdio transportに対応
実際の動き
シンプルに「データ取得のスクリプトを書いて」とだけ依頼してみます。ページング方法もlimitの値も一切指定していません。
> データ取得APIを使って、2025年12月1日から12月31日までのデータを全件取得するPythonスクリプトを書いて
Claude Codeの動き
まず、不足している情報を確認してきます。
⏺ データ取得APIのスクリプトを作成します。いくつか確認させてください:
⏺ User answered Claude's questions:
⎿ · どのAPIからデータを取得しますか? → (このSaaSのAPI)
· 取得したいデータの種類は何ですか? → レコードデータ
· 取得したデータの出力形式は? → JSON
回答すると、MCPサーバーに問い合わせ始めます。
⏺ APIからレコードデータを取得してJSON形式で保存するスクリプトを作成します。
まず、レコードデータ取得の仕様を確認します。
⏺ api-docs-mcp - answer_question (MCP)
(query: "2025年12月1日から12月31日までのレコードを全件取得するには
どのAPIエンドポイントを使用しますか?日付範囲指定と
ページネーションの方法を教えてください。", top_k: 5)
⎿ レコードを取得するには、レコード一覧取得APIを使用し、クエリパラメータで
期間指定とページネーションを行います。
## 1. 使用するAPIエンドポイント
- エンドポイント: GET /api/v1/records
- 日付範囲: created_at_from / created_at_to
- ページング: limit(最大1000)でページサイズを指定
… (60行以上の詳細な仕様)
⏺ api-docs-mcp - answer_question (MCP)
(query: "APIの認証方法とベースURLを教えてください。
アクセストークンの取得方法も含めて。", top_k: 3)
⎿ APIの認証方法、アクセストークンの取得方法、および認証関連のURLについて
ご説明します。
## 1. 認証方法の概要
- 認証方式: OAuth 2.0 / Bearer Token
- ヘッダー: Authorization: Bearer {access_token}
… (59行以上の詳細な認証フロー)
⏺ api-docs-mcp - answer_question (MCP)
(query: "ベースURLとリクエスト例を教えてください。
レコード取得の際の完全なエンドポイントURLを知りたいです。", top_k: 3)
⎿ ベースURLとリクエスト例について、参考情報に基づき回答します。
### 1. APIエンドポイントの構造
- ベースURL: https://api.example.com/{account_id}
- レコードデータ: /api/v1/records
… (42行以上の実装例)
そして、MCPサーバーから取得した情報をもとにスクリプトを生成してくれます。
⏺ Write(fetch_records.py)
⎿ Wrote 87 lines to fetch_records.py
⏺ 2025年12月のレコードデータを全件取得するPythonスクリプトを作成しました。
スクリプトの主な機能
1. ページネーション対応: limit=1000で最大件数を取得し、全ページを自動取得
2. 日付範囲指定: 2025年12月1日〜31日を指定
3. エラーハンドリング: HTTPエラーやリクエストエラーに対応
4. レート制限対策: リクエスト間に0.5秒の待機時間
5. JSON保存: タイムスタンプ付きファイル名で保存
技術選定
採用した技術とその選定理由です。
| 技術 | 役割 | 選定理由 |
|---|---|---|
| MCP Python SDK | MCPプロトコル | Claude Code CLIとstdio transportで連携 |
| ChromaDB | ベクトルDB | ローカル完結でセットアップが簡単 |
| multilingual-e5-large | Embeddingモデル | 日本語精度が高くローカルで動作 |
| gemini-1.5-flash | LLM | 無料枠があり応答が速い |
| LangChain Text Splitters | テキスト分割 | Markdown見出し構造を認識した分割に対応 |
| uv | パッケージ管理 | pip/venvより高速で依存関係管理が楽 |
ディレクトリ構成
プロジェクト全体の構成です。
api-docs-mcp/
├── .venv/ # 仮想環境(uv syncで自動作成)
├── app/
│ ├── __init__.py
│ ├── mcp_server.py # MCP Server (stdio)
│ ├── config.py # 設定管理
│ ├── rag_pipeline.py # RAGパイプライン
│ └── services/
│ ├── vector_db.py # ChromaDB操作
│ ├── llm.py # Gemini API
│ └── cache.py # LRUキャッシュ
├── scripts/
│ └── build_index.py # インデックス構築(※ その他のスクリプトは省略)
├── data/
│ ├── processed/ # GitHubから取得したMarkdownドキュメント
│ └── parents.json # 親ドキュメント(メモリに読み込んで使用)
├── vector_store/ # ChromaDB(子チャンクを保存)
├── pyproject.toml # プロジェクト設定・依存関係
├── uv.lock # 依存関係のロックファイル
└── .env # 環境変数
依存関係と環境設定
実装に必要なパッケージと環境変数の構成です。
プロジェクト初期化
# プロジェクトディレクトリ作成
mkdir api-docs-mcp
cd api-docs-mcp
# pyproject.toml作成(依存関係を宣言)
cat > pyproject.toml <<EOF
[project]
name = "api-docs-mcp"
version = "0.1.0"
requires-python = ">=3.11"
dependencies = [
"chromadb",
"sentence-transformers",
"google-generativeai",
"mcp",
"langchain",
"langchain-text-splitters",
"python-dotenv",
]
EOF
# uvで依存関係を一括インストール
uv sync
環境変数
.env.example
# Gemini API
GEMINI_API_KEY=your_api_key_here
# Server Settings
LOG_LEVEL=INFO
# Vector DB Settings
VECTOR_STORE_PATH=./vector_store
COLLECTION_NAME=api_docs
# Embedding Model
EMBEDDING_MODEL=intfloat/multilingual-e5-large
# LLM Settings
LLM_MODEL=gemini-1.5-flash
LLM_TEMPERATURE=0.3
LLM_MAX_TOKENS=2048
# RAG Settings
TOP_K_RESULTS=5
MIN_SCORE=0.5
# Cache Settings
CACHE_ENABLED=true
CACHE_TTL=3600
CACHE_MAX_SIZE=100
※記事中のコードは簡略版のため、一部の設定値(TOP_K_RESULTS、CACHE_*など)は省略しています。
ドキュメントデータの準備
GitHubからcloneしたMarkdownドキュメントをRAG用に加工しました。
親子チャンク方式
RAGを作ってみて分かったのは、チャンクサイズを適当に決めると簡単に失敗するという点でした。当初はシンプルにドキュメント全体を500文字で分割していましたが、「関連情報が見つかりません」という回答が返ることがありました。原因は文脈の不足。タイトルとその詳細説明が別々のチャンクに分かれてしまい、LLMが十分な情報を得られなかったためです。
この問題を解決するため、ParentDocumentRetriever方式を採用しました。検索は小さなチャンク、回答生成は大きなセクションという2層構造です。
-
親ドキュメント(文脈保持用)
- Markdownのh1/h2見出し単位で分割
- セクション全体の文脈を保持
- LLMへの入力に使用
-
子チャンク(検索用)
- 親を500文字程度に分割(オーバーラップ100文字)
- ベクトル検索に使用
- 検索ヒット時に対応する親ドキュメントを返す
仕組み
検索クエリ
↓
子チャンクでベクトル検索(精度重視)
↓
ヒットした子チャンクから親IDを取得
↓
親ドキュメント全体をLLMに渡す(文脈重視)
↓
詳細な回答生成
この方式により、検索精度と回答品質の両方を確保できました。
インデックス構築
親ドキュメントはdata/parents.jsonにJSON形式で保存し、VectorDBServiceの初期化時にメモリに読み込みます。子チャンクのmetadataにparent_idを持たせることで、検索時に親を取得できる仕組みです。
# scripts/build_index.py(簡略版)
# ※ エラーハンドリングや詳細設定は省略
from langchain_text_splitters import (
MarkdownHeaderTextSplitter,
RecursiveCharacterTextSplitter
)
# 親ドキュメント作成(h1/h2単位)
parent_splitter = MarkdownHeaderTextSplitter(
headers_to_split_on=[("#", "h1"), ("##", "h2")]
)
parent_docs = parent_splitter.split_text(markdown_content)
# 子チャンク作成(検索用)
child_splitter = RecursiveCharacterTextSplitter(
chunk_size=500,
chunk_overlap=100
)
all_parents = []
all_children = []
for parent_doc in parent_docs:
parent_id = str(uuid.uuid4())
# 親ドキュメントを保存
parent_data = {
"parent_id": parent_id,
"text": parent_doc.page_content,
# ...
}
all_parents.append(parent_data)
# 親から子チャンクを作成
child_texts = child_splitter.split_text(parent_doc.page_content)
# 子チャンクにparent_idを紐付け
for child_text in child_texts:
child_data = {
"text": child_text,
"parent_id": parent_id, # 親への参照
# ...
}
all_children.append(child_data)
# 子チャンクのみEmbedding化してChromaDBに保存
child_texts_with_prefix = [f"passage: {c['text']}" for c in all_children]
embeddings = embedding_model.encode(child_texts_with_prefix)
collection.add(embeddings=embeddings, ...)
このスクリプトを実行すると、data/parents.json(親ドキュメント)とvector_store/(子チャンクのベクトルDB)が生成されます。
RAGパイプライン実装
RAGシステムの中核となる3つのサービス(VectorDB、LLM、RAGPipeline)を実装します。ベクトル検索でドキュメントを取得し、LLMで回答を生成する流れです。
VectorDBService(ChromaDB操作)
# app/services/vector_db.py(簡略版)
import json
import chromadb
from sentence_transformers import SentenceTransformer
class VectorDBService:
def __init__(
self,
vector_store_path: str,
collection_name: str,
embedding_model: str,
parents_file: str = "data/parents.json"
):
# ChromaDB初期化
self.client = chromadb.PersistentClient(path=vector_store_path)
# コレクション取得
self.collection = self.client.get_collection(name=collection_name)
# Embeddingモデル初期化
self.embedding_model = SentenceTransformer(embedding_model)
# 親ドキュメントをメモリに読み込み
with open(parents_file, 'r', encoding='utf-8') as f:
parents = json.load(f)
self.parents_map = {p["parent_id"]: p for p in parents}
def search(self, query: str, top_k: int = 5):
# クエリをEmbedding化
query_text = f"query: {query}"
query_embedding = self.embedding_model.encode(query_text)
# ベクトル検索(子チャンクを検索)
results = self.collection.query(
query_embeddings=[query_embedding.tolist()],
n_results=top_k
)
return results
def search_with_parents(self, query: str, top_k: int = 5, min_score: float = 0.5):
"""子チャンクで検索し、親ドキュメントを返す"""
# 親単位で集約するので、子チャンクは多めに取得
child_results = self.search(query, top_k=top_k * 3)
# 親IDごとにグループ化
parent_groups = {}
for i in range(len(child_results['documents'][0])):
parent_id = child_results['metadatas'][0][i].get("parent_id")
if parent_id not in parent_groups:
parent_groups[parent_id] = []
# NOTE: ChromaDBが返すdistanceはコレクションの距離関数(cosine/l2等)に依存する。
# ここでは便宜上「distanceが小さいほど関連度が高い」とみなし、簡易スコアとして 1 - distance を使っている。
# 環境によっては負値になり得るため、閾値やrelevance_scoreの扱いは要調整。
score = max(0.0, 1.0 - child_results['distances'][0][i])
# スコア閾値でフィルタリング
if score < min_score:
continue
parent_groups[parent_id].append({
"text": child_results['documents'][0][i],
"score": score
})
# 親ドキュメントを取得
parents = []
for parent_id, children in parent_groups.items():
parent_data = self.parents_map.get(parent_id)
if parent_data:
best_score = max(c["score"] for c in children)
parents.append({
"parent_id": parent_id,
"text": parent_data["text"],
"doc_title": parent_data["doc_title"],
"doc_url": parent_data["doc_url"],
"child_score": best_score,
"child_count": len(children)
})
# スコアでソートしてtop_k件
parents.sort(key=lambda x: x["child_score"], reverse=True)
return parents[:top_k]
LLMService(Gemini API)
# app/services/llm.py(簡略版)
import google.generativeai as genai
class LLMService:
def __init__(self, model_name, temperature, max_tokens):
genai.configure(api_key=os.getenv("GEMINI_API_KEY"))
self.model = genai.GenerativeModel(model_name)
self.temperature = temperature
self.max_tokens = max_tokens
def answer_with_context(self, query: str, context_chunks: list):
"""親ドキュメントのリストから回答を生成"""
# コンテキストを構築
context_parts = []
for i, chunk in enumerate(context_chunks, 1):
context_parts.append(f"""## 参考情報 [{i}]
**ドキュメント**: {chunk['doc_title']}
**URL**: {chunk['doc_url']}
**内容**:
{chunk['text']}
""")
context = "\n\n".join(context_parts)
prompt = f"""以下のドキュメントを参考に質問に回答してください。
【重要な制約】
- ドキュメントに記載がない内容は「ドキュメントに記載がありません」と答えてください
- 回答する場合は、根拠となるドキュメントのURLを明記してください
- 推測が必要な場合は「推測ですが」と明記してください
- 回答の最後に「Sources:」を付けて、参照したドキュメントのURLを箇条書きで列挙してください
{context}
【質問】
{query}
【回答】
"""
response = self.model.generate_content(
prompt,
generation_config={
"temperature": self.temperature,
"max_output_tokens": self.max_tokens,
}
)
return {
"answer": response.text,
"prompt_length": len(prompt),
"answer_length": len(response.text)
}
RAGPipeline統合
# app/rag_pipeline.py(簡略版)
class RAGPipeline:
def __init__(
self,
vector_store_path: str = "./vector_store",
collection_name: str = "api_docs",
embedding_model: str = "intfloat/multilingual-e5-large",
llm_model: str = "gemini-1.5-flash",
cache_enabled: bool = True,
top_k: int = 5,
min_score: float = 0.5,
# ... その他の設定(temperature, max_tokens, cache詳細設定など)
):
self.top_k = top_k
self.min_score = min_score
# サービス初期化
self.vector_db = VectorDBService(
vector_store_path=vector_store_path,
collection_name=collection_name,
embedding_model=embedding_model,
parents_file="data/parents.json"
)
self.llm = LLMService(
model_name=llm_model,
temperature=0.3,
max_tokens=2048
)
self.cache = QueryCache(...) if cache_enabled else None
def search_docs(self, query: str, top_k: int = None):
"""ドキュメント検索のみ実行(回答生成なし)"""
results = self.vector_db.search_with_parents(
query=query,
top_k=top_k or self.top_k,
min_score=self.min_score
)
return results
def answer_question(self, query: str, top_k: int = 5):
# キャッシュチェック
if self.cache:
cached = self.cache.get(query)
if cached:
return cached
# ベクトル検索(親ドキュメントを取得)
search_results = self.vector_db.search_with_parents(
query=query,
top_k=top_k,
min_score=self.min_score
)
if not search_results:
return {
"answer": "申し訳ございません。関連する情報が見つかりませんでした。",
"sources": [],
"relevance_score": 0.0
}
# LLM生成(親ドキュメントのリストを渡す)
llm_response = self.llm.answer_with_context(query, search_results)
# 関連度計算(距離ベースの簡易指標)
avg_score = sum(r["child_score"] for r in search_results) / len(search_results)
# 結果を辞書で返す
result = {
"answer": llm_response["answer"],
"sources": search_results,
"relevance_score": round(avg_score, 4) # 検索結果の平均関連度(距離ベースの簡易指標)
}
# キャッシュ保存
if self.cache:
self.cache.set(query, result)
return result
なお、relevance_scoreは検索結果の距離から計算した簡易的な関連度スコアであり、統計的な信頼度や確率ではありません。また、MIN_SCOREは精度と再現率のトレードオフがあり、検索結果が0件になる場合は閾値を下げて調整が必要です。
MCP Server実装
Claude Code CLIから呼び出せるMCPサーバーを実装します。stdio transportで通信し、バックグラウンドでRAGパイプラインを初期化することで、接続タイムアウトを回避します。
なお、サーバー起動前にscripts/build_index.pyでベクトルインデックスを構築しておく必要があります(未構築の場合、コレクション取得に失敗します)。
# app/mcp_server.py(簡略版)
# グローバルRAGパイプライン
rag_pipeline: RAGPipeline | None = None
# RAG初期化完了イベント
rag_initialized = asyncio.Event()
def initialize_rag():
"""RAGパイプラインを初期化"""
global rag_pipeline
rag_pipeline = RAGPipeline(
vector_store_path=settings.resolve_path(settings.vector_store_path),
collection_name=settings.collection_name,
embedding_model=settings.embedding_model,
llm_model=settings.llm_model,
# ... その他の設定
)
async def initialize_rag_async():
"""非同期でRAGパイプラインを初期化
バックグラウンドで初期化を実行し、完了時にイベントをセット。
Embeddingモデルのロードに数秒かかるため、
サーバー起動をブロックしないよう非同期で実行する。
"""
try:
await asyncio.to_thread(initialize_rag)
rag_initialized.set() # 初期化完了を通知
except Exception as e:
logger.error(f"RAG初期化エラー: {e}")
rag_initialized.set()
raise
# MCPサーバーインスタンス
app = Server("api-docs-mcp")
@app.list_tools()
async def list_tools() -> list[Tool]:
"""利用可能なツールのリストを返す"""
return [
Tool(
name="answer_question",
description="APIドキュメントに関する質問に回答します",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "質問内容"
},
"top_k": {
"type": "integer",
"description": "検索結果数(デフォルト: 5)",
"default": 5
}
},
"required": ["query"]
}
),
Tool(
name="search_docs",
description="ドキュメントをベクトル検索します",
inputSchema={
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "検索クエリ"
},
"top_k": {
"type": "integer",
"description": "取得する結果数(デフォルト: 3)",
"default": 3
}
},
"required": ["query"]
}
)
]
@app.call_tool()
async def call_tool(name: str, arguments: Any) -> CallToolResult:
"""ツールを実行"""
# RAG初期化完了を待つ
if not rag_initialized.is_set():
await rag_initialized.wait()
if rag_pipeline is None:
return CallToolResult(
content=[TextContent(type="text", text="エラー: RAGパイプラインが初期化されていません")]
)
if name == "answer_question":
query = arguments.get("query")
top_k = arguments.get("top_k", 5)
result = rag_pipeline.answer_question(query=query, top_k=top_k)
# ソース情報を整形(LLMの出力に加えて、参照先を明示するため)
# トップ3件のみ表示(冗長になるのを防ぐ)
sources_text = "\n\n## 参考ドキュメント\n"
for i, source in enumerate(result["sources"][:3], 1):
sources_text += f"\n### [{i}] {source.get('doc_title', 'N/A')}\n"
sources_text += f"- URL: {source.get('doc_url', 'N/A')}\n"
response_text = f"{result['answer']}\n{sources_text}"
return CallToolResult(
content=[TextContent(type="text", text=response_text)]
)
elif name == "search_docs":
query = arguments.get("query")
top_k = arguments.get("top_k", 3) # 検索のみなので少なめにデフォルト設定
results = rag_pipeline.search_docs(query=query, top_k=top_k)
# 検索結果を整形
response_text = f"## 検索結果 ({len(results)}件)\n\n"
for i, result in enumerate(results, 1):
response_text += f"### [{i}] {result.get('doc_title', 'N/A')}\n"
response_text += f"- URL: {result.get('doc_url', 'N/A')}\n"
response_text += f"- テキスト: {result.get('text', '')[:200]}...\n\n"
return CallToolResult(
content=[TextContent(type="text", text=response_text)]
)
else:
return CallToolResult(
content=[TextContent(type="text", text=f"エラー: 未知のツール '{name}'")]
)
async def main():
"""MCPサーバーを起動"""
# RAGパイプライン初期化をバックグラウンドタスクとして開始
initialization_task = asyncio.create_task(initialize_rag_async())
# すぐにサーバーを起動(初期化完了を待たない)
async with stdio_server() as (read_stream, write_stream):
await app.run(
read_stream,
write_stream,
app.create_initialization_options()
)
# サーバー終了時に初期化タスクの完了を待つ
if not initialization_task.done():
await initialization_task
if __name__ == "__main__":
# NOTE: ログ設定の詳細は省略(ファイルに出力する必要がある)
asyncio.run(main())
実装のポイント
-
バックグラウンド初期化の必要性
- Embeddingモデル(multilingual-e5-large)のロードに数秒かかる
- この間、
app.run()を呼び出さないとMCPサーバーが応答できない - Claude Code CLIは接続タイムアウトするため、同期的に初期化すると接続失敗する
- 解決策:
asyncio.create_task()で初期化をバックグラウンド実行し、すぐにサーバーを起動
-
stdio通信の保護
- MCP stdio transportは標準入出力でJSON通信を行う
- 標準エラー出力やログ出力が混入すると通信が壊れる
- 解決策:
sys.stderrとログをファイルにリダイレクト
Claude Code連携
実装したMCPサーバーをClaude Code CLIから利用できるよう設定します。プロジェクトの.mcp.jsonに以下の設定を追加します。
{
"mcpServers": {
"api-docs-mcp": {
"command": "/path/to/api-docs-mcp/.venv/bin/python",
"args": ["-u", "/path/to/api-docs-mcp/app/mcp_server.py"]
}
}
}
Claude Code CLIを再起動すると、自動的にMCPサーバーが起動します。
使ってみた感想
実際に数週間使ってみて、想像以上に開発体験が改善されました。
何がうれしいか
「データ取得のスクリプトを書いて」というざっくりした依頼だけで、Claude Codeが自律的にMCPサーバーに問い合わせて、必要な仕様を収集してくれます。不足している情報(どのAPI?どのデータ?等)は質問形式で確認してくれるため、開発フローが途切れなくなりました。
Web検索との違い
以前はClaude CodeのWeb検索も利用していましたが、探索に時間がかかったり、大量のコンテキストを読み込んで肝心の仕様が漏れることがありました。MCPサーバー導入後は、ローカルのRAGで即座に必要な情報だけを取得できるため、開発のテンポが良くなりました。
今後の展望
ドキュメント更新の自動検知や、コード例と説明文を関連付ける高度なチャンキング、図表のマルチモーダル対応など、改善の余地はまだあります。ただ、現状でも十分実用的で、日常的に使えるツールになっています。
まとめ
Claude CodeからAPI仕様を直接質問できるRAG MCPサーバーを作りました。このアプローチは他のドキュメントにも応用できます。OSSのフレームワークドキュメントや、利用が許可されている技術ドキュメントなど、テキストベースであれば同様の仕組みを構築できます。
ただし、このシステムは親ドキュメント全文をGemini APIに送信する設計です。機密情報を含むドキュメントを使用する場合は注意が必要です。また、ドキュメントを学習させる際は、利用規約やライセンスを必ず確認してください。
ドキュメントを頻繁に参照する開発や、Claude Codeを日常的に使っている方の参考になれば幸いです。