MQTT概要
概要
HTTPからMQTTへ - IBMが提唱するモノとモノがつながる時代に最適化したプロトコル&アプライアンス
わかりやすい説明
MQTTについてのまとめ
ガチの人のgistはこちら
MQTT とはなんだったのか そこはかとなく書くよん。
いわゆるPUB/SUB型のメッセージプロトコルだけど、RabitMQとかとは違ってインターネットや4G/LTE網など信頼性の低いネットワークを考慮しているのが特徴...と認識してます。
使用シーンとしては、多数のセンサーからのデータを一旦MQTTブローカーに集めて、そこからsubscribeしてロギングするとか、あるセンサーのpublishした計測値を別の機器でsubscribeしてその値を元に制御するとか。
eMQTT概要
Elrang/OTPで実装されたオープンソースのMQTTブローカー。ライセンスはMIT。
これを書いている時点での特徴的な機能は以下のとおり。
- MQTT V3.1.1サポート
- HTTP/WebSocketサポート
- 複数ノードでクラスタを組める。
- プラグインによる機能拡張
- QoS 0,1,2をサポート
- retainサポート
- willメッセージサポート
- SSLサポート
- クライアントIDとIPアドレスによる認証
- ユーザー名:パスワードによる認証
公表されてるベンチマーク結果では
- 20万コネクション
- 2000接続オープン/秒
- 20万topic
- 2万IN/OUTメッセージ/秒
を8GBメモリのマシンで稼働できるとか。
ちなみに同作者によるErlang/OTP実装のMQTTクライアントemqttcもあります。なぜか1年ぐらい前に僕もコミットしてます :-)
インスタンスの用意
とりあえずEC2で2つほどインスタンスを用意します。
ホスト名: emqtt1
-
- ローカルIPアドレス:
172.31.1.1
- ローカルIPアドレス:
- パブリックDNS:
emqtt1.com
- アベイラビリティゾーン:
ap-northeast-1a
ホスト名: emqtt2
- ローカルIPアドレス:
172.31.1.2
- パブリックDNS:
emqtt2.com
- アベイラビリティゾーン:
ap-northeast-1c
IPアドレスとかは適当に変えてます。
あえて別々のアベイラビリティゾーンに立ち上げてみました。
インストール
各ホストで以下を実行
$ sudo apt-get install unzip
$ wget http://emqtt.io/downloads/ubuntu
$ mv ubuntu emqttd-ubuntu64-0.8.6-beta-20150617.zip
$ unzip emqttd-ubuntu64-0.8.6-beta-20150617.zip
$ cd emqttd/
ノード名を設定
$ vi etc/vm.args
-name emqtt1@172.31.1.1
emqtt2ノードでは emqtt2@172.31.1.2
に設定。
起動
$ bin/emqttd console
バックグラウンドで起動する場合は
$ bin/emqttd start
クラスタを構成する
ここまでの時点では、各々のノードが別々に稼働している状態。
ここで2つのノードをつないでみる。
emqtt1側で以下を実行。
$ ./bin/emqttd_ctl cluster emqtt2@172.31.1.2
cluster with 'emqtt2@172.31.1.2' successfully.
これでクラスタが構成された。
テスト
Rubyクライアントでテストしてみます。
$ sudo gem install mqtt
emqtt1側につなぐsubscriber
require "mqtt"
MQTT::Client.connect("emqtt1.com") do |c|
c.subscribe("topic")
c.get do |topic, message|
puts "#{topic} : #{message}"
end
end
emqtt2側につなぐsubscriber。ホスト名が違うだけ :-/
require "mqtt"
MQTT::Client.connect("emqtt2.com") do |c|
c.subscribe("topic")
c.get do |topic, message|
puts "#{topic} : #{message}"
end
end
publisherはこんな。emqtt1側にpublishする。
require "mqtt"
MQTT::Client.connect("emqtt1.com") do |c|
c.publish("topic", "message")
end
さて、では2つのsubscriberを起動
$ ruby subscriber1.rb
$ ruby subscriber2.rb
publishしてみる
$ ruby publisher
結果
$ ruby subscriber1.rb
topic : message
$ ruby subscriber2.rb
topic : message
お!
タイムリミットが来たのでここまでで...w
感想
もうちょっとハマるかと思ったら、案外あっさりできた。一度emqttdを落として立ち上げ直してもちゃんとクラスタ状態を維持してた。
データの永続化はmnesiaを使ってるぽいっけど、まだコードは追っていないので違うのかも。
1年くらい前にちょろっと見たときに比べて、クラスタ化やHTTP/WebSocketなどかなり進化してた。
まだちょっと触ってみただけなので、もう少しいろいろやってみよう。