1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[MQTT] Publishについて / Topic

Last updated at Posted at 2025-12-03

mqtt advent calendar 4日目

はじめに

Publishはクライアントがサーバ側、サーバ側を通じて他のクライアントに伝えたいメッセージの伝達のために使用されるMQTTでの基本的なパケットです。本記事では
HeaderとPayloadの内容についてざっと確認した後にTopicとTopicFilterのマッチ方法について解説します。

構造

Fixed header

PublishはControl Packet typeが0b0011(0x03)のメッセージです。
Fixed headerは

  • Packet Type
  • QoS
  • Retain
  • Dup

があります。これらのフラグは別途後日別記事で詳しく説明します。

Variable header

Variable Headerにはtopic nameとPacket IDを含みます。

topic name

トピック名は「どのSubscriberに届けるか」を決める宛先です。別のクライアントがサーバ側に保存してあるサブスクライブのtopic filterとのマッチングプロセスを通じて突き合わせて配送先が決定されます。

topic nameとtopic filterのマッチングプロセス

topic nameはPublish時に設定され、topic filterはサブスクライブ時に設定します。

  • ともにUTF-8バイト列です。null文字(U+0000)は使用できません
  • 長さの制限は65535バイト
  • Case sensitiveですので、フィルターがtennisでトピックがTENNISの場合はマッチしません
  • 空白を含むことができます

特殊記号として,

  • / : セパレーター
  • # : マルチレベルワイルドカード
  • + : シングルレベルワイルドカード
    があり特別な意味を持つ文字です。

セパレーターはtopic name, topic filterともに使用されますが、ワイルドカードはtopic filterのみに使用されます。

セパレータ /

セパレータはトピック, トピックフィルターの階層構造を表すのに使用されます。
この階層構造はワイルドカードと組み合わせて使用すると効果を発揮します。

せーパレータを使った例
sport/tennis/player1/score/wimbledon

マルチレベルワイルドカード #

フィルターの末尾に使われる文字です。
トピックフィルターとトピックのマッチングを階層の左から行っていった際に、
フィルターにマルチレベルワイルドカードは出現した場合、それより末尾のマッチングは打ち切って問答無用で一致する強力な文字です。かならず単独で末尾に使用されます。つまり、

OK: sport/tennis/#  
NG: sport/tennis#  
NG: sport/#/score
マッチング例
フィルター例: sport/tennis/player1/#

matchするトピック: 
sport/tennis/player1
sport/tennis/player1/ranking
sport/tennis/player1/score/wimbledon

matchしないトピック:
sport/football/player1

シングルレベルワイルドカード +

トピックフィルターとトピックのマッチングを階層の左から行っていった際に、
フィルターにシングルレベルワイルドカードは出現した場合、その階層は一致するとみなします。どの階層でも使用できますが、使用する場合は他の文字と組み合わせることはできません。つまり、tennis/player+ のようにはできません。

フィルター例: sport/+/player1
matchするトピック: 
sport/tennis/player1
sport/football/player1

matchしないトピック:
sport/tennis/special/player1
sport/tennis/player1/score

フィルター例: +/+
matchするトピック /finance
matchしないトピック /finance/bank

組み合わせ

+/tennis/#のように両方のワイルドカードを組み合わせることもできます。

実験

試して見ましょう
1日目のPython/Goコードを流用します。

import paho.mqtt.client as mqtt
import threading
import time

BROKER = "localhost"

# 試したいトピックフィルター(1つ)
TOPIC_FILTER = "sensor/+/temp"

# 実際に publish するトピック(複数)
TEST_TOPICS = [
    "sensor/room1/temp",
    "sensor/room2/humidity",
    "sensor/x/temp",
    "sensor/abc/temp/extra",
    "device/a/status",
    "a/b/c",
]

# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg, properties=None, reason_code=None):
    print(f"[受信] topic={msg.topic:25s} payload={msg.payload.decode()}")

def subscriber():
    sub = mqtt.Client(
        client_id="sub-client",
        protocol=mqtt.MQTTv5,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )
    sub.on_message = on_message

    sub.connect(BROKER, 1883, 60)

    print(f"[Subscriber] subscribe → {TOPIC_FILTER}")
    sub.subscribe(TOPIC_FILTER)

    sub.loop_forever()

# ======================
# Publisher
# ======================
def publisher():
    pub = mqtt.Client(
        client_id="pub-client",
        protocol=mqtt.MQTTv5,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )
    pub.connect(BROKER, 1883, 60)

    for topic in TEST_TOPICS:
        msg = f"MSG({topic})"
        print(f"[送信] {topic:25s}{msg}")
        pub.publish(topic, msg)
        time.sleep(0.4)

    pub.disconnect()


# ======================
# メイン処理
# ======================
if __name__ == "__main__":
    threading.Thread(target=subscriber, daemon=True).start()

    time.sleep(1)
    publisher()

    time.sleep(2)

システムトピック

上記に加えて、$SYSで始まるトピックはシステム用に割り当てられたトピックで、これを使ったクライアント間の通信はできません。また、Read-onlyの情報です。
ここで受信される内容は実装によってことなりますが主にmosquittoの内容を参考に実装されることが多い印象です。
https://mosquitto.org/man/mosquitto-8.html > Broker Status

受信するコードは

import paho.mqtt.client as mqtt
import threading
import time

BROKER = "localhost"
TOPIC = "test/topic"
SYS_TOPIC = "$SYS/#"   # ← 追加

# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg):
    print(f"[受信] {msg.topic}: {msg.payload.decode()}")

def subscriber():
    sub = mqtt.Client(client_id="sub-client", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    sub.on_message = on_message
    sub.connect(BROKER, 1883, 60)

    # 通常のトピック
    sub.subscribe(TOPIC)

    # ★ $SYS トピックも購読
    sub.subscribe(SYS_TOPIC)

    sub.loop_forever()

# =================
# Publisher
# =================
def publisher():
    pub = mqtt.Client(client_id="pub-client", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    pub.connect(BROKER, 1883, 60)
    for i in range(5):
        msg = f"Hello {i}"
        pub.publish(TOPIC, msg)
        print(f"[送信] {msg}")
        time.sleep(1)
    pub.disconnect()

# ======================
# メイン処理
# ======================
if __name__ == "__main__":
    t = threading.Thread(target=subscriber, daemon=True)
    t.start()

    time.sleep(1)
    publisher()

    time.sleep(5)

~/qiita/matching]$uv run system.py 
[受信] $SYS/broker/version: 2.7.9
[受信] $SYS/broker/packets/received: 11
[受信] $SYS/broker/packets/sent: 6
[受信] $SYS/broker/started: 1763175442
[受信] $SYS/broker/uptime: 291
[受信] $SYS/broker/messages/sent: 2

まとめ

Publishパケットとトピックのマッチングを見てみました

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?