しばらく放置していてネタが溜まってしまった
IoTデバイスのように大量のデバイスからの通信がある場合や、
ネットワーク品質が高くない場合、バッテリーに制限がある場合に有効。
説明はこことか見ると良さそう。
サーバについてはbeebotteというサービスが有名なようだが、node.jsでローカルブローカーを立てる事も可能。
moscaというものがあったが、aedesという後継プロジェクトで高速化しているらしい。
勿論AWSにもAWS IoT Coreというものが存在している。
今回はnodejsで立てる。
クライアント側はPythonだとpaho-mqttというライブラリがある。nodejsだとmqttというライブラリ。
C#でもこことかを見るとMQTTnetが使えそう。
ラズパイならPythonで良いが、今回はUnityも視野に入れてC#で確認したい。
あとブラウザからMQTT over WebSocketも確認しておきたい。
というよくばりセット。
ブローカーとnodejsによる送受信
npm install aedes
でaedesをインストール。
スクリプトはここを参考にする。
というかほぼ丸ごとコピーした。
const aedes = require('aedes')();
// 各アクションに対する反応
aedes.on('clientError', function (client, err) {
console.log('client error', client.id, err.message, err.stack)
});
aedes.on('connectionError', function (client, err) {
console.log('client error', client, err.message, err.stack)
});
aedes.on('publish', function (packet, client) {
if (client) {
console.log('message from client', client.id)
}
});
aedes.on('subscribe', function (subscriptions, client) {
if (client) {
console.log('subscribe from client', subscriptions, client.id)
}
});
aedes.on('unsubscribe', function (subscriptions, client) {
if (client) {
console.log('unsubscribe from client', subscriptions, client.id)
}
});
aedes.on('client', function (client) {
console.log('new client', client.id)
});
aedes.on('clientReady', function (client) {
console.log('client is ready', client.id)
});
aedes.on('clientDisconnect', function (client) {
console.log('client disconnected', client.id)
});
// 実際のサーバ立ち上げ
const server = require('net').createServer(aedes.handle);
const port = 1883;
server.listen(port, function () {
console.log(`Server running at mqtt://localhost:${port}/`);
});
各アクションの内容もパクってメモしておく。
アクション | 説明 |
---|---|
client | 新規クライアントが登録されたとき |
clientReady | クライアントの初期化完了し、通信待機状態になったとき |
clientDisconnect | クライアントの通信が正常に切断されたとき |
clientError | 通信中のクライアントに異常が発生した場合 |
connectionError | クライアントの初期化に失敗した場合 |
publish | パブリッシャーがメッセージをブローカーに送信した時の処理で、第一引数にメッセージパケットを含める |
subscribe | サブスクライバーが購読をリクエストした時の処理で、第一引数にトピックを含むオブジェクトを指定 |
unsubscribe | サブスクライバーが購読を取り止める時に処理で、第一引数にトピックを含むオブジェクトを指定 |
closed | サーバーが終了した場合 |
まずはnode.js上から接続を簡単に確認したいので
npm install mqtt
でクライアントモジュールをインストールして、以下で確認する。
なお、メッセージ送信をおこなう方をパブリッシャー、受信する方をサブスクライバーと呼ぶらしい。
わかり辛いので送信側、受信側と呼ぶ。
まずは受信側の作成。
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');
const topic = 'mqtt/test';
client.on('connect', function() {
console.log('subscriber connected.');
});
client.subscribe(topic, function(err, granted) {
console.log('subscriber subscribed.');
});
client.on('message', function(topic_, message) {
console.log('subscriber received topic:', topic_, 'message:', message.toString());
});
送信側は受信側と同じTopicを持たせる
const mqtt = require('mqtt');
const client = mqtt.connect('mqtt://localhost:1883');
const message = 'SAY HELLO TO MQTT';
const topic = 'mqtt/test'
client.on('connect', function() {
console.log('publisher connected.');
client.publish(topic, message);
console.log('send topic:', topic, ', message:', message);
});
これでブローカー、受信側、送信側の順番で立ち上げるとメッセージが送信される。
元々貧弱な回線が想定されているが、再接続についてはあまり考慮をしなくて良いのかまでは未確認
C#での送受信
githubからMQTTnetをダウンロードして利用する。
ライセンスはMIT。自分でコンパイルするのは面倒なので、Windows用が存在しているはv3.0.16をダウンロードしてみた。
Unityで利用する場合はバージョンに制約がある模様。
解凍したzipの中にあるMQTTnet.dllを利用する。
色々な設定等は公式wikiに載っている。
コンパイルは以前同様に.netのcscを利用する。(dllの参照オプションが必要)
.NET4系でコンパイル
error CS0012: The type 'System.IDisposable' is defined in an assembly that is not referenced. You must add a reference to assembly 'System.Runtime, Version=5.0.0.0,
のエラー
そりゃMQTTnet.dllだけではダメよな、ということでエラーが出た分を列挙
C:\Windows\Microsoft.NET\Framework\v4.0.30319\csc.exe /r:System.Runtime.dll,MQTTnet.dll,System.Private.CoreLib.dll -warn:0 publisher.cs
で依存関係のエラーは消えた(警告は沢山出るが無視する)
ただ、
The type 'System.TimeSpan' exists in both...
のようなクラス重複やCS1660、CS4010のラムダ式とデリゲート関連のエラーが出る。
both云々のエラーは
C:\Windows\Microsoft.NET\Framework\v4.0.30319\csc.exe /r:System.Runtime.dll /r:MQTTnet.dll /r:System=System.Private.CoreLib.dll -warn:0 publisher.cs
とSystemを.NET標準ではなく指定DLLから読み込むように強制的にオプションで指定するとなくなった。
Cannot await in the body of a catch clause
はC#のバージョンによるので、エラー時にDelayをかけていない。
色々試した結果、最終的には以下のようにしてコンパイルは通った。
using MQTTnet;
using MQTTnet.Client;
using MQTTnet.Client.Options;
using MQTTnet.Client.Connecting;
using MQTTnet.Client.Receiving;
using MQTTnet.Client.Disconnecting;
using System;
using System.Threading;
using System.Threading.Tasks;
class publisher {
static IMqttClient mqttClient = new MqttFactory().CreateMqttClient();
static IMqttClientOptions options;
static string topic = "mqtt/test";
static void Main(string[] args) {
try{
options = new MqttClientOptionsBuilder()
.WithTcpServer("localhost",1883)
.WithCleanSession()
.Build();
// SecureTPC connection = .WithTls()
OnStarted();
while(true){
Thread.Sleep(1000);
PubMessage();
}
}catch(Exception ex){
Console.WriteLine(ex);
}
}
static void OnStarted(){
mqttClient.SubscribeAsync(new TopicFilterBuilder().WithTopic(topic).Build());
mqttClient.UseConnectedHandler(async (e) =>
{
Console.WriteLine("### SUBSCRIBED ###");
});
// 送信
mqttClient.UseDisconnectedHandler(async (e) => {
Console.WriteLine("### DISCONNECTED FROM SERVER ###");
await Task.Delay(TimeSpan.FromSeconds(3));
try{ Connect(); }
catch { Console.WriteLine("### RECONNECTING FAILED ###"); }
});
// 受信
mqttClient.UseApplicationMessageReceivedHandler( (e) => {
Console.WriteLine(System.Text.Encoding.UTF8.GetString(e.ApplicationMessage.Payload));
});
}
static async void Connect(){
var retry = 0;
while (!mqttClient.IsConnected && retry < 5)
{
try
{
mqttClient.ConnectAsync(options).Wait(); //CancellationToken.Noneはエラーになった
}
catch
{
// 最近のC#ならこれができるので再接続トライになる
//await Task.Delay(TimeSpan.FromSeconds(1));
}
retry++;
}
}
static void PubMessage() {
Console.WriteLine("Send");
var message = new MqttApplicationMessageBuilder()
.WithTopic(topic)
.WithPayload("Hello World")
.WithExactlyOnceQoS()
.WithRetainFlag()
.Build();
mqttClient.PublishAsync(message); // awaitできなかった
}
}}
コンパイル出来たので実行すると
System.TypeLoadException: メソッド 'Main' に実装がないため (RVA なし)
のエラーが出た。
mainのMqttClientOptionsBuilderのWithTcpServerの箇所問題が起きている事までは分かったが解消方法が分からないので、
とりあえず今回はコンパイル通ったので一旦よしとする。
VisualStudioからコンパイルすればいけるのか?.NETのバージョンが新しい必要があるのか?などC#には詳しくないので分からないが、断念。
ブラウザからのMQTT
ここにMQTTやっている例があるので参考にしてみる。
普通に接続出来ないので、over WebSocketにする必要がある、という事はサーバ側もWebSocketの受けが必要なので
上の方で立ち上げたnodejsのブローカーだけだと上手くいかない。
というわけでここの質問参考にすると
websocket-streamのhandleにMQTT側のhandleを指定すればよい事になっている。
npm install websocket-stream
でWebSocketでインストールして以下のようにaedes.handleに渡すように追記してやる。
// over WebScoekt
const wsPort = 3000;
const httpServer = require('http').createServer();
const ws = require('websocket-stream');
ws.createServer({ server: httpServer }, aedes.handle);
httpServer.listen(wsPort, function () {
console.log('Aedes MQTT-WS listening on port: ' + wsPort)
});
HTML側はサンプルそのままでアドレス変えただけ
<!DOCTYPE html>
<html>
<head>
<title>MQTT.js Test</title>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
</head>
<body>
<p>MQTT.js Test</p>
<input type="button" value="Publish" onclick="OnButtonClick()"/>
<script>
var client = mqtt.connect('ws://localhost:3000');
client.on('connect', () => {
console.log('connected');
client.subscribe('test');
});
client.on('message', (topic, message) => {
console.log(topic + ' : ' + message);
});
function OnButtonClick() {
console.log('onClick');
client.publish('mqtt/test', 'hello world!');
}
</script>
</body>
</html>
これで最初に作成したnode.jsのsubscriveを起動してからブラウザで叩くとメッセージが表示された。
なんとなく使えたのでおわり。