LoginSignup
4
4

More than 1 year has passed since last update.

Go: MQTT の pub/sub

Last updated at Posted at 2021-01-25

次のページを参考にしました。
GoでMQTTのpub/subを試す
ブローカーは broker.emqx.io です。

subscribe

subscribe.go
// ---------------------------------------------------------------
//
//	subscribe.go
//
//					Jan/25/2021
// ---------------------------------------------------------------
package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// ---------------------------------------------------------------
// main connects to the public broker, subscribes to "go-mqtt/sample" at
// QoS 0 and prints every received message until the process is interrupted.
func main() {
	fmt.Fprintf(os.Stderr, "*** 開始 ***\n")

	// msgCh hands received messages from the client's callback to the
	// main loop; done is closed on shutdown to release a pending handler.
	msgCh := make(chan mqtt.Message)
	done := make(chan struct{})
	var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
		// Once the main loop has stopped receiving, a plain send on the
		// unbuffered msgCh would block forever. The paho documentation
		// asks that handlers do not block, so give up as soon as done
		// is closed instead of holding up the client during shutdown.
		select {
		case msgCh <- msg:
		case <-done:
		}
	}

	opts := mqtt.NewClientOptions()
	opts.AddBroker("tcp://broker.emqx.io:1883")
	cc := mqtt.NewClient(opts)

	if token := cc.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("Mqtt error: %s", token.Error())
	}

	if subscribeToken := cc.Subscribe("go-mqtt/sample", 0, f); subscribeToken.Wait() && subscribeToken.Error() != nil {
		log.Fatal(subscribeToken.Error())
	}

	// Buffered with capacity 1, as signal.Notify does not block when
	// delivering to the channel.
	signalCh := make(chan os.Signal, 1)
	signal.Notify(signalCh, os.Interrupt)

	for {
		select {
		case m := <-msgCh:
			fmt.Printf("topic: %v, payload: %v\n", m.Topic(), string(m.Payload()))
		case <-signalCh:
			fmt.Printf("Interrupt detected.\n")
			// Release any handler waiting on msgCh before disconnecting.
			close(done)
			cc.Disconnect(1000)
			return
		}
	}
}

// ---------------------------------------------------------------

実行の準備

go mod init subscribe
go mod tidy

実行コマンド

subscribe.sh
go run subscribe.go

publish

publish.go
// ---------------------------------------------------------------
//
//	publish.go
//
//					Jan/25/2021
// ---------------------------------------------------------------
package main

import (
	"os"
	"fmt"
	"log"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// ---------------------------------------------------------------
// main connects to the public broker, publishes five timestamped greetings
// to the "go-mqtt/sample" topic at QoS 0, and then disconnects.
func main() {
	fmt.Fprintf(os.Stderr, "*** 開始 ***\n")

	opts := mqtt.NewClientOptions()
	opts.AddBroker("tcp://broker.emqx.io:1883")
	cc := mqtt.NewClient(opts)

	if token := cc.Connect(); token.Wait() && token.Error() != nil {
		log.Fatalf("Mqtt error: %s", token.Error())
	}

	for it := 0; it < 5; it++ {
		// Payload: greeting, sequence number and the current time in
		// time.Time's default String() format.
		text := fmt.Sprintf("こんにちは %d %s", it, time.Now())
		token := cc.Publish("go-mqtt/sample", 0, false, text)
		// Report a failed publish instead of silently dropping it; keep
		// going so the remaining messages are still attempted.
		if token.Wait() && token.Error() != nil {
			log.Printf("publish %d: %s", it, token.Error())
		}
	}

	// The argument is the quiesce time in milliseconds.
	cc.Disconnect(250)

	fmt.Println("Complete publish")
	fmt.Fprintf(os.Stderr, "*** 終了 ***\n")
}


// ---------------------------------------------------------------

実行の準備

go mod init publish
go mod tidy

実行コマンド

publish.sh
go run publish.go

確認したバージョン

$ go version
go version go1.19.2 linux/amd64
4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4