MQTTって何?
MQ Telemetry Transport。メッセージキュープロトコルの1つ。
標準化されたメッセージキュープロトコルには、
AMQP(Advanced Message Queuing Protocol)、MQTT、STOMP(Streaming Text Oriented Messaging Protocol)などがありますが、MQTTの特徴を挙げると
- 軽い:プロトコル電文仕様が軽量で且つシンプル
- Pub/Sub型:データ配信モデルに特化、Queueモデルはサポートしない。そのためJob Queueモデルを利用した負荷分散には向かない
- QoS(Quality of Service)の指定ができる:クライアントとBroker間の通信品質を3つのQoSとしてパラメータ指定可能
- QoS0:最高1回。最も軽い。メッセージが確実に届く保証はない、メッセージ配布に失敗しても再送をしない
- QoS1:最低1回。必ずメッセージ配布するが、重複する可能性がある
- QoS2:正確に1回。最も重い。必ずメッセージを配布して、重複も発生しない
- ネットワークが不安定な場所で動作する:→ QoS1とQoS2のBrokerとクライアント間の一定のメッセージ到達保証で担保の模様。HeatBeat時間を設定することでもブツブツ切れる端末もカバー
試してみる
ここでは、簡単なサンプルとして、JavaプロセスからブラウザへPub/Subを試してみます
環境
- OS:Windows7 32bit
broker実装
- Mosquittoでも良かったんですが、Apache Apollo を利用します。 MQTTのだけではなくSTOMP/AMQP1.0もサポートしており、ちょっと試したかったので。
- http://activemq.apache.org/apollo/
Publisher側コード
- Java + MQTTクライアントとしてPaho(http://www.eclipse.org/paho/)を利用
- ここでは、/testtopic へ 「This is Test Message」をPublishしています
// ClientIDは接続単位でユニークにする必要があるのでここではUUID利用
String clientId = "JavaClient" + UUID.randomUUID().toString();
MqttClient sampleClient = new MqttClient("tcp://localhost:61613", clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("admin");
connOpts.setPassword("password".toCharArray());
connOpts.setCleanSession(true);
sampleClient.connect(connOpts);
// Publishするメッセージ
String messageContent = "This is Test Message";
MqttMessage message = new MqttMessage(messageContent.getBytes());
// QoSは数値で指定
message.setQos(1);
sampleClient.publish("/testtopic", message);
sampleClient.disconnect();
Subscriber側(Browser側JavaScript)コード
- ブラウザでMQTTを使う場合は、MQTT over WebSocketとなる。
- MQTTクライアントJavascriptライブラリとして同じくPahoを利用。メッセージをCallbackで受けてConsoleへ出力
var clientId = "JavaScriptClient-"+(Math.floor(Math.random() * 100000));
client = new Paho.MQTT.Client('localhost', Number(61623), clientId);
// Callback設定
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;
// 接続
client.connect({
onSuccess:onConnect,
onFailure:onFailure,
userName:'admin',
password:'password'
});
function onConnect() {
// subscribeするTopics設定
client.subscribe("/testtopic");
}
function onFailure(failure) {
console.log("failure:"+failure.errorMessage);
}
// コネクション切断Callback
function onConnectionLost(responseObject) {
if (responseObject.errorCode !== 0) {
console.log("onConnectionLost:"+responseObject.errorMessage);
}
}
// messeage到達Callback
function onMessageArrived(message) {
console.log("onMessageArrived:"+message.payloadString);
}
実行してみる
- まずBrokerを実行しておく
C:\apps\ApolloBroker\mybroker\bin\apollo-broker.cmd run
Subscriber
- Pub/SubではSubscriberをbrokerへ接続しておく必要があるので、前述のJavaScriptコードを実行しBrokerへ接続しておきます。
- ここではChromeとFirefoxの2プロセスから接続してみましょう
- 接続後Apolloの管理コンソール (http://127.0.0.1:61680/) から下図のようにConnector > ws で2つの接続が確認できます。
- また、Topicsを見るとSubscriber(Consumer)も2接続あるのも確認できます。
- JavaプログラムでPublishを実行するとSubscriber側のブラウザのコンソールへメッセージが送信されます
簡単なサンプル実行でしたが。。
今回全てローカル環境での簡単なサンプル実行でしたが、MQTTを利用することで、簡単に異プラットフォーム・異言語間で通信できることが確認できました。
MQTTクライアントはNode.jsやC#やPython・Androidと多数の言語・プラットフォームでサポートされているので、これらのプロセス・デバイス間で簡単にPub/Sub可能となりなかなか面白いですね。
Facebook MessengerのPushでも利用されていたり、Push基盤のプロトコルとしても利用できそうです。
後々、他のBroker実装やQoSの電文保証も検証してみようと思います。
AWSでもマネージドサービスとしてMQTTサービス(AWS IoT Message Broker)を展開するようなのでこちらも試してみたい!
参考情報
- MQTT V3.1 仕様 http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
- MQTT Broker実装
- Mosquitto http://mosquitto.org/
- Apache Apollo http://activemq.apache.org/apollo/
- RabbitMQ http://www.rabbitmq.com/mqtt.html
- MQTT Client実装
- Paho - (Android, C/C++, Java, JavaScript, Python, Go, .Net) http://www.eclipse.org/paho/
- M2Mqtt - MQTT Client Library for .Net. https://www.nuget.org/packages/M2Mqtt/
- Moscapsule - MQTT Client for iOS written in Swift. https://github.com/flightonary/Moscapsule