0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[MQTT] shared subscriptionsについて

Last updated at Posted at 2025-12-10

mqtt5で記載された内容です

はじめに

Shared subscription (共有サブスクリプション)について確かめたあと、実際に動作を確認します。

Shared Subscriptionについて

Shared Subscriptionとは、同じShareName/TopicFilterを購読している複数のクライアントが存在する場合に、Publish されたメッセージをそのグループ内のどれか 1クライアントだけに配信するための仕組みです。
 使用方法は$share/{ShareName}/{filter}のトピックフィルターをサブスクライブします。そうするとfilterでサブスクライブを行ったことになります。Publish するときは、
$share/teamA/sensorsに対してPublishします。 Shared Subscription はあくまで「Subscribe 時に $share/... を使う」だけでPublishは通常通りsensorsに送るのみです。
 例えば、下記のように、$share/teamA/sensorsは「teamAでsensorsにサブスクライブする」という意味になります。それに加えてクライアントSubB1/B2が$share/teamB/sensorsをサブスクライブしている場合はteamAとは独立にSubB1/B2のいずれかひとつがパケットを受信します。
 以下は teamAとteamBの2つのShared Subscriptionグループが存在し、それぞれ複数のsubscriberがいる例です。

                   ◎ ShareGroup: teamA
                      ($share/teamA/sensors)
                          |
          ---------------------------------------------
          |                    |                      |
        ◎ Sub A1             ◎ Sub A2              ◎ Sub A3
      (client-id A1)      (client-id A2)        (client-id A3)



                   ◎ ShareGroup: teamB
                      ($share/teamB/sensors)
                          |
                  -----------------------
                  |                     |
                ◎ Sub B1             ◎ Sub B2
            (client-id B1)       (client-id B2)


逆にShareName が同じでも、トピックフィルタが違えば別物となります。Shared Subscriptionのグループは「ShareNameとTopic Filterの組み合わせ」で一意に決まるため、teamA/sensors と teamA/# は別グループになります。

【Shared Subscription グループ1】  (topic: sensors)

                   ◎ ShareGroup: teamA
                      ($share/teamA/sensors)
                          |
          ---------------------------------------------
          |                    |                      |
        ◎ Sub S1             ◎ Sub S2              ◎ Sub S3
      (client-id S1)      (client-id S2)        (client-id S3)



【Shared Subscription グループ2】  (topic: #)

                   ◎ ShareGroup: teamA
                      ($share/teamA/#)
                          |
                  -----------------------------
                  |             |             |
                ◎ Sub W1     ◎ Sub W2      ◎ Sub W3
            (client-id W1) (client-id W2) (client-id W3)

この例では、

  • topic: sensorsで送信→グループ1の誰かに送達 and グループ2の誰かに送達
  • topic machineで送信→グループ2の誰かに送達
    となります。

実験

共有サブスクリプションについて実際に確かめて見ましょう
1日目のGoコードを流用します。

サーバのGoコード

クライアントコード

$share/teamA/sensorsをサブスクライブしているクライアントを3つ作成します。

実行すると、Hello iがpublish されるたびに、
3クライアント(A/B/C)のどれか1つだけが受信します。

shared.py
import paho.mqtt.client as mqtt
import threading
import time
import os

BROKER = "localhost"
TOPIC = "sensors"                     # Publish 先
SHARE = "$share/teamA/sensors"        # Shared Subscription

# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg):
    name = userdata.get("name", "?")
    print(f"[受信:{name}] {msg.topic}: {msg.payload.decode()}")

def subscriber(name):
    # client_id をユニークにする
    sub = mqtt.Client(
        client_id=f"sub-{name}",
        userdata={"name": name},
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2
    )
    sub.on_message = on_message

    sub.connect(BROKER, 1883, 60)

    # Shared Subscription を購読
    sub.subscribe(SHARE)
    print(f"[開始] Subscriber {name}{SHARE} を購読")

    sub.loop_forever()

# =================
# Publisher
# =================
def publisher():
    pub = mqtt.Client(client_id="pub-client", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    pub.connect(BROKER, 1883, 60)

    for i in range(10):
        msg = f"Hello {i}"
        print(f"[送信] {msg}")
        pub.publish(TOPIC, msg)
        time.sleep(1)

    pub.disconnect()

# ======================
# メイン処理
# ======================
if __name__ == "__main__":

    # Subscriber 3つ起動
    for name in ["A", "B", "C"]:
        t = threading.Thread(target=subscriber, args=(name,), daemon=True)
        t.start()

    # 初期化待ち
    time.sleep(2)

    publisher()

    time.sleep(2)

実行すると、下記のように
B->A->B->B->B->C...のようにPublishパケット1つは1つのクライアントのみに送達していることがわかります。

[~/qiita/client]$uv run shared.py 
[開始] Subscriber C が $share/teamA/sensors を購読
[開始] Subscriber B が $share/teamA/sensors を購読
[開始] Subscriber A が $share/teamA/sensors を購読
[送信] Hello 0
[受信:B] sensors: Hello 0
[送信] Hello 1
[受信:A] sensors: Hello 1
[送信] Hello 2
[受信:B] sensors: Hello 2
[送信] Hello 3
[受信:B] sensors: Hello 3
[送信] Hello 4
[受信:C] sensors: Hello 4
[送信] Hello 5
[受信:C] sensors: Hello 5
[送信] Hello 6
[受信:B] sensors: Hello 6
[送信] Hello 7
[受信:A] sensors: Hello 7
[送信] Hello 8
[受信:B] sensors: Hello 8
[送信] Hello 9
[受信:C] sensors: Hello 9
[~/qiita/client]$

次にソースを変更し、teamA/sensorsteamA/+の組を作って、別の組として構成されるので1組あたり1つずつ送達することを確認します。teamA/sensorsteamA/+は ShareName が同じですがTopicFilter が違うため別のShared Subscriptionグループとして扱われます。よって、Publish 1回につき各グループから1クライアントずつが選ばれて受信することが確認できます。

shared_hash.py
import paho.mqtt.client as mqtt
import threading
import time

BROKER = "localhost"

TOPIC = "sensors"                  # publisher が送るトピック

# Shared Subscription Filters
SHARE_SENSORS = "$share/teamA/sensors"
SHARE_PLUS    = "$share/teamA/+"

# ======================
# Subscriber(共通)
# ======================
def on_message(client, userdata, msg):
    name = userdata["name"]
    group = userdata["group"]
    print(f"[受信:{group}:{name}] {msg.topic}: {msg.payload.decode()}")

def subscriber(name, filter_name, topic_filter):
    sub = mqtt.Client(
        client_id=f"sub-{filter_name}-{name}",
        userdata={"name": name, "group": filter_name},
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2
    )

    sub.on_message = on_message
    sub.connect(BROKER, 1883, 60)

    sub.subscribe(topic_filter)
    print(f"[開始] Subscriber {name} ({filter_name}) が {topic_filter} を購読")

    sub.loop_forever()

# =================
# Publisher
# =================
def publisher():
    pub = mqtt.Client(client_id="pub-client", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    pub.connect(BROKER, 1883, 60)

    for i in range(5):
        msg = f"Hello {i}"
        print(f"[送信] {msg}")
        pub.publish(TOPIC, msg)
        time.sleep(1)

    pub.disconnect()

# ======================
# メイン処理
# ======================
if __name__ == "__main__":

    # group sensors
    for n in ["A1", "A2"]:
        threading.Thread(
            target=subscriber,
            args=(n, "teamA-sensors", SHARE_SENSORS),
            daemon=True
        ).start()

    # group '+'
    for n in ["B1", "B2"]:
        threading.Thread(
            target=subscriber,
            args=(n, "teamA-plus", SHARE_PLUS),
            daemon=True
        ).start()

    time.sleep(2)
    publisher()
    time.sleep(2)

teamAでもプラスの組とsensorsの組それぞれで1つずつ受信されていることが確認できます。

[~/qiita/client]$uv run shared_plus.py 
[開始] Subscriber A2 (teamA-sensors) が $share/teamA/sensors を購読
[開始] Subscriber B1 (teamA-plus) が $share/teamA/+ を購読
[開始] Subscriber A1 (teamA-sensors) が $share/teamA/sensors を購読
[開始] Subscriber B2 (teamA-plus) が $share/teamA/+ を購読
[送信] Hello 0
[受信:teamA-plus:B1] sensors: Hello 0
[受信:teamA-sensors:A2] sensors: Hello 0
[送信] Hello 1
[受信:teamA-plus:B1] sensors: Hello 1
[受信:teamA-sensors:A2] sensors: Hello 1
[送信] Hello 2
[受信:teamA-plus:B1] sensors: Hello 2
[受信:teamA-sensors:A2] sensors: Hello 2
[送信] Hello 3
[受信:teamA-plus:B1] sensors: Hello 3
[受信:teamA-sensors:A2] sensors: Hello 3
[送信] Hello 4
[受信:teamA-sensors:A1] sensors: Hello 4
[受信:teamA-plus:B1] sensors: Hello 4
[~/qiita/client]$

まとめ

共有サブスクリプションについて確認しました。とあるトピックをグループを組んでサブスクライブする方法でした。
 分散するアーキテクチャを前提に考える場合は使い所もありそうですね。例えばセンサーデータの処理を複数のワーカーに分散したり、MQTT を使った簡易的なワークキューを構成する際に有効そうです。

著作権情報

Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?