Subscription Identifierとは
- Subscription IdentifierとはSubscribe時にフィルタに対して指定することができるIDです
- この仕様を使って嬉しい点は受信したPUBLISHがどの購読条件にマッチした結果なのかをSubscriber側が知ることができる点です
- 複数のSubscribeにマッチした場合は複数のIDを入れることができます
- この機能が使用可能かどうかは、サーバーからのConnackの返り値のSubscription Identifiers Availableフラグで知ることができます。
実験
複数のID指定のSubscriberを立ててSubscribeします。
期待
仕様上は 1つのPUBLISHに複数のSubscription Identifierを含めることが可能なため、次のような挙動を期待しました。
[受信] test/topic: Hello 0 SID=[1, 2]
...
サーバコード
一日目のmochi-coのコードを利用します
クライアントコード
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
import threading
import time
BROKER = "localhost"
TOPIC = "test/topic"
# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg):
sid = getattr(msg.properties, "SubscriptionIdentifier", None)
print(f"[受信] {msg.topic}: {msg.payload.decode()} SID={sid}")
def subscriber():
sub = mqtt.Client(
client_id="sub-client",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
sub.on_message = on_message
sub.connect(BROKER, 1883, 60)
# ----------- SUBSCRIBE 1: test/# with SID = 1 -----------
props1 = Properties(PacketTypes.SUBSCRIBE)
props1.SubscriptionIdentifier = 1
sub.subscribe("test/#", properties=props1)
# ----------- SUBSCRIBE 2: test/topic with SID = 2 -----------
props2 = Properties(PacketTypes.SUBSCRIBE)
props2.SubscriptionIdentifier = 2
sub.subscribe("test/topic", properties=props2)
sub.loop_forever()
# ======================
# Publisher
# ======================
def publisher():
pub = mqtt.Client(
client_id="pub-client",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
pub.connect(BROKER, 1883, 60)
for i in range(3):
msg = f"Hello {i}"
pub.publish(TOPIC, msg)
print(f"[送信] {msg}")
time.sleep(1)
pub.disconnect()
# ======================
# メイン処理
# ======================
if __name__ == "__main__":
t = threading.Thread(target=subscriber, daemon=True)
t.start()
time.sleep(1)
publisher()
time.sleep(2)
[~/qiita/client]$uv run subid.py
[送信] Hello 0
[受信] test/topic: Hello 0 SID=[2]
[受信] test/topic: Hello 0 SID=[1]
[送信] Hello 1
[受信] test/topic: Hello 1 SID=[2]
[受信] test/topic: Hello 1 SID=[1]
[送信] Hello 2
[受信] test/topic: Hello 2 SID=[2]
[受信] test/topic: Hello 2 SID=[1]
結果はIDがひとつずつ書かれたパケットが、サブスクライブの分だけやってきています。
Subscription Identifierは「Broker → Client に配送される 各PUBLISHに含められるProperty」です。
「1回の配送につき、1つ以上のSIDを含められる」 というだけで「必ず1PUBLISHにまとめられる」わけではありません。結果より今回利用したmochi-coはSID=1 の PUBLISH と SID=2 の PUBLISH を別々に送る実装になっているということでした。
これは、HiveMQのブログにも下記のように書いてあります。
Overlapping Subscriptions
If a client has multiple subscriptions that match a single topic (e.g., subscribing to both machines/devices/# and machines/devices/environment/temperator), it may receive the same message multiple times. The specification does not guarantee the delivery order of these duplicate messages relative to one another.
1つのクライアントが、単一のトピックにマッチする複数のサブスクリプションを持っている場合
(例:machines/devices/# と machines/devices/environment/temperator の両方をサブスクライブしている場合)、
同一のメッセージを複数回受信する可能性があります。
これらの 重複して配送されるメッセージ同士の配送順序について、仕様では保証されていません。
出典:HiveMQ Blog
「Understanding MQTT Message Ordering」
https://www.hivemq.com/blog/understanding-mqtt-message-ordering/
まとめ
Subscription Identifierについて確認し、動作を検証しました。仕様にある程度の自由度を持たせているケースもあるという学びがありました。
著作権情報
Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html