LoginSignup
41
64

More than 5 years have passed since last update.

MQTT入門(導入編)

Last updated at Posted at 2016-03-19

■ mosquittoとは

MQTTプロトコルを実装したオープンソースソフトウェア。
http://mosquitto.org/

他にもMQTTを実装したOSSとしては、Apache Apolloなどもある模様。

● MQTTとは

パブリッシュ/サブスクライブ型(出版-購読型)のモデルのプロトコル
https://sango.shiguredo.jp/mqtt

非同期メッセージングパラダイムの一種であり、メッセージの送信者(出版側)が特定の受信者(購読側)を想定せずにメッセージを送るようプログラム。
https://ja.wikipedia.org/wiki/%E5%87%BA%E7%89%88-%E8%B3%BC%E8%AA%AD%E5%9E%8B%E3%83%A2%E3%83%87%E3%83%AB

● 非同期のメッセージ

非同期メッセージングでは、結果を待たずにメッセージを次々と投げるので、メッセージを処理をするタイミングは一致しない。
http://ledsun.hatenablog.com/entry/2013/07/18/181044

■ mosquittoの導入

● mosquittoのインストール

下記のサイトが参考になる。
http://dev.classmethod.jp/cloud/setting-up-mosquitto-logging-on-amazon-linux/

[root@localhost tmp]# wget http://download.opensuse.org/repositories/home:/oojah:/mqtt/CentOS_CentOS-7/home:oojah:mqtt.repo -O "/etc/yum.repos.d/Mosquitto.repo"
[root@localhost tmp]# yum install mosquitto mosquitto-clients
[root@localhost tmp]# ls -litr /etc/yum.repos.d/Mosquitto.repo

[root@localhost tmp]# less /etc/init.d/mosquitto
#! /bin/sh
~~~~~
### BEGIN INIT INFO
# Provides: mosquitto
# Required-Start: $network $remote_fs
# Required-Stop: $network $remote_fs
# Default-Start: 3 5
# Default-Stop: 0 1 2 6
# Short-Description: Mosquitto MQTT broker
# Description: Mosquitto MQTT broker
### END INIT INFO

● 自動起動設定

○ デーモンに登録

[root@localhost tmp]# /sbin/chkconfig --add mosquitto
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto       0:off   1:off   2:off   3:on    4:off   5:on    6:off

○ 現在のランレベルで自動起動するように設定

[root@localhost tmp]# /sbin/chkconfig mosquitto on
[root@localhost tmp]# /sbin/chkconfig --list mosquitto
mosquitto       0:off   1:off   2:on    3:on    4:on    5:on    6:off

● 起動

[root@localhost tmp]# /etc/init.d/mosquitto start
Starting mosquitto (via systemctl):                        [  OK  ]

● monitによる監視

monit(監視ソフト)を導入して、mosquittoがプロセスダウンしたときに自動で立ち上げるように設定。
https://easyengine.io/tutorials/monitoring/monit/

[root@localhost tmp]# cd ~
[root@localhost tmp]# wget http://mmonit.com/monit/dist/binary/5.14/monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# tar zxvf monit-5.14-linux-x64.tar.gz
[root@localhost tmp]# cd monit-5.14/
[root@localhost tmp]# cp bin/monit /usr/bin/monit
[root@localhost tmp]# mkdir /etc/monit
[root@localhost tmp]# touch /etc/monit/monitrc
[root@localhost tmp]# chmod 0700 /etc/monit/monitrc 
[root@localhost tmp]# ln -s /etc/monit/monitrc /etc/monitrc
[root@localhost tmp]# wget https://gist.githubusercontent.com/rahul286/9975061/raw/1aa107e62ecaaa2dacfdb61a12f13efb6f15005b/monit -P /etc/init.d/
[root@localhost tmp]# chmod u+x /etc/init.d/monit
[root@localhost tmp]# echo "START=yes" > /etc/default/monit
[root@localhost tmp]# monit -t
[root@localhost tmp]# /sbin/chkconfig  --add monit
[root@localhost tmp]# /sbin/chkconfig  monit on
[root@localhost tmp]# /sbin/chkconfig --list monit
[root@localhost tmp]# view /etc/monit.d/mosquitto.conf
check process mosquitto with pidfile /var/run/mosquitto.pid
start = "/etc/init.d/mosquitto start"
stop = "/etc/init.d/mosquitto stop"

● ログ出力設定

[root@localhost tmp]# sudo mkdir /var/log/mosquitto
[root@localhost tmp]# sudo chown mosquitto /var/log/mosquitto

[root@localhost tmp]# view /etc/mosquitto/mosquitto.conf
合計 0
pid_file /var/run/mosquitto.pid

persistence true
persistence_location /var/lib/mosquitto/

log_dest syslog
log_dest file /var/log/mosquitto/mosquitto.log
#log_type debug
log_type error
log_type warning
log_type notice
log_type information
#log_type none
log_type subscribe
log_type unsubscribe
#log_type websockets
#log_type all

connection_messages true

log_timestamp true

include_dir /etc/mosquitto/conf.d

[root@localhost tmp]# /etc/init.d/mosquitto reload

■ 動作確認

● パブリッシュとサブスクライブのテスト

ターミナルを2つ立ち上げて、それぞれ以下のコマンドを実施。

sensors/temperatureというトピックから32という値をqos1で自分自身にパブリッシュして、サブスクライブする。

# サブスクライブ
[root@localhost tmp]# mosquitto_sub -t sensors/temperature -q 

# パブリッシュ
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -m 32 -q 1

# 改行コードを含んだメッセージを送信したい場合、-fオプションを使って、ファイルの内容をパブリッシュすることで、改行コードを含んだメッセージをパブリッシュできる
[root@localhost tmp]# mosquitto_pub -t sensors/temperature -f /var/tmp/test.txt

http://mosquitto.org/man/mosquitto_sub-1.html
http://mosquitto.org/man/mosquitto_pub-1.html

■ Pythonを使ってMQTTを操作

下記が参考になる。
https://librabuch.jp/2015/09/mosquiito_paho_python_mqtt/

● pipの導入

pythonのパッケージ管理を行うpipをインストール。
pipを使うpythonインストールがワン・コマンドで行えるため、簡単になる。

[root@localhost opt]# curl -kL https://bootstrap.pypa.io/get-pip.py | python curl -kL https://bootstrap.pypa.io/get-pip.py | python
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 1487k  100 1487k    0     0  1631k      0 --:--:-- --:--:-- --:--:-- 1630k
Collecting pip
  Downloading pip-8.1.1-py2.py3-none-any.whl (1.2MB)
    100% |████████████████████████████████| 1.2MB 333kB/s 
Collecting wheel
  Downloading wheel-0.29.0-py2.py3-none-any.whl (66kB)
    100% |████████████████████████████████| 71kB 1.9MB/s 
Installing collected packages: pip, wheel
Successfully installed pip-8.1.1 wheel-0.29.0

● pahoのインストール

pahoはMQTTの機能を提供するEclipseのライブラリ。

https://eclipse.org/paho/
https://pypi.python.org/pypi/paho-mqtt/1.1

[root@localhost opt]# pip install paho-mqtt
Collecting paho-mqtt
  Downloading paho-mqtt-1.1.tar.gz (41kB)
    100% |████████████████████████████████| 51kB 3.4MB/s 
Building wheels for collected packages: paho-mqtt
  Running setup.py bdist_wheel for paho-mqtt ... done
  Stored in directory: /root/.cache/pip/wheels/97/db/5f/1ddca8ee2f9b58f9bb68208323bd39bb0b177f32f434aa4b95
Successfully built paho-mqtt
Installing collected packages: paho-mqtt
Successfully installed paho-mqtt-1.1

[root@localhost opt]# ls -litr /usr/lib/python2.7/site-packages/paho/mqtt
合計 196
135479923 -rw-r--r--. 1 root root  8713  3月 19 23:15 publish.py
135479924 -rw-r--r--. 1 root root 91388  3月 19 23:15 client.py
135479925 -rw-r--r--. 1 root root    20  3月 19 23:15 __init__.py
135479926 -rw-r--r--. 1 root root   170  3月 19 23:15 __init__.pyc
135479927 -rw-r--r--. 1 root root 71288  3月 19 23:15 client.pyc
135479928 -rw-r--r--. 1 root root  8332  3月 19 23:15 publish.pyc

● 動作確認

同様にパブリッシャーとサブスクライバーを準備。

  • publisher.py

from time import sleep
import paho.mqtt.client as mqtt

HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'
MESSAGE = 'test message'

PUBLISH_NUMBER = 5
SLEEP_TIME = 5

def publish_many_times(client, topic='topic/default', message='default', number=1, time=1, print_flag=False):

    for i in range(number):
        client.publish(topic, message)
        if print_flag == True:
            print (topic + ' ' + message)
        sleep(time)

    client.disconnect()

if __name__ == '__main__':
    client = mqtt.Client(protocol=mqtt.MQTTv311)

    print "publish start " + str(type(client))

    client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)

    publish_many_times(client,TOPIC, MESSAGE, PUBLISH_NUMBER, SLEEP_TIME)                                      
  • subscriber.py
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt

HOST = '127.0.0.1'
PORT = 1883
KEEP_ALIVE = 60
TOPIC = 'test_topic/test1'

"""
接続を試みたときに実行
def on_connect(client, userdata, flags, respons_code):

* client
Clientクラスのインスタンス

* userdata
任意のタイプのデータで新たなClientクラスののインスタンスを作成するときに>設定することができる

* flags
応答フラグが含まれる辞書
クリーンセッションを0に設定しているユーザに有効。
セッションがまだ存在しているかどうかを判定する。
クリーンセッションが0のときは以前に接続したユーザに再接続する。

0 : セッションが存在していない
1 : セッションが存在している

* respons_code
レスポンスコードは接続が成功しているかどうかを示す。
0: 接続成功
1: 接続失敗 - incorrect protocol version
2: 接続失敗 - invalid client identifier
3: 接続失敗 - server unavailable
4: 接続失敗 - bad username or password
5: 接続失敗 - not authorised
"""
def on_connect(client, userdata, flags, respons_code):
    print('status {0}'.format(respons_code))
    client.subscribe(client.topic)

"""
def on_message(client, userdata, message):
topicを受信したときに実行する
"""
def on_message(client, userdata, message):
    print(message.topic + ' ' + str(message.payload))

if __name__ == '__main__':

    client = mqtt.Client(protocol=mqtt.MQTTv311)
    client.topic = TOPIC

    client.on_connect = on_connect
    client.on_message = on_message

    client.connect(HOST, port=PORT, keepalive=KEEP_ALIVE)

    # ループ
    client.loop_forever()                                  
[root@localhost tmp]# python publisher.py 
[root@localhost tmp]# python subscriber.py 
status 0
test_topic/test1 test message
test_topic/test1 test message
test_topic/test1 test message

client.on_connect()とclient.on_message()はコールバック関数で、
client.loop_forever()の中のループで、ハンドラーに呼び出され、実行される。

コールバックについてはここを参照。

def _handle_connack(self):
    if self._strict_protocol:
        if self._in_packet['remaining_length'] != 2:
            return MQTT_ERR_PROTOCOL

    if len(self._in_packet['packet']) != 2:
        return MQTT_ERR_PROTOCOL

    (flags, result) = struct.unpack("!BB", self._in_packet['packet'])
    if result == CONNACK_REFUSED_PROTOCOL_VERSION and self._protocol == MQTTv311:
        self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+"), attempting downgrade to MQTT v3.1.")
        # Downgrade to MQTT v3.1
        self._protocol = MQTTv31
        return self.reconnect()

    if result == 0:
        self._state = mqtt_cs_connected

    self._easy_log(MQTT_LOG_DEBUG, "Received CONNACK ("+str(flags)+", "+str(result)+")")
    self._callback_mutex.acquire()
    if self.on_connect:
        self._in_callback = True

        if sys.version_info[0] < 3:
            argcount = self.on_connect.func_code.co_argcount
        else:
            argcount = self.on_connect.__code__.co_argcount

        if argcount == 3:
            self.on_connect(self, self._userdata, result)
        else:
            flags_dict = dict()
            flags_dict['session present'] = flags & 0x01
            self.on_connect(self, self._userdata, flags_dict, result)
        self._in_callback = False
    self._callback_mutex.release()
    if result == 0:
        rc = 0
        self._out_message_mutex.acquire()
        for m in self._out_messages:
            m.timestamp = time.time()
            if m.state == mqtt_ms_queued:
                self.loop_write() # Process outgoing messages that have just been queued up
                self._out_message_mutex.release()
                return MQTT_ERR_SUCCESS

            if m.qos == 0:
                self._in_callback = True # Don't call loop_write after _send_publish()
                rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                self._in_callback = False
                if rc != 0:
                    self._out_message_mutex.release()
                    return rc
            elif m.qos == 1:
                if m.state == mqtt_ms_publish:
                    self._inflight_messages = self._inflight_messages + 1
                    m.state = mqtt_ms_wait_for_puback
                    self._in_callback = True # Don't call loop_write after _send_publish()
                    rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                    self._in_callback = False
                    if rc != 0:
                        self._out_message_mutex.release()
                        return rc
            elif m.qos == 2:
                if m.state == mqtt_ms_publish:
                    self._inflight_messages = self._inflight_messages + 1
                    m.state = mqtt_ms_wait_for_pubrec
                    self._in_callback = True # Don't call loop_write after _send_publish()
                    rc = self._send_publish(m.mid, m.topic, m.payload, m.qos, m.retain, m.dup)
                    self._in_callback = False
                    if rc != 0:
                        self._out_message_mutex.release()
                        return rc
                elif m.state == mqtt_ms_resend_pubrel:
                    self._inflight_messages = self._inflight_messages + 1
                    m.state = mqtt_ms_wait_for_pubcomp
                    self._in_callback = True # Don't call loop_write after _send_pubrel()
                    rc = self._send_pubrel(m.mid, m.dup)
                    self._in_callback = False
                    if rc != 0:
                        self._out_message_mutex.release()
                        return rc
            self.loop_write() # Process outgoing messages that have just been queued up
        self._out_message_mutex.release()
        return rc
    elif result > 0 and result < 6:
        return MQTT_ERR_CONN_REFUSED
    else:
        return MQTT_ERR_PROTOCOL
def _handle_on_message(self, message):
    self._callback_mutex.acquire()
    matched = False
    for t in self.on_message_filtered:
        if topic_matches_sub(t[0], message.topic):
            self._in_callback = True
            t[1](self, self._userdata, message)
            self._in_callback = False
            matched = True

    if matched == False and self.on_message:
        self._in_callback = True
        self.on_message(self, self._userdata, message)
        self._in_callback = False

    self._callback_mutex.release()

■ 参考サイト

41
64
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
41
64