Edited at

eMQTTでMQTTブローカーのクラスタを組んだメモ

More than 3 years have passed since last update.


MQTT概要

概要

HTTPからMQTTへ - IBMが提唱するモノとモノがつながる時代に最適化したプロトコル&アプライアンス

わかりやすい説明

MQTTについてのまとめ

ガチの人のgistはこちら

MQTT とはなんだったのか そこはかとなく書くよん。

いわゆるPUB/SUB型のメッセージプロトコルだけど、RabitMQとかとは違ってインターネットや4G/LTE網など信頼性の低いネットワークを考慮しているのが特徴...と認識してます。

使用シーンとしては、多数のセンサーからのデータを一旦MQTTブローカーに集めて、そこからsubscribeしてロギングするとか、あるセンサーのpublishした計測値を別の機器でsubscribeしてその値を元に制御するとか。


eMQTT概要

Elrang/OTPで実装されたオープンソースのMQTTブローカー。ライセンスはMIT。

これを書いている時点での特徴的な機能は以下のとおり。


  • MQTT V3.1.1サポート

  • HTTP/WebSocketサポート

  • 複数ノードでクラスタを組める。

  • プラグインによる機能拡張

  • QoS 0,1,2をサポート

  • retainサポート

  • willメッセージサポート

  • SSLサポート

  • クライアントIDとIPアドレスによる認証

  • ユーザー名:パスワードによる認証

公表されてるベンチマーク結果では


  • 20万コネクション

  • 2000接続オープン/秒

  • 20万topic

  • 2万IN/OUTメッセージ/秒

を8GBメモリのマシンで稼働できるとか。

ちなみに同作者によるErlang/OTP実装のMQTTクライアントemqttcもあります。なぜか1年ぐらい前に僕もコミットしてます :-)


インスタンスの用意

とりあえずEC2で2つほどインスタンスを用意します。


ホスト名: emqtt1


  • * ローカルIPアドレス: 172.31.1.1

  • パブリックDNS: emqtt1.com

  • アベイラビリティゾーン: ap-northeast-1a



ホスト名: emqtt2


  • ローカルIPアドレス: 172.31.1.2

  • パブリックDNS: emqtt2.com

  • アベイラビリティゾーン: ap-northeast-1c

IPアドレスとかは適当に変えてます。

あえて別々のアベイラビリティゾーンに立ち上げてみました。


インストール

各ホストで以下を実行

$ sudo apt-get install unzip

$ wget http://emqtt.io/downloads/ubuntu
$ mv ubuntu emqttd-ubuntu64-0.8.6-beta-20150617.zip
$ unzip emqttd-ubuntu64-0.8.6-beta-20150617.zip
$ cd emqttd/

ノード名を設定

$ vi etc/vm.args


etc/vm.args

-name emqtt1@172.31.1.1


emqtt2ノードでは emqtt2@172.31.1.2 に設定。


起動

$ bin/emqttd console

バックグラウンドで起動する場合は

$ bin/emqttd start


クラスタを構成する

ここまでの時点では、各々のノードが別々に稼働している状態。

ここで2つのノードをつないでみる。

emqtt1側で以下を実行。

$ ./bin/emqttd_ctl cluster emqtt2@172.31.1.2

cluster with 'emqtt2@172.31.1.2' successfully.

これでクラスタが構成された。


テスト

Rubyクライアントでテストしてみます。

$ sudo gem install mqtt

emqtt1側につなぐsubscriber


subscriber1.rb

require "mqtt"

MQTT::Client.connect("emqtt1.com") do |c|
c.subscribe("topic")
c.get do |topic, message|
puts "#{topic} : #{message}"
end
end


emqtt2側につなぐsubscriber。ホスト名が違うだけ :-/


subscriber2.rb

require "mqtt"

MQTT::Client.connect("emqtt2.com") do |c|
c.subscribe("topic")
c.get do |topic, message|
puts "#{topic} : #{message}"
end
end


publisherはこんな。emqtt1側にpublishする。


publisher.rb

require "mqtt"

MQTT::Client.connect("emqtt1.com") do |c|
c.publish("topic", "message")
end


さて、では2つのsubscriberを起動

$ ruby subscriber1.rb

$ ruby subscriber2.rb

publishしてみる

$ ruby publisher

結果

$ ruby subscriber1.rb

topic : message

$ ruby subscriber2.rb

topic : message

お!

タイムリミットが来たのでここまでで...w


感想

もうちょっとハマるかと思ったら、案外あっさりできた。一度emqttdを落として立ち上げ直してもちゃんとクラスタ状態を維持してた。

データの永続化はmnesiaを使ってるぽいっけど、まだコードは追っていないので違うのかも。

1年くらい前にちょろっと見たときに比べて、クラスタ化やHTTP/WebSocketなどかなり進化してた。

まだちょっと触ってみただけなので、もう少しいろいろやってみよう。