1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Go言語で開発したネットワーク管理ソフト(TWSNMP FC/FK)にMQTTサーバーとMQTTクライアントを組み込んだ話

Posted at

はじめに

私が開発しているネットワーク管理ソフトTWSNMP FC

TWSNMP FK

にMQTTサーバー(ブローカー)とMQTTクライアントを組み込んだ時に利用したGo言語のパッケージの紹介です。

MQTTサーバーパッケージ

Go言語でMQTTサーバーを開発するためのパッケージは

がおすすめです。
単体で動作するMQTTサーバーコマンドも提唱されていますが、ライブラリーとしてGo言語のアプリに組み込むことができます。GitHUBには、簡単なサンプルコードが紹介されていますが、動作している様子を確認するためにログに出力できるようなしたサンプルコードを作成しました。

package main

import (
	"bytes"
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/mochi-mqtt/server/v2"
	"github.com/mochi-mqtt/server/v2/hooks/auth"
	"github.com/mochi-mqtt/server/v2/listeners"
	"github.com/mochi-mqtt/server/v2/packets"
)

func main() {
	if len(os.Args) < 2 {
		log.Fatalln("mqtt_sv <tcp port> [<stats port>]")
	}
	// シグナルで停止できるようにします。
	sigs := make(chan os.Signal, 1)
	done := make(chan bool, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		<-sigs
		done <- true
	}()
	// MQTTサーバーを作成します。
	server := mqtt.New(nil)
	if err := server.AddHook(new(auth.AllowHook), nil); err != nil {
		log.Fatal(err)
	}
	// Hookを登録します。
	if err := server.AddHook(new(ExampleHook), &ExampleHookOptions{
		Server: server,
	}); err != nil {
		log.Fatal(err)
	}
	// TCPの待ち受けポートを作成して登録します。
	tcp := listeners.NewTCP(listeners.Config{ID: "tcp1", Address: ":" + os.Args[1]})
	if err := server.AddListener(tcp); err != nil {
		log.Fatal(err)
	}
	if len(os.Args) > 2 {
		// Statsの待ち受けポートを作成して登録します。
		stats := listeners.NewHTTPStats(
			listeners.Config{
				ID:      "stats1",
				Address: ":" + os.Args[2],
			}, server.Info,
		)
		if err := server.AddListener(stats); err != nil {
			log.Fatal(err)
		}
	}
	go func() {
		err := server.Serve()
		if err != nil {
			log.Fatal(err)
		}
	}()
	<-done
}

// ExampleHookOptions は、ExampleHookのオプションです。
type ExampleHookOptions struct {
	Server *mqtt.Server
}

// ExampleHook はMQTTサーバーの動作をログに出力するHookです。
type ExampleHook struct {
	mqtt.HookBase
	config *ExampleHookOptions
}

func (h *ExampleHook) ID() string {
	return "events-example"
}

func (h *ExampleHook) Provides(b byte) bool {
	return bytes.Contains([]byte{
		mqtt.OnConnect,
		mqtt.OnDisconnect,
		mqtt.OnSubscribed,
		mqtt.OnUnsubscribed,
		mqtt.OnPublished,
		mqtt.OnPublish,
	}, []byte{b})
}

func (h *ExampleHook) Init(config any) error {
	h.Log.Info("initialised")
	if _, ok := config.(*ExampleHookOptions); !ok && config != nil {
		return mqtt.ErrInvalidConfigType
	}

	h.config = config.(*ExampleHookOptions)
	if h.config.Server == nil {
		return mqtt.ErrInvalidConfigType
	}
	return nil
}

func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
	h.Log.Info("client connected", "client", cl.ID)
	return nil
}

func (h *ExampleHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
	h.Log.Info("client disconnected", "client", cl.ID, "expire", expire, "error", err)
}

func (h *ExampleHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
	h.Log.Info(fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters)
}

func (h *ExampleHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
	h.Log.Info("unsubscribed", "client", cl.ID, "filters", pk.Filters)
}

func (h *ExampleHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
	h.Log.Info("received from client", "client", cl.ID, "topic", pk.TopicName, "payload", string(pk.Payload))
	return pk, nil
}

func (h *ExampleHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
	h.Log.Info("published to client", "client", cl.ID, "topic", pk.TopicName, "payload", string(pk.Payload))
}

MQTTクライアントパッケージ

Go言語のMQTTクライアントパッケージは、多くの人が紹介している

がよいと思います。
こちらも、先程のサーバーのサンプルを試すためのサンプルコードを作成しました。

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"
	"syscall"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// 受信した時のハンドラー
var publishHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
	log.Printf("Received message: %s from topic: %s", msg.Payload(), msg.Topic())
}

// 接続した時のハンドラー
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	log.Println("Connected")
}

// 切断した時のハンドラー
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	log.Printf("Connect lost: %v", err)
}

func main() {
	if len(os.Args) < 2 {
		log.Fatalln("mqtt_cl <IP:port>")
	}
	// MQTTクライアントを作成します。
	opts := mqtt.NewClientOptions()
	opts.AddBroker(fmt.Sprintf("tcp://%s", os.Args[1]))
	opts.SetClientID("test-mqtt-client")
	opts.SetDefaultPublishHandler(publishHandler)
	opts.OnConnect = connectHandler
	opts.OnConnectionLost = connectLostHandler
	client := mqtt.NewClient(opts)
	// サーバーに接続します。
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalln(token.Error())
	}
	// Topicをサブスクライブします。
	subscribe(client, "topic/test")
	// Topicを10回パブリッシュします。
	for i := 0; i < 10; i++ {
		publish(client, "topic/test", fmt.Sprintf("payload:%d", i))
	}
	// Ctl+cが押されるまで待ちます。
	c := make(chan os.Signal, 2)
	signal.Notify(c, os.Interrupt, syscall.SIGTERM)
	<-c
	client.Disconnect(250)
}

// サブスクライブの登録
func subscribe(client mqtt.Client, topic string) {
	token := client.Subscribe(topic, 1, nil)
	token.Wait()
	log.Printf("subscribe topic: %s", topic)
}

// Topicのパブリッシュ
func publish(client mqtt.Client, topic, payload string) {
	token := client.Publish(topic, 0, false, payload)
	token.Wait()
	log.Printf("publish topic:%s payload:%s", topic, payload)
}

サンプルコードを動かす

MQTTサーバーとクライアントのサンプルコードを試してみます。

サーバーを

$go run main.go 1887 1888

のように起動してから、クライアントを

$go run main.go 127.0.0.1:1887

のように実行します。

サーバーのログは、


ime=2025-11-23T05:32:10.290+09:00 level=INFO msg="added hook" hook=allow-all-auth
time=2025-11-23T05:32:10.290+09:00 level=INFO msg="added hook" hook=events-example
time=2025-11-23T05:32:10.290+09:00 level=INFO msg=initialised hook=events-example
time=2025-11-23T05:32:10.291+09:00 level=INFO msg="attached listener" id=tcp1 protocol=tcp address=[::]:1887
time=2025-11-23T05:32:10.291+09:00 level=INFO msg="attached listener" id=stats1 protocol=http address=:1888
time=2025-11-23T05:32:10.291+09:00 level=INFO msg="mochi mqtt starting" version=2.7.9
time=2025-11-23T05:32:10.291+09:00 level=INFO msg="mochi mqtt server started"
time=2025-11-23T05:32:20.272+09:00 level=INFO msg="client connected" hook=events-example client=test-mqtt-client
time=2025-11-23T05:32:20.272+09:00 level=INFO msg="subscribed qos=[1]" hook=events-example client=test-mqtt-client filters="[{ShareName:[] Filter:topic/test Identifier:0 Identifiers:map[] RetainHandling:0 Qos:1 RetainAsPublished:false NoLocal:false FwdRetainedFlag:false}]"
time=2025-11-23T05:32:20.272+09:00 level=INFO msg="received from client" hook=events-example client=test-mqtt-client topic=topic/test payload=payload:0
time=2025-11-23T05:32:20.272+09:00 level=INFO msg="published to client" hook=events-example client=test-mqtt-client topic=topic/test payload=payload:0
|
time=2025-11-23T05:32:23.841+09:00 level=INFO msg="client disconnected" hook=events-example client=test-mqtt-client expire=true error=<nil>
time=2025-11-23T05:32:23.841+09:00 level=INFO msg=unsubscribed hook=events-example client=test-mqtt-client filters="[{ShareName:[] Filter:topic/test Identifier:0 Identifiers:map[] RetainHandling:0 Qos:1 RetainAsPublished:false NoLocal:false FwdRetainedFlag:false}]"

のようになります。
クライアント側のログは、

2025/11/23 05:32:20 Connected
2025/11/23 05:32:20 subscribe topic: topic/test
2025/11/23 05:32:20 publish topic:topic/test payload:payload:0
|省略
2025/11/23 05:32:20 publish topic:topic/test payload:payload:9
2025/11/23 05:32:20 Received message: payload:0 from topic: topic/test
|省略
2025/11/23 05:32:20 Received message: payload:9 from topic: topic/test

のようになります。

ブラウザーかcurlでhttp://localhost:1888にアクセスすれば、

$curl http://localhost:1888
{
	"version": "2.7.9",
	"started": 1763843928,
	"time": 1763843948,
	"uptime": 20,
	"bytes_received": 279,
	"bytes_sent": 239,
	"clients_connected": 0,
	"clients_disconnected": 0,
	"clients_maximum": 1,
	"clients_total": 0,
	"messages_received": 10,
	"messages_sent": 10,
	"messages_dropped": 0,
	"retained": 0,
	"inflight": 0,
	"inflight_dropped": 0,
	"subscriptions": 0,
	"packets_received": 13,
	"packets_sent": 12,
	"memory_alloc": 2015232,
	"threads": 6
}

のようにサーバーのStatsを取得できます。

TWSNMP FC/FKの応用例

TWSNMP FC/FKをMQTT対応した目的に一つは、

image.png

のような環境でZigbeeのセンサーのデータをTWSNMPで監視することです。最初は、mosquittoというMQTTサーバーを経由していましたが、サーバーも組み込んでしまいました。

クライアント

クライアントは、ポーリング機能で使っています。MQTTサーバー(ブローカー)に接続して特定のTopicをサブスクライブすることで監視する仕組みです。

image.png

のように取得したデータを

image.png

のような感じでグラフ化できます。

TWSNMP内蔵のMQTTサーバー以外でも使えます。

サーバー

サーバーは、ブローカーの役割をしますが、MQTTクライアントのテストにも使えます。

image.png

のようにTopicの受信状況を一覧でみることができます。

もちろん、このサーバーからデータを取得するポーリングもできます。
受信したTopicをsyslogに記録する設定にすれば、Topicをログに記録することもできます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?