51
54

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

MQTTって何?

MQ Telemetry Transport。メッセージキュープロトコルの1つ。
標準化されたメッセージキュープロトコルには、
AMQP(Advanced Message Queuing Protocol)、MQTT、STOMP(Streaming Text Oriented Messaging Protocol)などがありますが、MQTTの特徴を挙げると

  • 軽い:プロトコル電文仕様が軽量で且つシンプル
  • Pub/Sub型:データ配信モデルに特化、Queueモデルはサポートしない。そのためJob Queueモデルを利用した負荷分散には向かない
  • QoS(Quality of Service)の指定ができる:クライアントとBroker間の通信品質を3つのQoSとしてパラメータ指定可能
  • QoS0:最高1回。最も軽い。メッセージが確実に届く保証はない、メッセージ配布に失敗しても再送をしない
  • QoS1:最低1回。必ずメッセージ配布するが、重複する可能性がある
  • QoS2:正確に1回。最も重い。必ずメッセージを配布して、重複も発生しない
  • ネットワークが不安定な場所で動作する:→ QoS1とQoS2のBrokerとクライアント間の一定のメッセージ到達保証で担保の模様。HeatBeat時間を設定することでもブツブツ切れる端末もカバー

試してみる

ここでは、簡単なサンプルとして、JavaプロセスからブラウザへPub/Subを試してみます

環境

  • OS:Windows7 32bit

broker実装

  • Mosquittoでも良かったんですが、Apache Apollo を利用します。 MQTTのだけではなくSTOMP/AMQP1.0もサポートしており、ちょっと試したかったので。
  • http://activemq.apache.org/apollo/

Publisher側コード

TestPublish.java
// ClientIDは接続単位でユニークにする必要があるのでここではUUID利用
String clientId = "JavaClient" + UUID.randomUUID().toString();
MqttClient sampleClient = new MqttClient("tcp://localhost:61613", clientId, new MemoryPersistence());
MqttConnectOptions connOpts = new MqttConnectOptions();
connOpts.setUserName("admin");
connOpts.setPassword("password".toCharArray());
connOpts.setCleanSession(true);
sampleClient.connect(connOpts);
// Publishするメッセージ
String messageContent = "This is Test Message";
MqttMessage message = new MqttMessage(messageContent.getBytes());
// QoSは数値で指定
message.setQos(1);
sampleClient.publish("/testtopic", message);
sampleClient.disconnect();

Subscriber側(Browser側JavaScript)コード

  • ブラウザでMQTTを使う場合は、MQTT over WebSocketとなる。
  • MQTTクライアントJavascriptライブラリとして同じくPahoを利用。メッセージをCallbackで受けてConsoleへ出力
var clientId = "JavaScriptClient-"+(Math.floor(Math.random() * 100000));
client = new Paho.MQTT.Client('localhost', Number(61623), clientId);

// Callback設定
client.onConnectionLost = onConnectionLost;
client.onMessageArrived = onMessageArrived;

// 接続
client.connect({
	onSuccess:onConnect,
    onFailure:onFailure,
	userName:'admin', 
    password:'password' 
});

function onConnect() {
  // subscribeするTopics設定
  client.subscribe("/testtopic");
}

function onFailure(failure) {
  console.log("failure:"+failure.errorMessage);
}

// コネクション切断Callback
function onConnectionLost(responseObject) {
  if (responseObject.errorCode !== 0) {
    console.log("onConnectionLost:"+responseObject.errorMessage);
  }
}

// messeage到達Callback
function onMessageArrived(message) {
  console.log("onMessageArrived:"+message.payloadString);
}

実行してみる

  • まずBrokerを実行しておく
C:\apps\ApolloBroker\mybroker\bin\apollo-broker.cmd run

Subscriber

  • Pub/SubではSubscriberをbrokerへ接続しておく必要があるので、前述のJavaScriptコードを実行しBrokerへ接続しておきます。
  • ここではChromeとFirefoxの2プロセスから接続してみましょう
  • 接続後Apolloの管理コンソール (http://127.0.0.1:61680/) から下図のようにConnector > ws で2つの接続が確認できます。
    foo.png
  • また、Topicsを見るとSubscriber(Consumer)も2接続あるのも確認できます。
    Consumer.png
  • JavaプログラムでPublishを実行するとSubscriber側のブラウザのコンソールへメッセージが送信されます
    callback.png

簡単なサンプル実行でしたが。。

今回全てローカル環境での簡単なサンプル実行でしたが、MQTTを利用することで、簡単に異プラットフォーム・異言語間で通信できることが確認できました。
MQTTクライアントはNode.jsやC#やPython・Androidと多数の言語・プラットフォームでサポートされているので、これらのプロセス・デバイス間で簡単にPub/Sub可能となりなかなか面白いですね。
Facebook MessengerのPushでも利用されていたり、Push基盤のプロトコルとしても利用できそうです。
後々、他のBroker実装やQoSの電文保証も検証してみようと思います。
AWSでもマネージドサービスとしてMQTTサービス(AWS IoT Message Broker)を展開するようなのでこちらも試してみたい!

参考情報

51
54
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
51
54

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?