MQTT5 User Propetiesについて
MQTT5からはユーザが自由に使用できる領域が追加されています。HTTPで例えるとHeaderのような扱いです。User PropertyはHeader同様にNameとValueのペアになっています。ペアになっていることからKeyとValueのように使われることを想定していると思われます、が重複する名前をNameをつけることも可能です。
The User Property is allowed to appear multiple times to represent multiple name, value pairs. The same name is allowed to appear more than once.
MQTTの定義自体にはUser Property具体的な意味は定義されていません。これはPublishのPayloadと同じようにMQTTの上位のアプリケーション層が自由に使うことが想定されています。
User Propertyが使用できるパケットはPingreq/Pingres以外のすべてです。
- Connect
- Connack
- Publish
- Puback
- Pubrec
- Pubrel
- Pubcomp
- Subscribe
- Suback
- Unsub
- Unsuback
- Disconnect
- Auth
活用例
アプリケーション層なので、使い方は自由ですが、活用例としては下記のような例が考えられたりします。
- OpentelemetryなどTraceID
When a request exists in a system, the trace context is added to the metadata of the used transport protocol. For example, the HTTP headers, Kafka record headers, or MQTT user properties.
(https://docs.hivemq.com/hivemq-distributed-tracing-extension/latest/distributed-tracing-terminology.html)
- ルーティングの制御
User Properties を使用して、アプリケーション レベルのルーティングを行うこともできます。
(https://www.emqx.com/ja/blog/mqtt5-user-properties)
- 属性保持
- IoT機器のSchemaバージョン管理、ACL
実験
実際にPublishにUserpropertyを入れて送信し、返ってくるパケットのUserpropertyを確認してみましょう。
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
import threading
import time
BROKER = "localhost"
TOPIC = "test/topic"
# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg):
print(f"[受信] {msg.topic}: {msg.payload.decode()}")
# --- User Property の表示 ---
props = msg.properties
if props is not None and props.UserProperty is not None:
print("[受信] UserProperty:")
for k, v in props.UserProperty:
print(f" {k} = {v}")
else:
print("[受信] UserProperty: なし")
def subscriber():
sub = mqtt.Client(
client_id="sub-client",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
sub.on_message = on_message
sub.connect(BROKER, 1883, 60)
sub.subscribe(TOPIC)
sub.loop_forever()
# =================
# Publisher
# =================
def publisher():
pub = mqtt.Client(
client_id="pub-client",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
pub.connect(BROKER, 1883, 60)
for i in range(5):
msg = f"Hello {i}"
# --- User Property を設定 ---
props = Properties(PacketTypes.PUBLISH)
props.UserProperty = [
("trace_id", f"abc-{i}"),
("schema", "v1")
]
pub.publish(TOPIC, msg, properties=props)
print(f"[送信] {msg} (UserProperty 付き)")
time.sleep(1)
pub.disconnect()
# ======================
# メイン処理
# ======================
if __name__ == "__main__":
# 受信スレッドを起動
t = threading.Thread(target=subscriber, daemon=True)
t.start()
# 少し待ってから送信
time.sleep(1)
publisher()
# 終了待ち
time.sleep(2)
[~/qiita/client]$uv run userpropery.py
[送信] Hello 0 (UserProperty 付き)
[受信] test/topic: Hello 0
[受信] UserProperty:
trace_id = abc-0
schema = v1
[送信] Hello 1 (UserProperty 付き)
[受信] test/topic: Hello 1
[受信] UserProperty:
trace_id = abc-1
schema = v1
[送信] Hello 2 (UserProperty 付き)
[受信] test/topic: Hello 2
[受信] UserProperty:
trace_id = abc-2
schema = v1
[送信] Hello 3 (UserProperty 付き)
[受信] test/topic: Hello 3
[受信] UserProperty:
trace_id = abc-3
schema = v1
[送信] Hello 4 (UserProperty 付き)
[受信] test/topic: Hello 4
[受信] UserProperty:
trace_id = abc-4
schema = v1
ただのnameとvalueの配列ですので、下記のようにnameやvalueが重複しても問題ありません。
props.UserProperty = [
("trace_id", f"abc-{i}"),
("schema", "v1"),
("schema", "v1-002")
]
まとめ
UserPropertiesについて確認しました
著作権情報
Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html