はじめに
トピックにメッセージを保持できるRetain機能について解説して動作させてみます。
Retainについて
Retainとは、クライアントがトピックに1つだけ、将来Subscribeするクライアントのために保持できる機能のことです。そのトピック名をキーに最新1件だけをブローカーが保持することができます。そのためにはPublish時にfixed header内のRetainのフラグを立てる必要があります。
サーバはRetainのメッセージのトピックがサブスクライブされたら保持しているメッセージを送信します。その際にはRetainフラグを立てて送信されます。
受信時は保持されたメッセージの場合のみRetainが1になり、(元のRetainのフラグによらず)通常のリアルタイム配信ではRetainは0になります。
Payloadが空でRetain=1のPublishを送るとそのトピックのRetainメッセージは削除されます。
注意点として
- 最新1件のみ
- QoSも保持
- ワイルドカードの購読でも該当のメッセージをすべて受信
一般的な流れをシーケンス図にします。
実験
import paho.mqtt.client as mqtt
import time
import threading
BROKER = "localhost"
TOPIC = "retain/test"
# =========================
# Publisher
# =========================
def publisher():
pub = mqtt.Client(
client_id="pub-retain",
protocol=mqtt.MQTTv311,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
pub.connect(BROKER, 1883, 60)
print("[PUB] publish retained message -> broker stores it")
pub.publish(TOPIC, "RETAINED_PAYLOAD", retain=True)
print("[PUB] finished")
pub.disconnect()
# =========================
# Subscriber
# =========================
def on_message(client, userdata, msg):
print(f"[SUB] recv topic={msg.topic} payload={msg.payload.decode()} retain={msg.retain}")
def on_connect(client, userdata, flags, reason_code, properties):
print("[SUB] connected, subscribing")
client.subscribe(TOPIC)
def subscriber():
sub = mqtt.Client(
client_id="sub-retain",
protocol=mqtt.MQTTv311,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
sub.on_connect = on_connect
sub.on_message = on_message
sub.connect(BROKER, 1883, 60)
sub.loop_forever()
# =========================
# Main
# =========================
if __name__ == "__main__":
publisher()
time.sleep(3)
# subscribe thread
t = threading.Thread(target=subscriber, daemon=True)
t.start()
time.sleep(2)
print("done")
- ConnectしてRetainフラグをtrueにして
Publishします。 その接続は閉じます。
- 別のクライアントが接続してSubscribeすると、Subackが返ってすぐ、Retainされていたパケットがパブリッシュされます。こちらも
フラグはRetain=trueになっています。
Retainの消去
PublishのPayload部分のトピック名のあとに何も存在しない場合、そのトピック名に保存されているRetainは消去されるのでSubscribeをしても送付されなくなります。

まとめ
Retainについて確認し、動作を確かめました。トピックに状態を保持できると考えるといろいろ使い所はありそうですね!


