はじめに
到達保証レベルのQoS1/QoS2について解説してパケットの流れをみてみます。
QoSについて
QoSは、メッセージ配送の到達保証レベルです。もちろん、科学は魔法ではありませんので、「欠損や重複を許容しなければ欠損や重複が生じない」というわけではありません。欠損や重複が物理的に起こることを防ぐのではなく、それらが起きた際に送信側/受信側がどう振る舞い、どこで検知できるかを規定しています。
QoS1
QoS1は必ず到達します。ただし重複はありえます。Publishに対してPubackを返すことが定められていて返ってこなければ、Publishは失敗で終わり、クライアントは再送しなければなりません。その際にクライアントはDupのフラグを1にします。
シーケンス図は下記
受け手がPublishで受信したにもかかわらず、何らかの事情(ネットワーク切断等)でPubackを返せなかった場合は受けては二重に同一内容のパケットを受け取ることになります。
DUPフラグがあるのだから重複が排除できるのではないか? と思うかもしれませんがMQTTはそれを行いません。実装をシンプルにするためです。
「4.3.2 QoS 1: At least once delivery」にある通り、
PUBACK パケットを送信した後は、同じパケット識別子を持つ新たな PUBLISH パケットが届いた場合、それが DUP フラグの設定に関係なく「新しい publish」であると扱わなければならない(MUST)。
と記載されているためです。DUPであろうがなかろうが同等のPublishとして扱うことが定められています。
そのようにすることで、Brokerに状態を持たせなくても良い実装にすることができます。ブローカー側の負荷も考慮し、かつ、そのほうがIoT向きのプロトコルで性能面でもメリットがありますね!
| 💬 筆者コメント |
|---|
| 筆者としては、結局QoS2の実装も同時に実装しなければならない場合には状態を持たせざるを得ず、実装難易度としては変わりませんが、規格側でそういうところまで配慮していただけるのはありがたいですね。主に性能面で嬉しいそうです。実際にQoS1/QoS0のみ対応しているというブローカーもあったと思います。 |
QoS2
QoS1が重複を許しているのに対して、QoS2は重複を許さず、必ずちょうど1回到達したことを確認してから初めて成功となるシーケンスです。パケットの流れとしては, Publish -> Pubrec -> Pubrel -> Pubcompの順にやりとりされます。
再送のPublish/Pubrelの再送が生じても同一IDは二重に処理しないようにすることで、「正確にたった1回」の送達ができるような設計になっています。キモはPublish/Pubrelの再送に対してACKの役割を持つPubrec/Pubcompは再送するけども, 処理としては、すでに処理していた場合は再処理しない。というところにあります。
規格部分を引用します。
Until it has received the corresponding PUBREL packet, the receiver MUST acknowledge any subsequent PUBLISH packet with the same Packet Identifier by sending a PUBREC. It MUST NOT cause duplicate messages to be delivered to any onward recipients
意訳
対応する PUBREL パケットを受信するまでは、
受信側は 同じパケット識別子(Packet Identifier)を持つ後続の PUBLISH パケットに対して、必ず PUBREC を送って応答しなければならない。
また、
後続の受信者(サブスクライバなど)に対して、重複したメッセージが配送される原因を作ってはならない。
After it has sent a PUBCOMP, the receiver MUST treat any subsequent PUBLISH packet that contains that Packet Identifier as being a new publication.
意訳
PUBCOMP を送った後は、その Packet Identifier を含む後続の PUBLISH パケットは まったく新しい PUBLISH として扱わなければならない。
実験
QoSをすべて動作させてパケットの中身をみてみましょう
import paho.mqtt.client as mqtt
import threading
import time
BROKER = "localhost"
# ★ 複数フィルター + QoS 別々
SUB_TOPICS = [
("test/topic", 0), # QoS 0
("home/living/temp", 1), # QoS 1
("device/+/status", 2), # QoS 2
]
# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg):
print(f"[受信] {msg.topic}: {msg.payload.decode()}")
def subscriber():
sub = mqtt.Client(
client_id="sub-client",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
protocol=mqtt.MQTTv311
)
sub.on_message = on_message
sub.connect(BROKER, 1883, 60)
# ★ 複数トピックを QoS 別々で購読
sub.subscribe(SUB_TOPICS)
sub.loop_forever()
# ======================
# Publisher(QoS 0 / 1 / 2 を順に送信)
# ======================
def publisher():
pub = mqtt.Client(
client_id="pub-client",
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
protocol=mqtt.MQTTv311
)
pub.connect(BROKER, 1883, 60)
pub.loop_start()
time.sleep(1)
# --- QoS 0 ---
print("[送信 QoS0] test/topic")
pub.publish("test/topic", "QoS0 message", qos=0)
time.sleep(1)
# --- QoS 1 ---
print("[送信 QoS1] home/living/temp")
pub.publish("home/living/temp", "QoS1 message", qos=1)
time.sleep(1)
# --- QoS 2 ---
print("[送信 QoS2] device/kitchen/status")
pub.publish("device/kitchen/status", "QoS2 message", qos=2)
time.sleep(1)
pub.loop_stop()
# ======================
# メイン処理
# ======================
if __name__ == "__main__":
# Subscriber
t1 = threading.Thread(target=subscriber, daemon=True)
t1.start()
# Publisher
t2 = threading.Thread(target=publisher, daemon=True)
t2.start()
# ループ継続
while True:
time.sleep(1)
[~/qiita/matching]$uv run qos.py
[送信 QoS0] test/topic
[受信] test/topic: QoS0 message
[送信 QoS1] home/living/temp
[受信] home/living/temp: QoS1 message
[送信 QoS2] device/kitchen/status
[受信] device/kitchen/status: QoS2 message
クライアント→サーバ、サーバ→クライアントの往復のパケットが1本のTCPに乗っているのでそれぞれ2つずつ見えています。
QoS0はPublishのみ

QoS2はPubrec→Pubrel→Pubcompが見えています。

まとめ
QoS1/2について解説し動作をみてみました。ブローカーにかかる負荷などを考えると大切な考えだと思いました。
著作権情報
Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html
