0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

[MQTT] Retainについて

Last updated at Posted at 2025-12-06

はじめに

トピックにメッセージを保持できるRetain機能について解説して動作させてみます。

Retainについて

 Retainとは、クライアントがトピックに1つだけ、将来Subscribeするクライアントのために保持できる機能のことです。そのトピック名をキーに最新1件だけをブローカーが保持することができます。そのためにはPublish時にfixed header内のRetainのフラグを立てる必要があります。
サーバはRetainのメッセージのトピックがサブスクライブされたら保持しているメッセージを送信します。その際にはRetainフラグを立てて送信されます。
 受信時は保持されたメッセージの場合のみRetainが1になり、(元のRetainのフラグによらず)通常のリアルタイム配信ではRetainは0になります。
Payloadが空でRetain=1のPublishを送るとそのトピックのRetainメッセージは削除されます。

注意点として

  • 最新1件のみ
  • QoSも保持
  • ワイルドカードの購読でも該当のメッセージをすべて受信

一般的な流れをシーケンス図にします。

実験

import paho.mqtt.client as mqtt
import time
import threading

BROKER = "localhost"
TOPIC = "retain/test"

# =========================
# Publisher
# =========================
def publisher():
    pub = mqtt.Client(
        client_id="pub-retain",
        protocol=mqtt.MQTTv311,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )
    pub.connect(BROKER, 1883, 60)
    print("[PUB] publish retained message -> broker stores it")
    pub.publish(TOPIC, "RETAINED_PAYLOAD", retain=True)
    print("[PUB] finished")
    pub.disconnect()

# =========================
# Subscriber
# =========================
def on_message(client, userdata, msg):
    print(f"[SUB] recv topic={msg.topic} payload={msg.payload.decode()} retain={msg.retain}")

def on_connect(client, userdata, flags, reason_code, properties):
    print("[SUB] connected, subscribing")
    client.subscribe(TOPIC)

def subscriber():
    sub = mqtt.Client(
        client_id="sub-retain",
        protocol=mqtt.MQTTv311,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )
    sub.on_connect = on_connect
    sub.on_message = on_message
    sub.connect(BROKER, 1883, 60)
    sub.loop_forever()

# =========================
# Main
# =========================
if __name__ == "__main__":
    publisher()
    time.sleep(3)
    # subscribe thread
    t = threading.Thread(target=subscriber, daemon=True)
    t.start()

    time.sleep(2)
    print("done")

  1. ConnectしてRetainフラグをtrueにして
    Publishします。 その接続は閉じます。

スクリーンショット 2025-11-16 23.23.25.jpg

Retain=trueのPublishパケット
スクリーンショット 2025-11-16 23.24.27.jpg

  1. 別のクライアントが接続してSubscribeすると、Subackが返ってすぐ、Retainされていたパケットがパブリッシュされます。こちらも
    フラグはRetain=trueになっています。

スクリーンショット 2025-11-16 23.27.25.jpg

Retainの消去

PublishのPayload部分のトピック名のあとに何も存在しない場合、そのトピック名に保存されているRetainは消去されるのでSubscribeをしても送付されなくなります。
スクリーンショット 2025-11-16 23.32.19.jpg

まとめ

Retainについて確認し、動作を確かめました。トピックに状態を保持できると考えるといろいろ使い所はありそうですね!

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?