GolangはGoogleによって開発された静的かつ強く型付けされた、コンパイルされ、並行処理が可能なガベージコレクションを備えたプログラミング言語です。Goは表現豊かでクリーンで効率的です。その並行処理メカニズムは、マルチコアやネットワークマシンを最大限に活用するプログラムの作成を容易にし、革新的な型システムは柔軟かつモジュラーなプログラム構築を可能にします。Goは迅速にマシンコードにコンパイルされる一方、ガベージコレクションとランタイムリフレクションの利便性を備えています。これは動的型付けの解釈言語のような高速で静的型付けのコンパイル言語です。
MQTTは、軽量なIoTメッセージングプロトコルであり、パブリッシュ/サブスクライブモデルに基づいています。このプロトコルは、非常に少ないコードと帯域幅を使用してIoTデバイスにリアルタイムで信頼性の高いメッセージングサービスを提供できます。ハードウェアリソースや帯域幅が限られた環境のデバイスに適しているため、MQTTプロトコルはIoT、モバイルインターネット、IoV、電力業界など幅広い分野で使用されています。
この記事では主に、Golangプロジェクトでpaho.mqtt.golangクライアントライブラリを使用し、クライアントとMQTTブローカー間の接続、サブスクリプション、メッセージングの実装方法を紹介します。
プロジェクトの初期化
このプロジェクトは go 1.13.12
をベースに開発およびテストされています。
go version
go version go1.13.12 darwin/amd64
このプロジェクトではMQTTクライアントライブラリとしてpaho.mqtt.golangを使用しています。インストールは以下のように行います:
go get github.com/eclipse/paho.mqtt.golang
GoでのMQTTの使用
この記事では、EMQXが提供する無料の公開MQTTブローカーを使用します。このサービスは、EMQXのMQTT IoTクラウドプラットフォームに基づいて作成されています。サーバーへのアクセス情報は以下の通りです:
- ブローカー: broker.emqx.io
- TCPポート: 1883
- Websocketポート: 8083
MQTTブローカーへの接続
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"time"
)
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
func main() {
var broker = "broker.emqx.io"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_client")
opts.SetUsername("emqx")
opts.SetPassword("public")
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
}
- ClientOptions: ブローカー、ポート、クライアントID、ユーザー名、パスワードなどの設定に使用。
- messagePubHandler: MQTT pubメッセージ処理のグローバルハンドラ。
- connectHandler: 接続のコールバック。
- connectLostHandler: 接続喪失のコールバック。
TSL接続を使用する場合、以下の設定を使用できます:
func NewTlsConfig() *tls.Config {
certpool := x509.NewCertPool()
ca, err := ioutil.ReadFile("ca.pem")
if err != nil {
log.Fatalln(err.Error())
}
certpool.AppendCertsFromPEM(ca)
// クライアント証明書/キーペアのインポート
clientKeyPair, err := tls.LoadX509KeyPair("client-crt.pem", "client-key.pem")
if err != nil {
panic(err)
}
return &tls.Config{
RootCAs: certpool,
ClientAuth: tls.NoClientCert,
ClientCAs: nil,
InsecureSkipVerify: true,
Certificates: []tls.Certificate{clientKeyPair},
}
}
クライアント証明書が設定されていない場合は、以下のように設定できます:
func NewTlsConfig() *tls.Config {
certpool := x509.NewCertPool()
ca, err := ioutil.ReadFile("ca.pem")
if err != nil {
log.Fatalln(err.Error())
}
certpool.AppendCertsFromPEM(ca)
return &tls.Config{
RootCAs: certpool,
}
}
その後、TLSを設定します。
var broker = "broker.emqx.io"
var port = 8883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("ssl://%s:%d", broker, port))
tlsConfig := NewTlsConfig()
opts.SetTLSConfig(tlsConfig)
サブスクリプション
func sub(client mqtt.Client) {
topic := "topic/test"
token := client.Subscribe(topic, 1, nil)
token.Wait()
fmt.Printf("Subscribed to topic %s", topic)
}
メッセージのパブリッシュ
func publish(client mqtt.Client) {
num := 10
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := client.Publish("topic/test", 0, false, text)
token.Wait()
time.Sleep(time.Second)
}
}
テスト
以下のコードでテストを行います。
package main
import (
"fmt"
mqtt "github.com/eclipse/paho.mqtt.golang"
"log"
"time"
)
var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
fmt.Println("Connected")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
fmt.Printf("Connect lost: %v", err)
}
func main() {
var broker = "broker.emqx.io"
var port = 1883
opts := mqtt.NewClientOptions()
opts.AddBroker(fmt.Sprintf("tcp://%s:%d", broker, port))
opts.SetClientID("go_mqtt_client")
opts.SetUsername("emqx")
opts.SetPassword("public")
opts.SetDefaultPublishHandler(messagePubHandler)
opts.OnConnect = connectHandler
opts.OnConnectionLost = connectLostHandler
client := mqtt.NewClient(opts)
if token := client.Connect(); token.Wait() && token.Error() != nil {
panic(token.Error())
}
sub(client)
publish(client)
client.Disconnect(250)
}
func publish(client mqtt.Client) {
num := 10
for i := 0; i < num; i++ {
text := fmt.Sprintf("Message %d", i)
token := client.Publish("topic/test", 0, false, text)
token.Wait()
time.Sleep(time.Second)
}
}
func sub(client mqtt.Client) {
topic := "topic/test"
token := client.Subscribe(topic, 1, nil)
token.Wait()
fmt.Printf("Subscribed to topic: %s", topic)
}
コードを実行すると、MQTT接続とサブスクリプションが成功し、サブスクライブしたトピックのメッセージを正常に受信できることが確認できます。
まとめ
これまでに、paho.mqtt.golangクライアントを使用して公開MQTTブローカーに接続し、テストクライアントとMQTTブローカー間の接続、メッセージのパブリッシュ、サブスクリプションを実装しました。
次に、EMQが提供するMQTTプロトコルに関する簡単で理解しやすいガイドシリーズの記事をチェックして、MQTTプロトコルの特徴を学び、MQTTのより高度なアプリケーションを探求し、MQTTアプリケーションおよびサービス開発を始めてみましょう。