CleanSessionの廃止
[MQTT] Connect / CleanSessionについてではCleanSessionを紹介しましたが、MQTT5ではCleanSessionに代わって、CleanStartとSession Expiry Intervalが導入されました。
StartSession
MQTT3でのConnect時のCleanSessionは、「当該接続で生じるセッションを次回切断時で維持するかどうか」を決めるものでしたが、それではConnect時に「この接続は前の接続を引き継ぐかどうか」を決めることはできません。
そこで、接続時に前回のセッションを引き継ぐかどうかを決めることができるようになりました。
- CleanStart: True → 必ず新規セッションで接続開始
- CleanStart: False → 過去セッションがあれば復元、なければ接続新規作成
Session Expiry Interval
また、Session Expiry Intervalで、セッション維持/維持しないだけではなくて、セッションを維持させる時間をコントロールすることができるようになりました。切断してもセッションを維持する場合は0xFFFFFFFF、セッションを即時破棄する場合は0を入れます。
mqtt3からの置き換え
上記2つがCleanSessionに代わります。CleanSession=1はSession Expiry Intervalを0に設定すること。CleanSession=0はSession Expiry Intervalを設定し、CleanStartを0に設定することで代替になります。
実験
- SessionExpiry=10秒
- サブスクライブが正常なことを確認
- 正常切断
- 30秒待って再接続
⇒ 購読は復元されないので、Publishしても受信されないことを確認
import time
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes
BROKER = "localhost"
TOPIC = "test/topic"
def on_message(client, userdata, msg):
print(f"[受信] {msg.topic}: {msg.payload.decode()}")
# ===================================================
# 初回接続 → SUBSCRIBE → 指定 expiry 秒でセッション保持
# 初回にメッセージ受信テストを追加
# ===================================================
def first_connection(expiry):
print(f"\n=== 初回接続: SUBSCRIBE & SessionExpiry={expiry} ===")
cli = mqtt.Client(
client_id="tester",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
cli.on_message = on_message
props = Properties(PacketTypes.CONNECT)
props.SessionExpiryInterval = expiry # ★セッション保持秒数
cli.connect(BROKER, 1883, keepalive=30, clean_start=True, properties=props)
cli.loop_start()
cli.subscribe(TOPIC)
print("[初回] SUBSCRIBE 済み")
# --- ★追加:最初に届くか確認するためのテスト送信 ---
test_pub = mqtt.Client(protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
test_pub.connect(BROKER, 1883, 30)
print("[初回] テストメッセージ送信: 'Initial Test'")
test_pub.publish(TOPIC, "Initial Test")
test_pub.disconnect()
# --------------------------------------------------------
time.sleep(1) # メッセージ受信待ち
cli.disconnect()
cli.loop_stop()
print("[初回] 正常切断(セッションは保持されます)")
# ===================================================
# 再接続(SUBSCRIBEしないで受信確認)
# ===================================================
def reconnect_without_sub():
print("\n=== 再接続: SUBSCRIBEなし(セッション残存チェック) ===")
cli = mqtt.Client(
client_id="tester",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
cli.on_message = on_message
cli.connect(BROKER, 1883, keepalive=30, clean_start=False)
cli.loop_start()
return cli
# ===================================================
# Publisher(セッションが残っていればSubscriberで受信される)
# ===================================================
def publisher_send():
pub = mqtt.Client(protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
pub.connect(BROKER, 1883, 30)
for i in range(3):
msg = f"Hello {i}"
print(f"[送信] {msg}")
pub.publish(TOPIC, msg)
time.sleep(1)
pub.disconnect()
# ===================================================
# 実行部分
# ===================================================
if __name__ == "__main__":
SESSION_EXPIRY = 10
WAIT_BEFORE_RECONNECT = 30
# 1. 初回接続+受信テスト
first_connection(expiry=SESSION_EXPIRY)
print(f"\n=== {WAIT_BEFORE_RECONNECT} 秒待機してから再接続 ===")
time.sleep(WAIT_BEFORE_RECONNECT)
# 2. SUBSCRIBEなしで再接続
cli = reconnect_without_sub()
# 3. publisher が送信して受信されるかチェック
print("\n[送信テスト] セッションが残っていれば受信される。消えていれば届かない。")
time.sleep(1)
publisher_send()
time.sleep(2)
cli.disconnect()
cli.loop_stop()
print("\n=== テスト終了 ===")
~/qiita/client]$uv run session5.py
=== 初回接続: SUBSCRIBE & SessionExpiry=10 ===
[初回] SUBSCRIBE 済み
[初回] テストメッセージ送信: 'Initial Test'
[受信] test/topic: Initial Test
[初回] 正常切断(セッションは保持されます)
=== 30 秒待機してから再接続 ===
=== 再接続: SUBSCRIBEなし(セッション残存チェック) ===
[送信テスト] セッションが残っていれば受信される。消えていれば届かない。
[送信] Hello 0
[送信] Hello 1
[送信] Hello 2
=== テスト終了 ===
[~/qiita/client]$
まとめ
CleanStart, Session Expiry Intervalについて確認しました
著作権情報
Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html