はじめに
この記事は、QiitaのModel Context Protocol(以下、MCP)解説シリーズの第15回です。
今回は、MCPをより実用的に活用するための応用編として、データベース連携に挑戦します。特に、手軽で軽量なSQLiteデータベースを読み取り専用で安全に公開する方法を解説します。
💡 なぜデータベース連携が重要なのか?
MCPの強みは、LLMが外部データにアクセスできることです。これまではテキストファイルや静的なデータを使ってきましたが、実際のアプリケーションでは、頻繁に更新される情報や、複雑な構造を持つデータを扱う必要があります。
データベース連携のメリット
- リアルタイム性の確保: 最新の在庫情報、顧客データ、売上データなど、常に変動する情報をLLMに提供できます
- 効率的なデータアクセス: 必要な情報だけをクエリで取得できるため、メモリや帯域幅を節約し、LLMの応答速度を向上させます
- データの構造化: データベースは情報を構造化して格納するため、LLMがデータを正確に理解し、利用しやすくなります
- スケーラビリティ: 大量のデータを効率的に処理できます
🛠️ ステップ1:プロジェクトのセットアップとデータベースの準備
今回はTypeScriptを使って実装します。一貫性を保つため、これまでのシリーズと同じ言語を使用します。
プロジェクトの初期化
mkdir mcp-sqlite-example
cd mcp-sqlite-example
npm init -y
npm install @modelcontextprotocol/sdk sqlite3 zod
npm install -D typescript ts-node @types/node @types/sqlite3
npx tsc --init
サンプルデータベースの作成
create-database.jsファイルを作成してサンプルデータを準備します:
const sqlite3 = require('sqlite3').verbose();
// データベースファイルを作成
const db = new sqlite3.Database('products.db');
db.serialize(() => {
// テーブルの作成
db.run(`CREATE TABLE IF NOT EXISTS products (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL,
category TEXT NOT NULL,
price REAL NOT NULL,
description TEXT,
stock_quantity INTEGER DEFAULT 0,
created_at DATETIME DEFAULT CURRENT_TIMESTAMP
)`);
db.run(`CREATE TABLE IF NOT EXISTS categories (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT
)`);
db.run(`CREATE TABLE IF NOT EXISTS orders (
id INTEGER PRIMARY KEY AUTOINCREMENT,
customer_name TEXT NOT NULL,
product_id INTEGER,
quantity INTEGER NOT NULL,
order_date DATETIME DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (product_id) REFERENCES products (id)
)`);
// サンプルデータの挿入
const categories = [
['Electronics', '電子機器・デバイス'],
['Accessories', 'アクセサリー・周辺機器'],
['Software', 'ソフトウェア・アプリケーション']
];
categories.forEach(([name, description]) => {
db.run('INSERT OR IGNORE INTO categories (name, description) VALUES (?, ?)', [name, description]);
});
const products = [
['Laptop Pro 15"', 'Electronics', 1299.99, 'High-performance laptop with 16GB RAM', 25],
['Wireless Keyboard', 'Accessories', 79.99, 'Ergonomic wireless keyboard with backlight', 50],
['Optical Mouse', 'Accessories', 29.99, 'Precision optical mouse with programmable buttons', 75],
['Code Editor Pro', 'Software', 99.99, 'Professional code editor with advanced features', 100],
['Gaming Headset', 'Accessories', 149.99, 'High-quality gaming headset with surround sound', 30],
['Tablet 10"', 'Electronics', 449.99, 'Lightweight tablet with high-resolution display', 40]
];
products.forEach(([name, category, price, description, stock]) => {
db.run('INSERT INTO products (name, category, price, description, stock_quantity) VALUES (?, ?, ?, ?, ?)',
[name, category, price, description, stock]);
});
const orders = [
['Alice Johnson', 1, 1],
['Bob Smith', 2, 2],
['Charlie Brown', 3, 1],
['Diana Wilson', 1, 1],
['Eve Davis', 4, 3]
];
orders.forEach(([customer, productId, quantity]) => {
db.run('INSERT INTO orders (customer_name, product_id, quantity) VALUES (?, ?, ?)',
[customer, productId, quantity]);
});
});
db.close((err) => {
if (err) {
console.error('データベース作成エラー:', err);
} else {
console.log('データベース products.db が正常に作成されました!');
}
});
データベースを作成:
node create-database.js
📝 ステップ2:MCPサーバーの実装
server.tsファイルを作成し、以下のコードを実装します:
#!/usr/bin/env node
import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { StdioServerTransport } from "@modelcontextprotocol/sdk/server/stdio.js";
import {
CallToolRequestSchema,
ListResourcesRequestSchema,
ListToolsRequestSchema,
ReadResourceRequestSchema,
TextContent,
} from "@modelcontextprotocol/sdk/types.js";
import { z } from "zod";
import sqlite3 from "sqlite3";
import { promisify } from "util";
// SQLインジェクション防止のための安全なクエリチェック
class QueryValidator {
private static readonly ALLOWED_KEYWORDS = [
'SELECT', 'FROM', 'WHERE', 'JOIN', 'INNER', 'LEFT', 'RIGHT', 'OUTER',
'ON', 'AND', 'OR', 'GROUP', 'BY', 'HAVING', 'ORDER', 'LIMIT', 'OFFSET',
'AS', 'DISTINCT', 'COUNT', 'SUM', 'AVG', 'MIN', 'MAX', 'CASE', 'WHEN',
'THEN', 'ELSE', 'END', 'IN', 'LIKE', 'BETWEEN', 'IS', 'NULL', 'NOT'
];
private static readonly FORBIDDEN_KEYWORDS = [
'INSERT', 'UPDATE', 'DELETE', 'DROP', 'CREATE', 'ALTER', 'TRUNCATE',
'REPLACE', 'MERGE', 'GRANT', 'REVOKE', 'COMMIT', 'ROLLBACK', 'PRAGMA'
];
static validateReadOnlyQuery(query: string): void {
const upperQuery = query.trim().toUpperCase();
// SELECT文で始まることを確認
if (!upperQuery.startsWith('SELECT')) {
throw new Error('読み取り専用クエリのみ許可されています。SELECT文を使用してください。');
}
// 禁止されたキーワードがないかチェック
for (const forbidden of this.FORBIDDEN_KEYWORDS) {
if (upperQuery.includes(forbidden)) {
throw new Error(`禁止されたSQL操作が含まれています: ${forbidden}`);
}
}
// セミコロンで複数のクエリを実行することを防ぐ
const statements = query.split(';').filter(s => s.trim());
if (statements.length > 1) {
throw new Error('複数のSQL文の実行は許可されていません。');
}
}
}
// データベース接続の管理
class DatabaseManager {
private db: sqlite3.Database;
constructor(dbPath: string) {
this.db = new sqlite3.Database(dbPath, sqlite3.OPEN_READONLY, (err) => {
if (err) {
console.error('データベース接続エラー:', err);
throw err;
}
console.error('SQLiteデータベースに正常に接続しました');
});
}
async query(sql: string): Promise<any[]> {
return new Promise((resolve, reject) => {
this.db.all(sql, (err, rows) => {
if (err) {
reject(err);
} else {
resolve(rows);
}
});
});
}
async getTableSchema(tableName: string): Promise<any[]> {
return new Promise((resolve, reject) => {
this.db.all(`PRAGMA table_info(${tableName})`, (err, rows) => {
if (err) {
reject(err);
} else {
resolve(rows);
}
});
});
}
async getTableNames(): Promise<string[]> {
return new Promise((resolve, reject) => {
this.db.all(
"SELECT name FROM sqlite_master WHERE type='table' ORDER BY name",
(err, rows: any[]) => {
if (err) {
reject(err);
} else {
resolve(rows.map(row => row.name));
}
}
);
});
}
close(): void {
this.db.close();
}
}
// 入力スキーマの定義
const QueryDatabaseSchema = z.object({
query: z.string().min(1).describe("実行するSQL SELECTクエリ")
});
const GetTableSchemaSchema = z.object({
tableName: z.string().min(1).describe("スキーマを取得するテーブル名")
});
class SQLiteMCPServer {
private server: Server;
private dbManager: DatabaseManager;
constructor(dbPath: string) {
this.dbManager = new DatabaseManager(dbPath);
this.server = new Server(
{
name: "sqlite-mcp-server",
version: "1.0.0",
},
{
capabilities: {
tools: {},
resources: {},
},
}
);
this.setupHandlers();
}
private setupHandlers() {
// ツール一覧の取得
this.server.setRequestHandler(ListToolsRequestSchema, async () => ({
tools: [
{
name: "query_database",
description: "SQLiteデータベースに対してSELECTクエリを実行し、結果を取得する",
inputSchema: {
type: "object",
properties: {
query: {
type: "string",
description: "実行するSQL SELECTクエリ(読み取り専用)"
}
},
required: ["query"]
}
},
{
name: "get_table_schema",
description: "指定されたテーブルのスキーマ情報を取得する",
inputSchema: {
type: "object",
properties: {
tableName: {
type: "string",
description: "スキーマを取得するテーブル名"
}
},
required: ["tableName"]
}
},
{
name: "list_tables",
description: "データベース内の全テーブル名を取得する",
inputSchema: {
type: "object",
properties: {},
required: []
}
}
]
}));
// ツール実行の処理
this.server.setRequestHandler(CallToolRequestSchema, async (request) => {
const { name, arguments: args } = request.params;
try {
switch (name) {
case "query_database":
return await this.handleQueryDatabase(args);
case "get_table_schema":
return await this.handleGetTableSchema(args);
case "list_tables":
return await this.handleListTables();
default:
throw new Error(`未知のツール: ${name}`);
}
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "不明なエラー";
return {
content: [
{
type: "text",
text: `❌ エラー: ${errorMessage}`
} as TextContent
],
isError: true
};
}
});
// リソース一覧の取得
this.server.setRequestHandler(ListResourcesRequestSchema, async () => {
try {
const tables = await this.dbManager.getTableNames();
return {
resources: tables.map(tableName => ({
uri: `sqlite:///${tableName}`,
name: `Table: ${tableName}`,
description: `SQLiteテーブル ${tableName} のスキーマ情報`,
mimeType: "application/json"
}))
};
} catch (error) {
console.error('リソース一覧取得エラー:', error);
return { resources: [] };
}
});
// リソース読み取りの処理
this.server.setRequestHandler(ReadResourceRequestSchema, async (request) => {
const { uri } = request.params;
try {
if (!uri.startsWith('sqlite:///')) {
throw new Error('サポートされていないURIスキーム');
}
const tableName = uri.replace('sqlite:///', '');
const schema = await this.dbManager.getTableSchema(tableName);
return {
contents: [
{
uri,
mimeType: "application/json",
text: JSON.stringify({
tableName,
schema: schema,
description: `テーブル ${tableName} のスキーマ情報`
}, null, 2)
}
]
};
} catch (error) {
throw new Error(`リソース読み取りエラー: ${error instanceof Error ? error.message : '不明なエラー'}`);
}
});
}
private async handleQueryDatabase(args: any) {
const parsed = QueryDatabaseSchema.parse(args);
try {
// クエリの安全性をチェック
QueryValidator.validateReadOnlyQuery(parsed.query);
// クエリの実行
const results = await this.dbManager.query(parsed.query);
const resultText = results.length === 0
? "クエリの実行は成功しましたが、結果は0件でした。"
: `クエリの実行結果(${results.length}件):\n\n${JSON.stringify(results, null, 2)}`;
return {
content: [
{
type: "text",
text: resultText
} as TextContent
]
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "データベースクエリでエラーが発生しました";
throw new Error(`クエリ実行エラー: ${errorMessage}`);
}
}
private async handleGetTableSchema(args: any) {
const parsed = GetTableSchemaSchema.parse(args);
try {
const schema = await this.dbManager.getTableSchema(parsed.tableName);
if (schema.length === 0) {
throw new Error(`テーブル '${parsed.tableName}' が見つかりません`);
}
const schemaInfo = {
tableName: parsed.tableName,
columns: schema.map((col: any) => ({
name: col.name,
type: col.type,
nullable: !col.notnull,
primaryKey: col.pk === 1,
defaultValue: col.dflt_value
}))
};
return {
content: [
{
type: "text",
text: `テーブル '${parsed.tableName}' のスキーマ情報:\n\n${JSON.stringify(schemaInfo, null, 2)}`
} as TextContent
]
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "スキーマ取得でエラーが発生しました";
throw new Error(`スキーマ取得エラー: ${errorMessage}`);
}
}
private async handleListTables() {
try {
const tables = await this.dbManager.getTableNames();
return {
content: [
{
type: "text",
text: `データベース内のテーブル一覧:\n\n${tables.map(table => `• ${table}`).join('\n')}`
} as TextContent
]
};
} catch (error) {
const errorMessage = error instanceof Error ? error.message : "テーブル一覧取得でエラーが発生しました";
throw new Error(`テーブル一覧取得エラー: ${errorMessage}`);
}
}
async start() {
const transport = new StdioServerTransport();
await this.server.connect(transport);
console.error("SQLite MCP Server が開始されました");
}
async stop() {
this.dbManager.close();
}
}
// サーバー開始
async function main() {
const dbPath = process.argv[2] || 'products.db';
if (!require('fs').existsSync(dbPath)) {
console.error(`エラー: データベースファイル '${dbPath}' が見つかりません`);
process.exit(1);
}
const server = new SQLiteMCPServer(dbPath);
// グレースフル・シャットダウン
process.on('SIGINT', async () => {
console.error('シャットダウン中...');
await server.stop();
process.exit(0);
});
await server.start();
}
// エラーハンドリング
process.on('unhandledRejection', (reason, promise) => {
console.error('Unhandled Rejection at:', promise, 'reason:', reason);
process.exit(1);
});
process.on('uncaughtException', (error) => {
console.error('Uncaught Exception:', error);
process.exit(1);
});
if (require.main === module) {
main().catch((error) => {
console.error("サーバー開始エラー:", error);
process.exit(1);
});
}
🔧 ステップ3:Claude Desktop設定
claude_desktop_config.jsonに以下の設定を追加:
{
"mcpServers": {
"sqlite-server": {
"command": "npx",
"args": ["ts-node", "/path/to/your/project/server.ts", "products.db"]
}
}
}
🚀 ステップ4:動作確認とテスト
サーバーの起動
npx ts-node server.ts products.db
Claude Desktopでのテスト例
以下のような質問でテストしてみましょう:
-
基本的なクエリ
"製品データベースの全テーブルを教えて" -
データ分析
"カテゴリ別の製品数を教えて" -
複雑なクエリ
"価格が100ドル以上の電子機器の一覧を、在庫数と一緒に表示して" -
スキーマ確認
"productsテーブルのスキーマを教えて"
期待される動作フロー
-
テーブル一覧の取得:
Claudeがlist_tablesツールを実行してデータベース構造を理解 -
スキーマの確認:
get_table_schemaツールでテーブル構造を確認 -
クエリの生成と実行:
適切なSELECTクエリを生成し、query_databaseツールで実行 -
結果の解釈:
データベースの結果を分析してユーザーに回答
🛡️ セキュリティ機能の詳細
1. 読み取り専用の強制
- データベースを
OPEN_READONLYモードで開く - SELECT文のみを許可する厳密なバリデーション
- INSERT、UPDATE、DELETE等の操作を完全にブロック
2. SQLインジェクション防止
- 許可されたキーワードのホワイトリスト
- 禁止されたキーワードのブラックリスト
- 複数SQL文の実行を防止
3. エラーハンドリング
- 詳細なエラーメッセージでデバッグを支援
- セキュリティ情報の漏洩を防止
- 適切なログ出力
📊 実用的な応用例
ビジネスインテリジェンス
// 売上分析のためのクエリ例
const analyticsQueries = {
monthlySales: `
SELECT
strftime('%Y-%m', order_date) as month,
COUNT(*) as order_count,
SUM(p.price * o.quantity) as total_revenue
FROM orders o
JOIN products p ON o.product_id = p.id
GROUP BY month
ORDER BY month DESC
`,
topProducts: `
SELECT
p.name,
p.category,
COUNT(o.id) as order_count,
SUM(o.quantity) as total_sold
FROM products p
LEFT JOIN orders o ON p.id = o.product_id
GROUP BY p.id
ORDER BY order_count DESC
LIMIT 10
`
};
在庫管理
-- 在庫が少ない商品の確認
SELECT name, category, stock_quantity
FROM products
WHERE stock_quantity < 30
ORDER BY stock_quantity ASC;
-- カテゴリ別在庫状況
SELECT
category,
COUNT(*) as product_count,
AVG(stock_quantity) as avg_stock,
MIN(stock_quantity) as min_stock
FROM products
GROUP BY category;
🎯 ベストプラクティス
1. パフォーマンスの最適化
// インデックスの活用を促すクエリ設計
// データベース接続のプーリング
// クエリ結果のキャッシュ(必要に応じて)
2. エラー処理の改善
private handleDatabaseError(error: any): string {
if (error.code === 'SQLITE_BUSY') {
return 'データベースが使用中です。しばらく後に再試行してください。';
} else if (error.code === 'SQLITE_LOCKED') {
return 'データベースがロックされています。';
} else {
return `データベースエラー: ${error.message}`;
}
}
3. 監査とログ
private logQuery(query: string, results: any[], executionTime: number) {
console.log({
timestamp: new Date().toISOString(),
query: query.substring(0, 100) + (query.length > 100 ? '...' : ''),
resultCount: results.length,
executionTime: `${executionTime}ms`
});
}
📈 スケーラビリティの考慮
より大規模なデータベースへの対応
- PostgreSQL/MySQL対応: 同じパターンで他のデータベースにも対応可能
- 接続プーリング: 複数のクライアントからの同時アクセスに対応
- クエリの最適化: EXPLAINを使った実行計画の分析
- キャッシュ戦略: 頻繁にアクセスされるデータのキャッシュ
パフォーマンス監視
class PerformanceMonitor {
private static queryStats = new Map<string, { count: number, totalTime: number }>();
static recordQuery(query: string, executionTime: number) {
const stats = this.queryStats.get(query) || { count: 0, totalTime: 0 };
stats.count++;
stats.totalTime += executionTime;
this.queryStats.set(query, stats);
}
static getSlowQueries(threshold = 1000): Array<{ query: string, avgTime: number }> {
const slow = [];
for (const [query, stats] of this.queryStats) {
const avgTime = stats.totalTime / stats.count;
if (avgTime > threshold) {
slow.push({ query, avgTime });
}
}
return slow.sort((a, b) => b.avgTime - a.avgTime);
}
}
🔍 トラブルシューティング
よくある問題と解決法
-
データベース接続エラー
# ファイルパスを確認 ls -la products.db # パーミッションを確認 chmod 644 products.db -
クエリが遅い場合
-- インデックスの確認 PRAGMA index_list('products'); -- クエリプランの確認 EXPLAIN QUERY PLAN SELECT * FROM products WHERE category = 'Electronics'; -
メモリ使用量の問題
// 大きな結果セットの場合は LIMIT を使用 if (!query.toUpperCase().includes('LIMIT')) { query += ' LIMIT 1000'; }
📝 まとめ
今回は、SQLiteデータベースへの読み取り専用アクセスをMCP経由で安全に提供する方法を学びました。
重要なポイント
- セキュリティファースト: 読み取り専用の徹底とSQLインジェクション防止
- 構造化されたアプローチ: スキーマ情報の提供とクエリバリデーション
- エラーハンドリング: 適切なエラー処理と情報提供
- スケーラビリティ: より大規模なシステムへの対応を考慮した設計
応用可能性
このパターンは以下の用途に応用できます:
- ビジネスインテリジェンス: 売上分析、KPI監視
- 在庫管理: 商品管理、発注支援
- 顧客管理: 顧客データの分析と検索
- レポート生成: 定期レポートの自動生成
次回は、外部Web API統合MCPをテーマに、RESTful APIを安全にラップし、LLMから活用する実装パターンを解説します。お楽しみに!