0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MQTT5 CleanStart / Session Expiry Interval について

Last updated at Posted at 2025-12-12

CleanSessionの廃止

[MQTT] Connect / CleanSessionについてではCleanSessionを紹介しましたが、MQTT5ではCleanSessionに代わって、CleanStartとSession Expiry Intervalが導入されました。

StartSession

 MQTT3でのConnect時のCleanSessionは、「当該接続で生じるセッションを次回切断時で維持するかどうか」を決めるものでしたが、それではConnect時に「この接続は前の接続を引き継ぐかどうか」を決めることはできません。
そこで、接続時に前回のセッションを引き継ぐかどうかを決めることができるようになりました。

  • CleanStart: True → 必ず新規セッションで接続開始
  • CleanStart: False → 過去セッションがあれば復元、なければ接続新規作成

Session Expiry Interval

 また、Session Expiry Intervalで、セッション維持/維持しないだけではなくて、セッションを維持させる時間をコントロールすることができるようになりました。切断してもセッションを維持する場合は0xFFFFFFFF、セッションを即時破棄する場合は0を入れます。

mqtt3からの置き換え

 上記2つがCleanSessionに代わります。CleanSession=1はSession Expiry Intervalを0に設定すること。CleanSession=0はSession Expiry Intervalを設定し、CleanStartを0に設定することで代替になります。

実験

  1. SessionExpiry=10秒
  2. サブスクライブが正常なことを確認
  3. 正常切断
  4. 30秒待って再接続

⇒ 購読は復元されないので、Publishしても受信されないことを確認

import time
import paho.mqtt.client as mqtt
from paho.mqtt.properties import Properties
from paho.mqtt.packettypes import PacketTypes

BROKER = "localhost"
TOPIC = "test/topic"


def on_message(client, userdata, msg):
    print(f"[受信] {msg.topic}: {msg.payload.decode()}")


# ===================================================
# 初回接続 → SUBSCRIBE → 指定 expiry 秒でセッション保持
# 初回にメッセージ受信テストを追加
# ===================================================
def first_connection(expiry):
    print(f"\n=== 初回接続: SUBSCRIBE & SessionExpiry={expiry} ===")

    cli = mqtt.Client(
        client_id="tester",
        protocol=mqtt.MQTTv5,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )
    cli.on_message = on_message

    props = Properties(PacketTypes.CONNECT)
    props.SessionExpiryInterval = expiry  # ★セッション保持秒数

    cli.connect(BROKER, 1883, keepalive=30, clean_start=True, properties=props)
    cli.loop_start()

    cli.subscribe(TOPIC)
    print("[初回] SUBSCRIBE 済み")

    # --- ★追加:最初に届くか確認するためのテスト送信 ---
    test_pub = mqtt.Client(protocol=mqtt.MQTTv5,
                           callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    test_pub.connect(BROKER, 1883, 30)
    print("[初回] テストメッセージ送信: 'Initial Test'")
    test_pub.publish(TOPIC, "Initial Test")
    test_pub.disconnect()
    # --------------------------------------------------------

    time.sleep(1)  # メッセージ受信待ち

    cli.disconnect()
    cli.loop_stop()
    print("[初回] 正常切断(セッションは保持されます)")


# ===================================================
# 再接続(SUBSCRIBEしないで受信確認)
# ===================================================
def reconnect_without_sub():
    print("\n=== 再接続: SUBSCRIBEなし(セッション残存チェック) ===")
    cli = mqtt.Client(
        client_id="tester",
        protocol=mqtt.MQTTv5,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
    )
    cli.on_message = on_message

    cli.connect(BROKER, 1883, keepalive=30, clean_start=False)
    cli.loop_start()

    return cli


# ===================================================
# Publisher(セッションが残っていればSubscriberで受信される)
# ===================================================
def publisher_send():
    pub = mqtt.Client(protocol=mqtt.MQTTv5,
                      callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
    pub.connect(BROKER, 1883, 30)

    for i in range(3):
        msg = f"Hello {i}"
        print(f"[送信] {msg}")
        pub.publish(TOPIC, msg)
        time.sleep(1)

    pub.disconnect()


# ===================================================
# 実行部分
# ===================================================
if __name__ == "__main__":
    SESSION_EXPIRY = 10
    WAIT_BEFORE_RECONNECT = 30

    # 1. 初回接続+受信テスト
    first_connection(expiry=SESSION_EXPIRY)

    print(f"\n=== {WAIT_BEFORE_RECONNECT} 秒待機してから再接続 ===")
    time.sleep(WAIT_BEFORE_RECONNECT)

    # 2. SUBSCRIBEなしで再接続
    cli = reconnect_without_sub()

    # 3. publisher が送信して受信されるかチェック
    print("\n[送信テスト] セッションが残っていれば受信される。消えていれば届かない。")
    time.sleep(1)

    publisher_send()

    time.sleep(2)

    cli.disconnect()
    cli.loop_stop()

    print("\n=== テスト終了 ===")

~/qiita/client]$uv run session5.py 

=== 初回接続: SUBSCRIBE & SessionExpiry=10 ===
[初回] SUBSCRIBE 済み
[初回] テストメッセージ送信: 'Initial Test'
[受信] test/topic: Initial Test
[初回] 正常切断(セッションは保持されます)

=== 30 秒待機してから再接続 ===

=== 再接続: SUBSCRIBEなし(セッション残存チェック) ===

[送信テスト] セッションが残っていれば受信される。消えていれば届かない。
[送信] Hello 0
[送信] Hello 1
[送信] Hello 2

=== テスト終了 ===
[~/qiita/client]$

まとめ

CleanStart, Session Expiry Intervalについて確認しました

著作権情報

Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?