67
Help us understand the problem. What are the problem?

More than 5 years have passed since last update.

posted at

updated at

MQTT のQoSについて検証(MQTTv3.1)d

1. MQTTのQoS

 MQTTのQoSは3つのレベルが存在します。それぞれ「QoS 0(At most once)」、「QoS 1(At least once)」、「QoS 2(Exactly once」です。これらはクライアントとサーバ間での到達保証をするQoS設定です。このクライアントとサーバ間の到達保証とは「パブリッシャー」と「サブスクライバー」間での到達保証ではありません。クライアントとサーバは「パブリッシャーとブローカー」または「ブローカーとサブスクライバー」に対応しています。
 QoSは「パブリッシャー」がパブリッシュするメッセージ毎に設定を行います。またサブスクライバーがサブスクライブする際に、トピック毎に設定することが可能です。「パブリッシャー」または「サブスクライバー」で指定されたQoSはそれぞれブローカーとの間で非同期に機能します。また、パブリッシャーで「QoS2」が指定された場合でもサブスクライバーで「QoS0」または「QoS1」が指定されていた場合、「ブローカー」から「サブスクライバー」に送られるメッセージは「サブスクライバー」で指定された最大のQoSにダウングレードされます。
 このようにMQTTのQoSは「パブリッシャーとブローカー」または「ブローカーとサブスクライバー」間のみで有効となります。クライアント、サーバの役割はそれぞれで変わります。(ブローカーはクライアントサーバのどちらの役割にもなります)
このトピックではこれらMQTTのQoS動作をサンプルコードとパケットキャプチャを用いて確認して行きます。

2. 検証の準備

2.1 検証方法

検証ではPythonのpaho-mqttモジュールを利用し、「パブリッシャー」と「サブスクライバー」のサンプルコードを作成します。MQTT ブローカーとしてmosquittoをインストールしたVMを用意します。また、サブスクライバー×2をホストOSで立ち上げます。その後、パケットキャプチャを実行した状態でパブリッシャーを実行します。

2.2 サンプルコード

  • パブリッシャーのサンプルコード
publisher.py
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt

_host = "192.168.0.2" # ブローカーの ホストを指定する。
_qos = 0 # QoSの値を0-2で指定する。
topic = "mqtt/topic/test"

def on_publish(client, userdata, mid):
    client.disconnect()

def main():
    mqtt_client = mqtt.Client(protocol=mqtt.MQTTv31)
    mqtt_client.on_publish = on_publish
    mqtt_client.connect(host=_host, port=1883, keepalive=60)
    mqtt_client.publish(topic,"Hello MQTT!",_qos) # メッセージを指定のQoSでパブリッシュ
    mqtt_client.loop_forever()

if __name__ == '__main__':
    main()

  • サブスクライバー
subscriber.py
# -*- coding: utf-8 -*-
import paho.mqtt.client as mqtt

_host = "192.168.0.2"
_qos = 0
topic = "mqtt/topic/test"

def on_connect(client, userdata, flag, rc):
    client.subscribe(topic, qos = _qos)

def on_message(client, userdata, message):
    print message.topic + " " + str(message.payload)

def main():
    mqtt_client = mqtt.Client(protocol = mqtt.MQTTv31)
    mqtt_client.on_connect = on_connect
    mqtt_client.on_message = on_message
    mqtt_client.connect(host=_host, port=1883, keepalive=60)
    mqtt_client.loop_forever()


if __name__ == '__main__':
    main() 

3.検証結果

3.1 検証した項目

  1. パブリッシャーとサブスクライバーそれぞれがQoS0
  2. パブリッシャーとサブスクライバーそれぞれがQoS1
  3. パブリッシャーとサブスクライバーそれぞれがQoS2
  4. パブリッシャーがQoS2, サブスクライバーがQoS0
  5. パブリッシャーがQoS0, サブスクライバーがQoS2

3.2 結果

検証1 (パブリッシャーとサブスクライバーそれぞれがQoS0)

QoS0でのパケットキャプチャです。MQTTのメッセージのみを切り出しています。
192.168.211.1がパブリッシャーとサブスクライバー、192.168.221.191がブローカーです。
QoS0Flow.png
1. パブリッシャーからブローカーへの接続要求(CONNECT)
赤枠で囲った4バイトがMQTTメッセージのヘッダになります。先頭1バイト目の上位4bitがメッセージの種類を表しており、「1」はCONNECTメッセージを表します。(追記ヘッダは先頭2byteの間違いです。赤枠引き間違えます)
QoS0CONNECT.png
2. パブリッシャーからブローカーへのパブリッシュ(PUBLISH)
先頭1バイト目の上位4bitが「3」です。PUBLISHを表します。また下位4bitの2bit,3bit目でQoSを指定します。この場合下位4bitが0ですので、必然的にQoS0です。
QoS0PUBLISH1.png

  1. ブロカーから接続要求への返答CONNACKが返されています。
    同様に先頭が2です。これはCONNECTに対応したブローカーからの返答CONNACKと定義されています。
    QoS0CONNACK.png

  2. その後ブローカーからサブスクライバーへのパブリッシュが行われています。

QoS0の場合、ブローカーにパブリッシュされた後に、サブスクライバーにそれぞれメッセージが配信されていることが確認できる。
また、ブローカーがパブリッシャーの接続要求に対する返答である「CONNACKメッセージ」を返す前にメッセージを配信しています。

検証2 (パブリッシャーとサブスクライバーそれぞれがQoS1)

同様にQoS1の場合です。
QoS1Flow.png

  1. Time 0.000513でパブリッシャーからブローカーにPUBLISHメッセージが送られています。
    先頭バイトが32です。これはQoS1のPUBLISHメッセージを表します。2は下位4ビットの2bit目のフラグが立っている。QoS1の状態です。
    QoS1Publish1.png

  2. Time0.000602でブローカーからパブリッシャーにPUBACKメッセージが送られています。
    QoS1ではPUBLISHメッセージを受け取るとこのPUBACKメッセージを返すことになっています。
    先頭が4です。これはPUBACKメッセージを表します。
    QoS1PUBACK.png

  3. Time0.000800,Time0.000909でブローカーからサブスクライバーにPUBLISHメッセージが送られます。

  4. Time0.000960, Time0.002361で3に対するPUBACKが返されています。

それぞれQoS1の場合、パブリッシャーがパブリッシュを行った後にPUBACKを返している。また、サブスクライバーもブローカーからメッセージを受け取った後にPUBACKを返しています。

検証3 (パブリッシャーとサブスクライバーそれぞれがQoS2)

続いてQoS2です。
QoS2Flow.png

  1. Time 0.000551でパブリッシャーからブローカーにパブリッシュされます。
    今度は先頭バイトの下位4bitが4です。3bit目が1なのでQoS2になります。
    QoS2PUBLISH.png

  2. Time 0.000590ではブローカーからパブリッシャーにPUBRECメッセージが送られます。
    5がPUBRECメッセージです。
    QoS2PUBREC.png

  3. Time 0.000783ではパブリッシャーからブローカーにPUBRELメッセージが送られています。
    他のメッセージと同様にPUBRELです。
    QoS2PUBREL.png

  4. Time 0.000823ではブローカーからパブリッシャーにPUBCOMPメッセージが送られます。
    QoS2PUBCOMP.png

これでQoS2の通信は完了です。このあと、ブローカーとサブスクライバー間でも同様にQoS2のフローがやり取りされています。

パブリッシャーとブローカーの間でPUBLISH,PUBREC,PUBREL,PUBCOMPのメッセージがそれぞれやり取りされていることが確認できます。また、ブローカーとサブスクライバーの間でも同様のフローです。さらに、パブリッシャーとブローカのQoS2のフロー完了後にサブスクライバーとブローカーのQoS2のやり取りが始まっり、それぞれが非同期で行われていることが確認できます。

検証4 (パブリッシャーがQoS2, サブスクライバーがQoS0)

ではパブリッシャーとサブスクライバーが別々のQoSを設定した場合、どのようになるでしょうか?
パブッリシャーはQoS2でパブリッシュし、サブスクライバーはQoS0でサブスクライブしています。
20.png

パブリッシャーとブローカー間はQoS2で通信していますが、ブローカーとサブスクライバー間はQoS0のフローで通信していることがわかります。(Time0.001216とTime0.001282)
サブスクライバーとの通信ではQoS0にダウングレードしていることが分かります。

検証5 (パブリッシャーがQoS0, サブスクライバーがQoS2)

続いて、パブリッシャーがQoS0, サブスクライバーがQoS2です。
02.png

QoS0のフローで通信を行っていることがわかります。ブローカーとサブスクライバー間はQoS2を指定しているはずですが、QoS0の通信フローに従っています。元のメッセージのQoSが要求するQoSよりも低い場合はもとのメッセージのQoSが優先されるようです。

まとめ

  1. MQTTのQoSはクライアントとサーバ間のみの到達保証を行う
  2. クライアントとサーバの役割は「パブリッシャーとブローカー」、「ブローカーとサブスクライバー」で変わる。
  3. 「パブリッシャーとブローカー」「ブローカーとサブスクライバー」それぞれの通信は非同期で通信する。
  4. 同じTopicに対してパブリッシャーとサブスクライバーが別々のQoSを指定した場合、QoSの最低値はパブリッシャーの設定値に依存し、QoSの最高値はサブスクライバーでの設定値に依存する。
  5. パブリッシャーのQoSに従う場合、サブスクライバーのQoSは常に2を指定すれば良さそう。

宣伝: 絵で見て分かるIoT/センサの仕組みと活用 絶賛発売中です!
http://www.amazon.co.jp/絵で見てわかるIoT-センサの仕組みと活用-株式会社NTTデータ/dp/4798140627/ref=sr_1_1?ie=UTF8&qid=1428737994&sr=8-1&keywords=IoT

参考文献

http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
https://pypi.python.org/pypi/paho-mqtt

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
67
Help us understand the problem. What are the problem?