■ mosquittoとは
MQTTプロトコルを実装したオープンソースソフトウェア。
http://mosquitto.org/
他にもMQTTを実装したOSSとしては、Apache Apolloなどもある模様。
● MQTTとは
パブリッシュ/サブスクライブ型(出版-購読型)のモデルのプロトコル
https://sango.shiguredo.jp/mqtt
非同期メッセージングパラダイムの一種であり、メッセージの送信者(出版側)が特定の受信者(購読側)を想定せずにメッセージを送るようプログラム。
https://ja.wikipedia.org/wiki/%E5%87%BA%E7%89%88-%E8%B3%BC%E8%AA%AD%E5%9E%8B%E3%83%A2%E3%83%87%E3%83%AB
● 非同期のメッセージ
非同期メッセージングでは、結果を待たずにメッセージを次々と投げるので、メッセージを処理をするタイミングは一致しない。
http://ledsun.hatenablog.com/entry/2013/07/18/181044
■ mosquittoの導入
● mosquittoのインストール
下記のサイトが参考になる。
http://dev.classmethod.jp/cloud/setting-up-mosquitto-logging-on-amazon-linux/
[root@localhost tmp]# wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo -O "/etc/yum.repos.d/Mosquitto.repo"
[root@localhost tmp]# yum install mosquitto mosquitto-clients
[root@localhost tmp]# ls -litr /etc/yum.repos.d/Mosquitto.repo
[root@localhost tmp]# less /etc/init.d/mosquitto
#! /bin/sh
~~~~~
### BEGIN INIT INFO
# Provides: mosquitto
# Required-Start: $network $remote_fs
# Required-Stop: $network $remote_fs
# Default-Start: 3 5
# Default-Stop: 0 1 2 6
# Short-Description: Mosquitto MQTT broker
# Description: Mosquitto MQTT broker
### END INIT INFO
● 自動起動設定
○ デーモンに登録
[root@localhost tmp]# /sbin/chkconfig --add mosquitto
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto 0:off 1:off 2:off 3:on 4:off 5:on 6:off
○ 現在のランレベルで自動起動するように設定
[root@localhost tmp]# /sbin/chkconfig mosquitto on
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto 0:off 1:off 2:on 3:on 4:on 5:on 6:off
● 起動
[root@localhost tmp]# /etc/init.d/mosquitto start
Starting mosquitto (via systemctl): [ OK ]
● monitによる監視
monit(監視ソフト)を導入して、mosquittoがプロセスダウンしたときに自動で立ち上げるように設定。
https://easyengine.io/tutorials/monitoring/monit/
[root@localhost tmp]# cd ~
[root@localhost tmp]# wget http://mmonit.com/monit/dist/binary/5.14/monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# tar zxvf monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# cd monit-5.14/
[root@localhost tmp]# cp bin/monit /usr/bin/monit
[root@localhost tmp]# mkdir /etc/monit
[root@localhost tmp]# touch /etc/monit/monitrc
[root@localhost tmp]# chmod 0700 /etc/monit/monitrc
[root@localhost tmp]# ln -s /etc/monit/monitrc /etc/monitrc
[root@localhost tmp]# wget https://gist.githubusercontent.com/rahul286/9975061/raw/1aa107e62ecaaa2dacfdb61a12f13efb6f15005b/monit -P /etc/init.d/
[root@localhost tmp]# chmod u+x /etc/init.d/monit
[root@localhost tmp]# echo "START=yes" > /etc/default/monit
[root@localhost tmp]# monit -t
[root@localhost tmp]# /sbin/chkconfig --add monit
[root@localhost tmp]# /sbin/chkconfig monit on
[root@localhost tmp]# /sbin/chkconfig --list monit
[root@localhost tmp]# view /etc/monit.d/mosquitto.conf
check process mosquitto with pidfile /var/run/mosquitto.pid
start = "/etc/init.d/mosquitto start"
stop = "/etc/init.d/mosquitto stop"
● ログ出力設定
[root@localhost tmp]# sudo mkdir /var/log/mosquitto
[root@localhost tmp]# sudo chown mosquitto /var/log/mosquitto
[root@localhost tmp]# view /etc/mosquitto/mosquitto.conf
合計 0
pid_file /var/run/mosquitto.pid
persistence true
persistence_location /var/lib/mosquitto/
log_dest syslog
log_dest file /var/log/mosquitto/mosquitto.log
#log_type debug
log_type error
log_type warning
log_type notice
log_type information
#log_type none
log_type subscribe
log_type unsubscribe
#log_type websockets
#log_type all
connection_messages true
log_timestamp true
include_dir /etc/mosquitto/conf.d
[root@localhost tmp]# /etc/init.d/mosquitto reload
■ 動作確認
● パブリッシュとサブスクライブのテスト
ターミナルを2つ立ち上げて、それぞれ以下のコマンドを実施。
sensors/temperatureというトピックから32という値をqos1で自分自身にパブリッシュして、サブスクライブする。
# サブスクライブ
[root@localhost tmp]# mosquitto_sub -t sensors/temperature -q
# パブリッシュ
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -m 32 -q 1
# 改行コードを含んだメッセージを送信したい場合、-fオプションを使って、ファイルの内容をパブリッシュすることで、改行コードを含んだメッセージをパブリッシュできる
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -f /var/tmp/test.txt
http://mosquitto.org/man/mosquitto_sub-1.html
http://mosquitto.org/man/mosquitto_pub-1.html
■ Pythonを使ってMQTTを操作
下記が参考になる。
https://librabuch.jp/2015/09/mosquiito_paho_python_mqtt/
● pipの導入
pythonのパッケージ管理を行うpipをインストール。
pipを使うpythonインストールがワン・コマンドで行えるため、簡単になる。
[root@localhost opt]# curl -kL https://bootstrap.pypa.io/get-pip.py | python curl -kL https://bootstrap.pypa.io/get-pip.py | python
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1487k 100 1487k 0 0 1631k 0 --:--:-- --:--:-- --:--:-- 1630k
Collecting pip
Downloading pip-8.1.1-py2.py3-none-any.whl (1.2MB)
100% |████████████████████████████████| 1.2MB 333kB/s
Collecting wheel
Downloading wheel-0.29.0-py2.py3-none-any.whl (66kB)
100% |████████████████████████████████| 71kB 1.9MB/s
Installing collected packages: pip, wheel
Successfully installed pip-8.1.1 wheel-0.29.0
● pahoのインストール
pahoはMQTTの機能を提供するEclipseのライブラリ。
https://eclipse.org/paho/
https://pypi.python.org/pypi/paho-mqtt/1.1
[root@localhost opt]# pip install paho-mqtt
Collecting paho-mqtt
Downloading paho-mqtt-1.1.tar.gz (41kB)
100% |████████████████████████████████| 51kB 3.4MB/s
Building wheels for collected packages: paho-mqtt
Running setup.py bdist_wheel for paho-mqtt ... done
Stored in directory: /root/.cache/pip/wheels/97/db/5f/1ddca8ee2f9b58f9bb68208323bd39bb0b177f32f434aa4b95
Successfully built paho-mqtt
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-1.1
[root@localhost opt]# ls -litr /usr/lib/python2.7/site-packages/paho/mqtt
合計 196
135479923 -rw-r--r--. 1 root root 8713 3月 19 23:15 publish.py
135479924 -rw-r--r--. 1 root root 91388 3月 19 23:15 client.py
135479925 -rw-r--r--. 1 root root 20 3月 19 23:15 __init__.py
135479926 -rw-r--r--. 1 root root 170 3月 19 23:15 __init__.pyc
135479927 -rw-r--r--. 1 root root 71288 3月 19 23:15 client.pyc
135479928 -rw-r--r--. 1 root root 8332 3月 19 23:15 publish.pyc
● 動作確認
同様にパブリッシャーとサブスクライバーを準備。
- publisher.py
from time import sleep
import paho.mqtt.client as mqtt
HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'
MESSAGE = 'test message'
PUBLISH_NUMBER = 5
SLEEP_TIME = 5
def publish_many_times(client, topic='topic/default', message='default', number=1, time=1, print_flag=False):
for i in range(number):
client.publish(topic, message)
if print_flag == True:
print (topic + ' ' + message)
sleep(time)
client.disconnect()
if __name__ == '__main__':
client = mqtt.Client(protocol=mqtt.MQTTv311)
print "publish start " + str(type(client))
client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)
publish_many_times(client,TOPIC, MESSAGE, PUBLISH_NUMBER, SLEEP_TIME)
- subscriber.py
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt
HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'
"""
接続を試みたときに実行
def on_connect(client, userdata, flags, respons_code):
* client
Clientクラスのインスタンス
* userdata
任意のタイプのデータで新たなClientクラスののインスタンスを作成するときに>設定することができる
* flags
応答フラグが含まれる辞書
クリーンセッションを0に設定しているユーザに有効。
セッションがまだ存在しているかどうかを判定する。
クリーンセッションが0のときは以前に接続したユーザに再接続する。
0 : セッションが存在していない
1 : セッションが存在している
* respons_code
レスポンスコードは接続が成功しているかどうかを示す。
0: 接続成功
1: 接続失敗 - incorrect protocol version
2: 接続失敗 - invalid client identifier
3: 接続失敗 - server unavailable
4: 接続失敗 - bad username or password
5: 接続失敗 - not authorised
"""
def on_connect(client, userdata, flags, respons_code):
print('status {0}'.format(respons_code))
client.subscribe(client.topic)
"""
def on_message(client, userdata, message):
topicを受信したときに実行する
"""
def on_message(client, userdata, message):
print(message.topic + ' ' + str(message.payload))
if __name__ == '__main__':
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.topic = TOPIC
client.on_connect = on_connect
client.on_message = on_message
client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)
# ループ
client.loop_forever()
[root@localhost tmp]# python publisher.py
[root@localhost tmp]# python subscriber.py
status 0
test_topic/test1 test message
test_topic/test1 test message
test_topic/test1 test message
client.on_connect()とclient.on_message()はコールバック関数で、
client.loop_forever()の中のループで、ハンドラーに呼び出され、実行される。
コールバックについてはここを参照。
def _handle_connack(self):
if self._strict_protocol:
if self._in_packet['remaining_length'] != 2:
return MQTT_ERR_PROTOCOL
if len(self._in_packet['packet']) != 2:
return MQTT_ERR_PROTOCOL
(flags, result) = struct.unpack("!BB", self._in_packet['packet'])
if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311:
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.")
# Downgrade to MQTT v3.1
self._protocol = MQTTv31
return self.reconnect()
if result == 0:
self._state = mqtt_cs_connected
self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")")
self._callback_mutex.acquire()
if self.on_connect:
self._in_callback = True
if sys.version_info[0] < 3:
argcount = self.on_connect.func_code.co_argcount
else:
argcount = self.on_connect.__code__.co_argcount
if argcount == 3:
self.on_connect(self, self._userdata, result)
else:
flags_dict = dict()
flags_dict['session present'] = flags & 0x01
self.on_connect(self, self._userdata, flags_dict, result)
self._in_callback = False
self._callback_mutex.release()
if result == 0:
rc = 0
self._out_message_mutex.acquire()
for m in self._out_messages:
m.timestamp = time.time()
if m.state == mqtt_ms_queued:
self.loop_write() # Process outgoing messages that have just been queued up
self._out_message_mutex.release()
return MQTT_ERR_SUCCESS
if m.qos == 0:
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.qos == 1:
if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_puback
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.qos == 2:
if m.state == mqtt_ms_publish:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubrec
self._in_callback = True # Don't call loop_write after _send_publish()
rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
elif m.state == mqtt_ms_resend_pubrel:
self._inflight_messages = self._inflight_messages + 1
m.state = mqtt_ms_wait_for_pubcomp
self._in_callback = True # Don't call loop_write after _send_pubrel()
rc = self._send_pubrel(m.mid, m.dup)
self._in_callback = False
if rc != 0:
self._out_message_mutex.release()
return rc
self.loop_write() # Process outgoing messages that have just been queued up
self._out_message_mutex.release()
return rc
elif result > 0 and result < 6:
return MQTT_ERR_CONN_REFUSED
else:
return MQTT_ERR_PROTOCOL
def _handle_on_message(self, message):
self._callback_mutex.acquire()
matched = False
for t in self.on_message_filtered:
if topic_matches_sub(t[0], message.topic):
self._in_callback = True
t[1](self, self._userdata, message)
self._in_callback = False
matched = True
if matched == False and self.on_message:
self._in_callback = True
self.on_message(self, self._userdata, message)
self._in_callback = False
self._callback_mutex.release()