次のページを参考にしました。
GoでMQTTのpub/subを試す
ブローカーは broker.emqx.io です。
subscribe
subscribe.go
// ---------------------------------------------------------------
//
// subscribe.go
//
// Jan/25/2021
// ---------------------------------------------------------------
package main
import (
"fmt"
"log"
"os"
"os/signal"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// ---------------------------------------------------------------
func main() {
fmt.Fprintf (os.Stderr,"*** 開始 ***\n")
msgCh := make(chan mqtt.Message)
var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
msgCh <- msg
}
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://broker.emqx.io:1883")
cc := mqtt.NewClient(opts)
if token := cc.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Mqtt error: %s", token.Error())
}
if subscribeToken := cc.Subscribe("go-mqtt/sample", 0, f); subscribeToken.Wait() && subscribeToken.Error() != nil {
log.Fatal(subscribeToken.Error())
}
signalCh := make(chan os.Signal, 1)
signal.Notify(signalCh, os.Interrupt)
for {
select {
case m := <-msgCh:
fmt.Printf("topic: %v, payload: %v\n", m.Topic(), string(m.Payload()))
case <-signalCh:
fmt.Printf("Interrupt detected.\n")
cc.Disconnect(1000)
return
}
}
}
// ---------------------------------------------------------------
実行の準備
go mod init subscribe
go mod tidy
実行コマンド
subscribe.sh
go run subscribe.go
publish
publish.go
// ---------------------------------------------------------------
//
// publish.go
//
// Jan/25/2021
// ---------------------------------------------------------------
package main
import (
"os"
"fmt"
"log"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// ---------------------------------------------------------------
func main() {
fmt.Fprintf (os.Stderr,"*** 開始 ***\n")
opts := mqtt.NewClientOptions()
opts.AddBroker("tcp://broker.emqx.io:1883")
cc := mqtt.NewClient(opts)
if token := cc.Connect(); token.Wait() && token.Error() != nil {
log.Fatalf("Mqtt error: %s", token.Error())
}
for it := 0; it < 5; it++ {
now := time.Now ()
text := fmt.Sprintf("こんにちは %d ", it) +
fmt.Sprintf ("%s",now)
token := cc.Publish("go-mqtt/sample", 0, false, text)
token.Wait()
}
cc.Disconnect(250)
fmt.Println("Complete publish")
fmt.Fprintf (os.Stderr,"*** 終了 ***\n")
}
// ---------------------------------------------------------------
実行の準備
go mod init publish
go mod tidy
実行コマンド
publish.sh
go run publish.go
確認したバージョン
$ go version
go version go1.19.2 linux/amd64