mqtt5で記載された内容です
はじめに
Shared subscription (共有サブスクリプション)について確かめたあと、実際に動作を確認します。
Shared Subscriptionについて
Shared Subscriptionとは、同じShareName/TopicFilterを購読している複数のクライアントが存在する場合に、Publish されたメッセージをそのグループ内のどれか 1クライアントだけに配信するための仕組みです。
使用方法は$share/{ShareName}/{filter}のトピックフィルターをサブスクライブします。そうするとfilterでサブスクライブを行ったことになります。Publish するときは、
$share/teamA/sensorsに対してPublishします。 Shared Subscription はあくまで「Subscribe 時に $share/... を使う」だけでPublishは通常通りsensorsに送るのみです。
例えば、下記のように、$share/teamA/sensorsは「teamAでsensorsにサブスクライブする」という意味になります。それに加えてクライアントSubB1/B2が$share/teamB/sensorsをサブスクライブしている場合はteamAとは独立にSubB1/B2のいずれかひとつがパケットを受信します。
以下は teamAとteamBの2つのShared Subscriptionグループが存在し、それぞれ複数のsubscriberがいる例です。
◎ ShareGroup: teamA
($share/teamA/sensors)
|
---------------------------------------------
| | |
◎ Sub A1 ◎ Sub A2 ◎ Sub A3
(client-id A1) (client-id A2) (client-id A3)
◎ ShareGroup: teamB
($share/teamB/sensors)
|
-----------------------
| |
◎ Sub B1 ◎ Sub B2
(client-id B1) (client-id B2)
逆にShareName が同じでも、トピックフィルタが違えば別物となります。Shared Subscriptionのグループは「ShareNameとTopic Filterの組み合わせ」で一意に決まるため、teamA/sensors と teamA/# は別グループになります。
【Shared Subscription グループ1】 (topic: sensors)
◎ ShareGroup: teamA
($share/teamA/sensors)
|
---------------------------------------------
| | |
◎ Sub S1 ◎ Sub S2 ◎ Sub S3
(client-id S1) (client-id S2) (client-id S3)
【Shared Subscription グループ2】 (topic: #)
◎ ShareGroup: teamA
($share/teamA/#)
|
-----------------------------
| | |
◎ Sub W1 ◎ Sub W2 ◎ Sub W3
(client-id W1) (client-id W2) (client-id W3)
この例では、
- topic: sensorsで送信→グループ1の誰かに送達 and グループ2の誰かに送達
- topic machineで送信→グループ2の誰かに送達
となります。
実験
共有サブスクリプションについて実際に確かめて見ましょう
1日目のGoコードを流用します。
クライアントコード
$share/teamA/sensorsをサブスクライブしているクライアントを3つ作成します。
実行すると、Hello iがpublish されるたびに、
3クライアント(A/B/C)のどれか1つだけが受信します。
import paho.mqtt.client as mqtt
import threading
import time
import os
BROKER = "localhost"
TOPIC = "sensors" # Publish 先
SHARE = "$share/teamA/sensors" # Shared Subscription
# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg):
name = userdata.get("name", "?")
print(f"[受信:{name}] {msg.topic}: {msg.payload.decode()}")
def subscriber(name):
# client_id をユニークにする
sub = mqtt.Client(
client_id=f"sub-{name}",
userdata={"name": name},
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
sub.on_message = on_message
sub.connect(BROKER, 1883, 60)
# Shared Subscription を購読
sub.subscribe(SHARE)
print(f"[開始] Subscriber {name} が {SHARE} を購読")
sub.loop_forever()
# =================
# Publisher
# =================
def publisher():
pub = mqtt.Client(client_id="pub-client", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
pub.connect(BROKER, 1883, 60)
for i in range(10):
msg = f"Hello {i}"
print(f"[送信] {msg}")
pub.publish(TOPIC, msg)
time.sleep(1)
pub.disconnect()
# ======================
# メイン処理
# ======================
if __name__ == "__main__":
# Subscriber 3つ起動
for name in ["A", "B", "C"]:
t = threading.Thread(target=subscriber, args=(name,), daemon=True)
t.start()
# 初期化待ち
time.sleep(2)
publisher()
time.sleep(2)
実行すると、下記のように
B->A->B->B->B->C...のようにPublishパケット1つは1つのクライアントのみに送達していることがわかります。
[~/qiita/client]$uv run shared.py
[開始] Subscriber C が $share/teamA/sensors を購読
[開始] Subscriber B が $share/teamA/sensors を購読
[開始] Subscriber A が $share/teamA/sensors を購読
[送信] Hello 0
[受信:B] sensors: Hello 0
[送信] Hello 1
[受信:A] sensors: Hello 1
[送信] Hello 2
[受信:B] sensors: Hello 2
[送信] Hello 3
[受信:B] sensors: Hello 3
[送信] Hello 4
[受信:C] sensors: Hello 4
[送信] Hello 5
[受信:C] sensors: Hello 5
[送信] Hello 6
[受信:B] sensors: Hello 6
[送信] Hello 7
[受信:A] sensors: Hello 7
[送信] Hello 8
[受信:B] sensors: Hello 8
[送信] Hello 9
[受信:C] sensors: Hello 9
[~/qiita/client]$
次にソースを変更し、teamA/sensorsとteamA/+の組を作って、別の組として構成されるので1組あたり1つずつ送達することを確認します。teamA/sensorsとteamA/+は ShareName が同じですがTopicFilter が違うため別のShared Subscriptionグループとして扱われます。よって、Publish 1回につき各グループから1クライアントずつが選ばれて受信することが確認できます。
import paho.mqtt.client as mqtt
import threading
import time
BROKER = "localhost"
TOPIC = "sensors" # publisher が送るトピック
# Shared Subscription Filters
SHARE_SENSORS = "$share/teamA/sensors"
SHARE_PLUS = "$share/teamA/+"
# ======================
# Subscriber(共通)
# ======================
def on_message(client, userdata, msg):
name = userdata["name"]
group = userdata["group"]
print(f"[受信:{group}:{name}] {msg.topic}: {msg.payload.decode()}")
def subscriber(name, filter_name, topic_filter):
sub = mqtt.Client(
client_id=f"sub-{filter_name}-{name}",
userdata={"name": name, "group": filter_name},
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
sub.on_message = on_message
sub.connect(BROKER, 1883, 60)
sub.subscribe(topic_filter)
print(f"[開始] Subscriber {name} ({filter_name}) が {topic_filter} を購読")
sub.loop_forever()
# =================
# Publisher
# =================
def publisher():
pub = mqtt.Client(client_id="pub-client", callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
pub.connect(BROKER, 1883, 60)
for i in range(5):
msg = f"Hello {i}"
print(f"[送信] {msg}")
pub.publish(TOPIC, msg)
time.sleep(1)
pub.disconnect()
# ======================
# メイン処理
# ======================
if __name__ == "__main__":
# group sensors
for n in ["A1", "A2"]:
threading.Thread(
target=subscriber,
args=(n, "teamA-sensors", SHARE_SENSORS),
daemon=True
).start()
# group '+'
for n in ["B1", "B2"]:
threading.Thread(
target=subscriber,
args=(n, "teamA-plus", SHARE_PLUS),
daemon=True
).start()
time.sleep(2)
publisher()
time.sleep(2)
teamAでもプラスの組とsensorsの組それぞれで1つずつ受信されていることが確認できます。
[~/qiita/client]$uv run shared_plus.py
[開始] Subscriber A2 (teamA-sensors) が $share/teamA/sensors を購読
[開始] Subscriber B1 (teamA-plus) が $share/teamA/+ を購読
[開始] Subscriber A1 (teamA-sensors) が $share/teamA/sensors を購読
[開始] Subscriber B2 (teamA-plus) が $share/teamA/+ を購読
[送信] Hello 0
[受信:teamA-plus:B1] sensors: Hello 0
[受信:teamA-sensors:A2] sensors: Hello 0
[送信] Hello 1
[受信:teamA-plus:B1] sensors: Hello 1
[受信:teamA-sensors:A2] sensors: Hello 1
[送信] Hello 2
[受信:teamA-plus:B1] sensors: Hello 2
[受信:teamA-sensors:A2] sensors: Hello 2
[送信] Hello 3
[受信:teamA-plus:B1] sensors: Hello 3
[受信:teamA-sensors:A2] sensors: Hello 3
[送信] Hello 4
[受信:teamA-sensors:A1] sensors: Hello 4
[受信:teamA-plus:B1] sensors: Hello 4
[~/qiita/client]$
まとめ
共有サブスクリプションについて確認しました。とあるトピックをグループを組んでサブスクライブする方法でした。
分散するアーキテクチャを前提に考える場合は使い所もありそうですね。例えばセンサーデータの処理を複数のワーカーに分散したり、MQTT を使った簡易的なワークキューを構成する際に有効そうです。
著作権情報
Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html