🌟 はじめに
このプロジェクトでは、大規模言語モデル(LLM)であるCursorやClaudeが、自然言語コマンドだけで直接Kafkaを制御する方法を実験したいと思いました。
開始時点では、MCPプロトコルがあまり理解していませんでしたが、実装過程を通じて多くの興味深い経験を得ることができました。
🤔 MCPとは?
MCP(Model Context Protocol)は、LLMと外部プログラムを接続するための標準プロトコルです。
動作方法:
- MCPクライアント(例:Claude Desktop、Cursor、ChatGPT)が
stdin/stdout
を通じてあなたのプログラムとの接続を維持します。 - LLMが自然言語コマンドを理解(「Kafkaの全トピックをリストアップ」など)します。
- LLMが自動的にコマンドを、あなたが登録した「ツール」にマッピングします。
- MCPサーバーがコマンドを実行し、正しいJSON形式で結果を返します。
🛠 MCPクライアントとMCPサーバーの理解
-
MCPクライアント:
- LLMを実行します。
- ツールリストを管理します。
- 自然言語コマンドに基づいてJSONリクエストを生成します。
-
MCPサーバー:
- Go/Python/...プログラムです。
- 説明付きのツールリストを登録します。
- stdinからJSONリクエストを受信します。
- Kafkaまたは他のシステムを呼び出します。
- JSONレスポンスを返します。
クエリフォーチャ:
+------------------+
| ユーザー |
| "List all topics|
| in Kafka" |
+------------------+
|
v
+------------------+
| LLM |
| (Claude, OpenAI) |
+------------------+
|
"意図を理解:
List Kafka topics"
|
v
+--------------------------+
| JSONリクエスト作成 |
| method: tools/call |
| name: list_topics |
| arguments: { broker } |
+--------------------------+
|
v
+--------------------------+
| MCPクライアント |
| (Claude Desktop, Cursor) |
+--------------------------+
|
"MCPサーバープロセス実行:
./mcp-kafka"
|
v
+--------------------------+
| MCPサーバー |
| (Goプログラム) |
| - stdinを読み取り |
| - Kafka APIを呼び出し |
| - JSON結果を返す |
+--------------------------+
|
stdoutに出力:
{
"id":"xyz",
"result": ["orders", "payments"]
}
|
v
+--------------------------+
| MCPクライアント |
| - stdoutを読み取り |
| - JSONをLLMに送信 |
+--------------------------+
|
v
+------------------+
| LLM |
| (Claude, OpenAI) |
| "美しい文章を作成" |
+------------------+
|
v
+------------------+
| ユーザー |
| "Topics found: |
| orders, payments|
| |
+------------------+
💡 なぜLLMが非常に重要なのか?
以前は、Kafkaトピックをリストアップしたい場合、手動でコマンド構文やAPI呼び出しを書く必要がありました。
LLM + MCPを使用すると:
- 次のように言うだけで済みます:
"List all topics in Kafka on localhost:9092"
- Claudeが自動的に:
- 意図を理解します。
-
list_topics
ツールを選択します。 - JSONリクエストを作成します:
{ "method": "tools/call", "params": { "name": "list_topics", "arguments": { "broker": "localhost:9092" } }, "id": "abc123" }
コマンドマッピングについて心配する必要がありません。
🧩 MCPサーバーの実装方法
私のプロジェクト:
https://github.com/phandinhloccb/kafka-mcp-server
1. 依存関係の構造 (go.mod)
module mcp-kafka
go 1.23
require (
github.com/mark3labs/mcp-go v0.32.0
github.com/segmentio/kafka-go v0.4.47
)
2. MCPサーバーの初期化 (main.go)
package main
import (
"context"
"fmt"
"log"
"github.com/mark3labs/mcp-go/mcp"
"github.com/mark3labs/mcp-go/server"
)
func main() {
s := server.NewMCPServer(
"Kafka MCP Server",
"1.0.0",
server.WithToolCapabilities(false),
server.WithRecovery(),
)
// 4つの主要ツールを登録
registerKafkaTools(s)
if err := server.ServeStdio(s); err != nil {
log.Fatalf("Error server: %v", err)
}
}
3. ツール登録:トピックリスト
listTopicsTool := mcp.NewTool("list_topics",
mcp.WithDescription("List all topics in Kafka broker"),
mcp.WithString("broker",
mcp.Required(),
mcp.Description("Kafka broker address (e.g. localhost:9092)"),
),
)
s.AddTool(listTopicsTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
broker, err := request.RequireString("broker")
if err != nil {
return mcp.NewToolResultError("Missing broker information"), nil
}
topics, err := listKafkaTopics(broker)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Error listing topics: %v", err)), nil
}
if len(topics) == 0 {
return mcp.NewToolResultText("No topics found in broker"), nil
}
result := "List of topics:\n"
for i, topic := range topics {
result += fmt.Sprintf("%d. %s\n", i+1, topic)
}
return mcp.NewToolResultText(result), nil
})
4. Kafkaクライアント関数 (kafka_client.go)
// listKafkaTopics Kafkaブローカーから全トピックを取得
func listKafkaTopics(broker string) ([]string, error) {
conn, err := kafka.Dial("tcp", broker)
if err != nil {
return nil, fmt.Errorf("cannot connect to broker %s: %v", broker, err)
}
defer conn.Close()
partitions, err := conn.ReadPartitions()
if err != nil {
return nil, fmt.Errorf("cannot read partitions: %v", err)
}
// ユニークなトピックを格納するマップを作成
topicMap := make(map[string]bool)
for _, partition := range partitions {
topicMap[partition.Topic] = true
}
// マップをスライスに変換
var topics []string
for topic := range topicMap {
topics = append(topics, topic)
}
return topics, nil
}
5. ツール:トピック作成
createTopicTool := mcp.NewTool("create_topic",
mcp.WithDescription("Create a new topic in Kafka"),
mcp.WithString("broker", mcp.Required(), mcp.Description("Kafka broker address")),
mcp.WithString("topic", mcp.Required(), mcp.Description("Name of the topic to create")),
mcp.WithNumber("partitions", mcp.Description("Number of partitions (default: 1)")),
)
s.AddTool(createTopicTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
broker, _ := request.RequireString("broker")
topicName, _ := request.RequireString("topic")
partitions := 1
if p := request.GetFloat("partitions", 1.0); p > 0 {
partitions = int(p)
}
err := createKafkaTopic(broker, topicName, partitions)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Error creating topic: %v", err)), nil
}
return mcp.NewToolResultText(fmt.Sprintf("Successfully created topic '%s' with %d partitions", topicName, partitions)), nil
})
6. ツール:メッセージ送信
produceTool := mcp.NewTool("produce_message",
mcp.WithDescription("Send message to Kafka topic"),
mcp.WithString("broker", mcp.Required(), mcp.Description("Kafka broker address")),
mcp.WithString("topic", mcp.Required(), mcp.Description("Topic name")),
mcp.WithString("message", mcp.Required(), mcp.Description("Message content")),
mcp.WithString("key", mcp.Description("Key for message (optional)")),
)
s.AddTool(produceTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
broker, _ := request.RequireString("broker")
topic, _ := request.RequireString("topic")
message, _ := request.RequireString("message")
key := request.GetString("key", "")
err := produceKafkaMessage(broker, topic, key, message)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Error sending message: %v", err)), nil
}
keyInfo := ""
if key != "" {
keyInfo = fmt.Sprintf(" with key '%s'", key)
}
return mcp.NewToolResultText(fmt.Sprintf("Successfully sent message to topic '%s'%s", topic, keyInfo)), nil
})
7. ツール:メッセージ消費
consumeTool := mcp.NewTool("consume_messages",
mcp.WithDescription("Read messages from Kafka topic"),
mcp.WithString("broker", mcp.Required(), mcp.Description("Kafka broker address")),
mcp.WithString("topic", mcp.Required(), mcp.Description("Topic name")),
mcp.WithNumber("count", mcp.Description("Number of messages to read (default: 10)")),
)
s.AddTool(consumeTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
broker, _ := request.RequireString("broker")
topic, _ := request.RequireString("topic")
count := 10
if c := request.GetFloat("count", 10.0); c > 0 {
count = int(c)
}
messages, err := consumeKafkaMessages(broker, topic, count)
if err != nil {
return mcp.NewToolResultError(fmt.Sprintf("Error reading messages: %v", err)), nil
}
if messages == "" {
return mcp.NewToolResultText(fmt.Sprintf("No messages found in topic '%s'", topic)), nil
}
return mcp.NewToolResultText(fmt.Sprintf("Messages in topic '%s':\n\n%s", topic, messages)), nil
})
✨ LLMがMCPを通じて通信する方法
私がチャットで次のように言うとき:
"List all topics in Kafka on localhost:9092"
Claudeは以下を行います:
- 適切な説明を持つツールを見つけます。
-
list_topics
を選択します。 - MCPリクエストを送信します:
{ "method": "tools/call", "params": { "name": "list_topics", "arguments": { "broker": "localhost:9092" } }, "id": "xyz456" }
- サーバーがトピックリストを返します。
- Claudeが結果をテキスト形式で表示します。
🧪 MCPサーバーのテスト方法
1. バイナリをビルド:
go build -o mcp-kafka
2. 手動でテスト実行:
echo '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":1}' | ./mcp-kafka
3. Cursor用の設定を追加 (.cursor/mcp.json):
{
"mcpServers": {
"kafka": {
"command": "/Users/l-phan/Documents/my-project/kafka-mcp/mcp-kafka",
"autoApprove": [],
"disabled": false,
"timeout": 60,
"transportType": "stdio"
}
}
}
4. Cursorを開いて自然言語コマンドでチャット:
- "list topics in kafka broker localhost:9092"
- "create topic 'my-new-topic' in kafka broker localhost:9092"
- "send message 'Hello World' to kafka topic 'my-topic' on localhost:9092"
- "read 5 messages from kafka topic 'my-topic' on localhost:9092"
🚀 DockerでKafkaを実行する方法
# docker-compose.yml
version: '3'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.5.0
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-kafka:7.5.0
ports:
- "9092:9092"
depends_on:
- zookeeper
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
実行:
docker-compose up -d
💭 得られた教訓
LLMは非常に重要:
自然言語を正確なJSONリクエストに変換します。
MCPサーバーは非常にシンプル:
必要なのは:
- stdinを読み取る。
- JSONを解析する。
- Kafkaを処理する。
- JSONをstdoutに出力する。
MCPプロトコルは標準化された橋渡し:
LLMがあなたのシステムと一貫した方法で通信するのを助けます。
実装した4つの主要ツール:
- list_topics - 全トピックをリストアップ
- create_topic - 新しいトピックを作成
- produce_message - トピックにメッセージを送信
- consume_messages - トピックからメッセージを読み取り
🏁 結論
MCPは以下のことを望む場合に優れたツールです:
- LLMを使用して自動操作を行う。
- Kafkaまたは任意のシステムと相互作用できる「アシスタント」を構築する。