cluster では現在 M2Mqtt を使っているのですが、Async や Rx のサポートがあるライブラリ使いたいお気持ちが発生したので chkr1011/MQTTnet を触ってみたいと思います。
(この記事はCluster,Inc. Advent Calendar 2018 - Qiita 9日目の記事のはずでした)
準備
インポート
Unity(今回は 2018.3.0f2 を使用)で新規プロジェクトを作成します。
NuGet Gallery から MQTTnet 2.8.5 のパッケージをダウンロードし、展開(.nupkg -> .zip にリネームして展開しました)して .NET 4.7.2 向けの DLL ファイルを、プロジェクトにインポートします。
接続
MqttFactory
から作成したクライアントに、MqttClientOptionsBuilder
から作成した接続オプションを渡して接続します。
IMqttClientOptions
に限らず、MQTTnet はオブジェクト作成が概ね Builder パターンで提供されており、M2Mqtt と比べてやたら引数の多いコンストラクタやメソッドが出てこないので精神衛生上よいです。
var factory = new MqttFactory();
var mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.hivemq.com")
.Build();
mqttClient.Connected += (s, e) => Debug.Log("接続したときの処理");
mqttClient.Disconnected += (s, e) => Debug.Log("切断されたときの処理");
await mqttClient.ConnectAsync(options);
IMqttClient.Disconnected
は IMqttClient.DisconnectAsync()
を呼んだ場合も、意図しない切断が発生したときも両方呼ばれます。
意図しない切断の場合は MqttClientDisconnectedEventArgs.Exception
に何かしら入っているので、そこを見てよしなにやれば再接続処理などもできます。
mqttClient.Disconnected += async (s, e) =>
{
if (e.Exception == null)
{
Debug.Log("意図した切断です");
return;
}
Debug.Log("意図しない切断です。5秒後に再接続を試みます");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(options);
}
catch
{
Debug.Log("再接続に失敗しました");
}
};
メッセージの送受信
受信
受信したメッセージは IMqttClient.ApplicationMessageReceived
でハンドリングします。
mqttClient.ApplicationMessageReceived += (s, e) =>
{
var stringBuilder = new StringBuilder();
stringBuilder.AppendLine("メッセージを受信しました");
stringBuilder.AppendLine($"Topic = {e.ApplicationMessage.Topic}");
stringBuilder.AppendLine($"Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
stringBuilder.AppendLine($"QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
stringBuilder.AppendLine($"Retain = {e.ApplicationMessage.Retain}");
Debug.Log(stringBuilder);
};
購読
TopicFilter
をクライアントに渡して購読します。
wait mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("johnson65/helloworld").Build());
};
送信
MqttApplicationMessage
をクライアントに渡して送信します。
var message = new MqttApplicationMessageBuilder()
.WithTopic("johnson65/helloworld")
.WithPayload("Hello World")
.WithExactlyOnceQoS()
.Build();
await mqttClient.PublishAsync(message);
まとめ
来年はこれ使いたいです。
最終的なコード
using System;
using System.Text;
using System.Threading.Tasks;
using MQTTnet;
using MQTTnet.Client;
using UnityEngine;
public class Main : MonoBehaviour
{
IMqttClient mqttClient;
async void Start()
{
var factory = new MqttFactory();
mqttClient = factory.CreateMqttClient();
var options = new MqttClientOptionsBuilder()
.WithTcpServer("broker.hivemq.com")
.Build();
mqttClient.Connected += (s, e) => Debug.Log("接続したときの処理");
mqttClient.Disconnected += async (s, e) =>
{
Debug.Log("切断したときの処理");
if (e.Exception == null)
{
Debug.Log("意図した切断です");
return;
}
Debug.Log("意図しない切断です。5秒後に再接続を試みます");
await Task.Delay(TimeSpan.FromSeconds(5));
try
{
await mqttClient.ConnectAsync(options);
}
catch
{
Debug.Log("再接続に失敗しました");
}
};
mqttClient.ApplicationMessageReceived += (s, e) =>
{
var stringBuilder = new StringBuilder();
stringBuilder.AppendLine("メッセージを受信しました");
stringBuilder.AppendLine($"Topic = {e.ApplicationMessage.Topic}");
stringBuilder.AppendLine($"Payload = {Encoding.UTF8.GetString(e.ApplicationMessage.Payload)}");
stringBuilder.AppendLine($"QoS = {e.ApplicationMessage.QualityOfServiceLevel}");
stringBuilder.AppendLine($"Retain = {e.ApplicationMessage.Retain}");
Debug.Log(stringBuilder);
};
await mqttClient.ConnectAsync(options);
await mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic("johnson65/helloworld").Build());
var message = new MqttApplicationMessageBuilder()
.WithTopic("johnson65/helloworld")
.WithPayload("Hello World")
.WithExactlyOnceQoS()
.Build();
await mqttClient.PublishAsync(message);
await mqttClient.DisconnectAsync();
}
}
実行結果
(TopicFilter
で QoS を指定しなかったから QoS ダウングレードで AtMostOnce になっているな )