mqtt advent calendar 4日目
はじめに
Publishはクライアントがサーバ側、サーバ側を通じて他のクライアントに伝えたいメッセージの伝達のために使用されるMQTTでの基本的なパケットです。本記事では
HeaderとPayloadの内容についてざっと確認した後にTopicとTopicFilterのマッチ方法について解説します。
構造
Fixed header
PublishはControl Packet typeが0b0011(0x03)のメッセージです。
Fixed headerは
- Packet Type
- QoS
- Retain
- Dup
があります。これらのフラグは別途後日別記事で詳しく説明します。
Variable header
Variable Headerにはtopic nameとPacket IDを含みます。
topic name
トピック名は「どのSubscriberに届けるか」を決める宛先です。別のクライアントがサーバ側に保存してあるサブスクライブのtopic filterとのマッチングプロセスを通じて突き合わせて配送先が決定されます。
topic nameとtopic filterのマッチングプロセス
topic nameはPublish時に設定され、topic filterはサブスクライブ時に設定します。
- ともにUTF-8バイト列です。null文字(U+0000)は使用できません
- 長さの制限は65535バイト
- Case sensitiveですので、フィルターがtennisでトピックがTENNISの場合はマッチしません
- 空白を含むことができます
特殊記号として,
-
/: セパレーター -
#: マルチレベルワイルドカード -
+: シングルレベルワイルドカード
があり特別な意味を持つ文字です。
セパレーターはtopic name, topic filterともに使用されますが、ワイルドカードはtopic filterのみに使用されます。
セパレータ /
セパレータはトピック, トピックフィルターの階層構造を表すのに使用されます。
この階層構造はワイルドカードと組み合わせて使用すると効果を発揮します。
sport/tennis/player1/score/wimbledon
マルチレベルワイルドカード #
フィルターの末尾に使われる文字です。
トピックフィルターとトピックのマッチングを階層の左から行っていった際に、
フィルターにマルチレベルワイルドカードは出現した場合、それより末尾のマッチングは打ち切って問答無用で一致する強力な文字です。かならず単独で末尾に使用されます。つまり、
OK: sport/tennis/#
NG: sport/tennis#
NG: sport/#/score
フィルター例: sport/tennis/player1/#
matchするトピック:
sport/tennis/player1
sport/tennis/player1/ranking
sport/tennis/player1/score/wimbledon
matchしないトピック:
sport/football/player1
シングルレベルワイルドカード +
トピックフィルターとトピックのマッチングを階層の左から行っていった際に、
フィルターにシングルレベルワイルドカードは出現した場合、その階層は一致するとみなします。どの階層でも使用できますが、使用する場合は他の文字と組み合わせることはできません。つまり、tennis/player+ のようにはできません。
フィルター例: sport/+/player1
matchするトピック:
sport/tennis/player1
sport/football/player1
matchしないトピック:
sport/tennis/special/player1
sport/tennis/player1/score
フィルター例: +/+
matchするトピック /finance
matchしないトピック /finance/bank
組み合わせ
+/tennis/#のように両方のワイルドカードを組み合わせることもできます。
実験
試して見ましょう
1日目のPython/Goコードを流用します。
import paho.mqtt.client as mqtt
import threading
import time
BROKER = "localhost"
# 試したいトピックフィルター(1つ)
TOPIC_FILTER = "sensor/+/temp"
# 実際に publish するトピック(複数)
TEST_TOPICS = [
"sensor/room1/temp",
"sensor/room2/humidity",
"sensor/x/temp",
"sensor/abc/temp/extra",
"device/a/status",
"a/b/c",
]
# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg, properties=None, reason_code=None):
print(f"[受信] topic={msg.topic:25s} payload={msg.payload.decode()}")
def subscriber():
sub = mqtt.Client(
client_id="sub-client",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
sub.on_message = on_message
sub.connect(BROKER, 1883, 60)
print(f"[Subscriber] subscribe → {TOPIC_FILTER}")
sub.subscribe(TOPIC_FILTER)
sub.loop_forever()
# ======================
# Publisher
# ======================
def publisher():
pub = mqtt.Client(
client_id="pub-client",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
pub.connect(BROKER, 1883, 60)
for topic in TEST_TOPICS:
msg = f"MSG({topic})"
print(f"[送信] {topic:25s} → {msg}")
pub.publish(topic, msg)
time.sleep(0.4)
pub.disconnect()
# ======================
# メイン処理
# ======================
if __name__ == "__main__":
threading.Thread(target=subscriber, daemon=True).start()
time.sleep(1)
publisher()
time.sleep(2)
システムトピック
上記に加えて、$SYSで始まるトピックはシステム用に割り当てられたトピックで、これを使ったクライアント間の通信はできません。また、Read-onlyの情報です。
ここで受信される内容は実装によってことなりますが主にmosquittoの内容を参考に実装されることが多い印象です。
https://mosquitto.org/man/mosquitto-8.html > Broker Status
受信するコードは
import paho.mqtt.client as mqtt
import threading
import time
BROKER = "localhost"
TOPIC = "test/topic"
SYS_TOPIC = "$SYS/#" # ← 追加
# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg):
print(f"[受信] {msg.topic}: {msg.payload.decode()}")
def subscriber():
sub = mqtt.Client(client_id="sub-client", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
sub.on_message = on_message
sub.connect(BROKER, 1883, 60)
# 通常のトピック
sub.subscribe(TOPIC)
# ★ $SYS トピックも購読
sub.subscribe(SYS_TOPIC)
sub.loop_forever()
# =================
# Publisher
# =================
def publisher():
pub = mqtt.Client(client_id="pub-client", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
pub.connect(BROKER, 1883, 60)
for i in range(5):
msg = f"Hello {i}"
pub.publish(TOPIC, msg)
print(f"[送信] {msg}")
time.sleep(1)
pub.disconnect()
# ======================
# メイン処理
# ======================
if __name__ == "__main__":
t = threading.Thread(target=subscriber, daemon=True)
t.start()
time.sleep(1)
publisher()
time.sleep(5)
~/qiita/matching]$uv run system.py
[受信] $SYS/broker/version: 2.7.9
[受信] $SYS/broker/packets/received: 11
[受信] $SYS/broker/packets/sent: 6
[受信] $SYS/broker/started: 1763175442
[受信] $SYS/broker/uptime: 291
[受信] $SYS/broker/messages/sent: 2
まとめ
Publishパケットとトピックのマッチングを見てみました