1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

🌟 はじめに

このプロジェクトでは、大規模言語モデル(LLM)であるCursorやClaudeが、自然言語コマンドだけで直接Kafkaを制御する方法を実験したいと思いました。
開始時点では、MCPプロトコルがあまり理解していませんでしたが、実装過程を通じて多くの興味深い経験を得ることができました。

🤔 MCPとは?

MCPModel Context Protocol)は、LLMと外部プログラムを接続するための標準プロトコルです。
動作方法:

  1. MCPクライアント(例:Claude Desktop、Cursor、ChatGPT)がstdin/stdoutを通じてあなたのプログラムとの接続を維持します。
  2. LLMが自然言語コマンドを理解(「Kafkaの全トピックをリストアップ」など)します。
  3. LLMが自動的にコマンドを、あなたが登録した「ツール」にマッピングします。
  4. MCPサーバーがコマンドを実行し、正しいJSON形式で結果を返します。

🛠 MCPクライアントとMCPサーバーの理解

  • MCPクライアント

    • LLMを実行します。
    • ツールリストを管理します。
    • 自然言語コマンドに基づいてJSONリクエストを生成します。
  • MCPサーバー

    • Go/Python/...プログラムです。
    • 説明付きのツールリストを登録します。
    • stdinからJSONリクエストを受信します。
    • Kafkaまたは他のシステムを呼び出します。
    • JSONレスポンスを返します。

クエリフォーチャ:

+------------------+
|     ユーザー      |
|  "List all topics|
|   in Kafka"      |
+------------------+
           |
           v
+------------------+
|      LLM         |
| (Claude, OpenAI) |
+------------------+
           |
   "意図を理解:
   List Kafka topics"
           |
           v
+--------------------------+
|   JSONリクエスト作成     |
|   method: tools/call     |
|   name: list_topics      |
|   arguments: { broker }  |
+--------------------------+
           |
           v
+--------------------------+
|   MCPクライアント       |
| (Claude Desktop, Cursor) |
+--------------------------+
           |
 "MCPサーバープロセス実行:
  ./mcp-kafka"
           |
           v
+--------------------------+
|        MCPサーバー       |
|     (Goプログラム)       |
| - stdinを読み取り        |
| - Kafka APIを呼び出し    |
| - JSON結果を返す         |
+--------------------------+
           |
   stdoutに出力:
   {
     "id":"xyz",
     "result": ["orders", "payments"]
   }
           |
           v
+--------------------------+
|       MCPクライアント      |
| - stdoutを読み取り        |
| - JSONをLLMに送信        |
+--------------------------+
           |
           v
+------------------+
|      LLM         |
| (Claude, OpenAI) |
| "美しい文章を作成"  |
+------------------+
           |
           v
+------------------+
|    ユーザー       |
| "Topics found:   |
|  orders, payments|
|                  |
+------------------+


💡 なぜLLMが非常に重要なのか?

以前は、Kafkaトピックをリストアップしたい場合、手動でコマンド構文やAPI呼び出しを書く必要がありました。

LLM + MCPを使用すると:

  • 次のように言うだけで済みます:

    "List all topics in Kafka on localhost:9092"

  • Claudeが自動的に:
    • 意図を理解します。
    • list_topicsツールを選択します。
    • JSONリクエストを作成します:
      {
        "method": "tools/call",
        "params": {
          "name": "list_topics",
          "arguments": {
            "broker": "localhost:9092"
          }
        },
        "id": "abc123"
      }
      

コマンドマッピングについて心配する必要がありません


🧩 MCPサーバーの実装方法

私のプロジェクト:
https://github.com/phandinhloccb/kafka-mcp-server

1. 依存関係の構造 (go.mod)

module mcp-kafka

go 1.23

require (
	github.com/mark3labs/mcp-go v0.32.0
	github.com/segmentio/kafka-go v0.4.47
)

2. MCPサーバーの初期化 (main.go)

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/mark3labs/mcp-go/mcp"
	"github.com/mark3labs/mcp-go/server"
)

func main() {
	s := server.NewMCPServer(
		"Kafka MCP Server",
		"1.0.0",
		server.WithToolCapabilities(false),
		server.WithRecovery(),
	)
	
	// 4つの主要ツールを登録
	registerKafkaTools(s)
	
	if err := server.ServeStdio(s); err != nil {
		log.Fatalf("Error server: %v", err)
	}
}

3. ツール登録:トピックリスト

listTopicsTool := mcp.NewTool("list_topics",
	mcp.WithDescription("List all topics in Kafka broker"),
	mcp.WithString("broker",
		mcp.Required(),
		mcp.Description("Kafka broker address (e.g. localhost:9092)"),
	),
)

s.AddTool(listTopicsTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	broker, err := request.RequireString("broker")
	if err != nil {
		return mcp.NewToolResultError("Missing broker information"), nil
	}

	topics, err := listKafkaTopics(broker)
	if err != nil {
		return mcp.NewToolResultError(fmt.Sprintf("Error listing topics: %v", err)), nil
	}

	if len(topics) == 0 {
		return mcp.NewToolResultText("No topics found in broker"), nil
	}

	result := "List of topics:\n"
	for i, topic := range topics {
		result += fmt.Sprintf("%d. %s\n", i+1, topic)
	}

	return mcp.NewToolResultText(result), nil
})

4. Kafkaクライアント関数 (kafka_client.go)

// listKafkaTopics Kafkaブローカーから全トピックを取得
func listKafkaTopics(broker string) ([]string, error) {
	conn, err := kafka.Dial("tcp", broker)
	if err != nil {
		return nil, fmt.Errorf("cannot connect to broker %s: %v", broker, err)
	}
	defer conn.Close()

	partitions, err := conn.ReadPartitions()
	if err != nil {
		return nil, fmt.Errorf("cannot read partitions: %v", err)
	}

	// ユニークなトピックを格納するマップを作成
	topicMap := make(map[string]bool)
	for _, partition := range partitions {
		topicMap[partition.Topic] = true
	}

	// マップをスライスに変換
	var topics []string
	for topic := range topicMap {
		topics = append(topics, topic)
	}

	return topics, nil
}

5. ツール:トピック作成

createTopicTool := mcp.NewTool("create_topic",
	mcp.WithDescription("Create a new topic in Kafka"),
	mcp.WithString("broker", mcp.Required(), mcp.Description("Kafka broker address")),
	mcp.WithString("topic", mcp.Required(), mcp.Description("Name of the topic to create")),
	mcp.WithNumber("partitions", mcp.Description("Number of partitions (default: 1)")),
)

s.AddTool(createTopicTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	broker, _ := request.RequireString("broker")
	topicName, _ := request.RequireString("topic")
	
	partitions := 1
	if p := request.GetFloat("partitions", 1.0); p > 0 {
		partitions = int(p)
	}

	err := createKafkaTopic(broker, topicName, partitions)
	if err != nil {
		return mcp.NewToolResultError(fmt.Sprintf("Error creating topic: %v", err)), nil
	}

	return mcp.NewToolResultText(fmt.Sprintf("Successfully created topic '%s' with %d partitions", topicName, partitions)), nil
})

6. ツール:メッセージ送信

produceTool := mcp.NewTool("produce_message",
	mcp.WithDescription("Send message to Kafka topic"),
	mcp.WithString("broker", mcp.Required(), mcp.Description("Kafka broker address")),
	mcp.WithString("topic", mcp.Required(), mcp.Description("Topic name")),
	mcp.WithString("message", mcp.Required(), mcp.Description("Message content")),
	mcp.WithString("key", mcp.Description("Key for message (optional)")),
)

s.AddTool(produceTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	broker, _ := request.RequireString("broker")
	topic, _ := request.RequireString("topic")
	message, _ := request.RequireString("message")
	key := request.GetString("key", "")

	err := produceKafkaMessage(broker, topic, key, message)
	if err != nil {
		return mcp.NewToolResultError(fmt.Sprintf("Error sending message: %v", err)), nil
	}

	keyInfo := ""
	if key != "" {
		keyInfo = fmt.Sprintf(" with key '%s'", key)
	}

	return mcp.NewToolResultText(fmt.Sprintf("Successfully sent message to topic '%s'%s", topic, keyInfo)), nil
})

7. ツール:メッセージ消費

consumeTool := mcp.NewTool("consume_messages",
	mcp.WithDescription("Read messages from Kafka topic"),
	mcp.WithString("broker", mcp.Required(), mcp.Description("Kafka broker address")),
	mcp.WithString("topic", mcp.Required(), mcp.Description("Topic name")),
	mcp.WithNumber("count", mcp.Description("Number of messages to read (default: 10)")),
)

s.AddTool(consumeTool, func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error) {
	broker, _ := request.RequireString("broker")
	topic, _ := request.RequireString("topic")
	
	count := 10
	if c := request.GetFloat("count", 10.0); c > 0 {
		count = int(c)
	}

	messages, err := consumeKafkaMessages(broker, topic, count)
	if err != nil {
		return mcp.NewToolResultError(fmt.Sprintf("Error reading messages: %v", err)), nil
	}

	if messages == "" {
		return mcp.NewToolResultText(fmt.Sprintf("No messages found in topic '%s'", topic)), nil
	}

	return mcp.NewToolResultText(fmt.Sprintf("Messages in topic '%s':\n\n%s", topic, messages)), nil
})

✨ LLMがMCPを通じて通信する方法

私がチャットで次のように言うとき:

"List all topics in Kafka on localhost:9092"

Claudeは以下を行います:

  1. 適切な説明を持つツールを見つけます。
  2. list_topicsを選択します。
  3. MCPリクエストを送信します:
    {
      "method": "tools/call",
      "params": {
        "name": "list_topics",
        "arguments": {
          "broker": "localhost:9092"
        }
      },
      "id": "xyz456"
    }
    
  4. サーバーがトピックリストを返します。
  5. Claudeが結果をテキスト形式で表示します。

🧪 MCPサーバーのテスト方法

1. バイナリをビルド:

go build -o mcp-kafka

2. 手動でテスト実行:

echo '{"jsonrpc":"2.0","method":"tools/list","params":{},"id":1}' | ./mcp-kafka

3. Cursor用の設定を追加 (.cursor/mcp.json):

{
  "mcpServers": {
    "kafka": {
      "command": "/Users/l-phan/Documents/my-project/kafka-mcp/mcp-kafka",
      "autoApprove": [],
      "disabled": false,
      "timeout": 60,
      "transportType": "stdio"
    }
  }
}

4. Cursorを開いて自然言語コマンドでチャット:

Screenshot 2025-06-29 at 22.42.03.png

  • "list topics in kafka broker localhost:9092"
  • "create topic 'my-new-topic' in kafka broker localhost:9092"
  • "send message 'Hello World' to kafka topic 'my-topic' on localhost:9092"
  • "read 5 messages from kafka topic 'my-topic' on localhost:9092"

🚀 DockerでKafkaを実行する方法

# docker-compose.yml
version: '3'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-kafka:7.5.0
    ports:
      - "9092:9092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://broker:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

実行:

docker-compose up -d

💭 得られた教訓

LLMは非常に重要:

自然言語を正確なJSONリクエストに変換します。

MCPサーバーは非常にシンプル:

必要なのは:

  • stdinを読み取る。
  • JSONを解析する。
  • Kafkaを処理する。
  • JSONをstdoutに出力する。

MCPプロトコルは標準化された橋渡し:

LLMがあなたのシステムと一貫した方法で通信するのを助けます。

実装した4つの主要ツール:

  1. list_topics - 全トピックをリストアップ
  2. create_topic - 新しいトピックを作成
  3. produce_message - トピックにメッセージを送信
  4. consume_messages - トピックからメッセージを読み取り

🏁 結論

MCPは以下のことを望む場合に優れたツールです:

  • LLMを使用して自動操作を行う。
  • Kafkaまたは任意のシステムと相互作用できる「アシスタント」を構築する。

📚 参考資料

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?