0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MQTT5で保存されるSession State

Last updated at Posted at 2025-12-24

はじめに

Sessionとそれに紐づくデータについて抑えて、QoS1/0でセッション存続時の配送の挙動を実際に確かめます。

Sessionとは

セッションはClient Identityに紐づき、接続断した際もSession Expiry Intervalが経過するまで存続します。セッションには下記状態を含みます。

  • セッションの存在
  • 購読(サブスクライブ)
  • 送信待ちのメッセージ
  • 未完了のQoS1/2のメッセージ, オプションでQoS0のメッセージ
  • Willメッセージ及びWill Delay Interval

Retain MessageはTopicに紐づくものでありセッション状態に紐づくものではありません。

Sessionの保存

The Client and Server MUST NOT discard the Session State while the Network Connection is open [MQTT-4.1.0-1]. The Server MUST discard the Session State when the Network Connection is closed and the Session Expiry Interval has passed [MQTT-4.1.0-2].

訳:

クライアントとサーバは接続されている間はセッションは破棄してはいけない。サーバはネットワーク接続が閉じられてSession Expiry Intervalが過ぎた場合はセッション破棄しなければならない

サーバ側は保存しなければなりませんが、クライアント側については、切断後のセッション状態の保持方法は規定されておらず、実装や用途に委ねられていることが読み取れます。

規格書のその後に続く「4.1.1 Storing Session State
」「4.1.2 Session State non-normative examples
」には

  • セッションが保存されているかどうか
  • 保存するかどうか
  • ストレージの可用性

は用途によって設計・評価する必要性があると読めます。

つまり、ここでSessionをどこに保存するか、その可用性は例えば

電力量計 → 保存先は揮発メモリでも許容
駐車料金 → 保存先は不揮発メモリ必須

など、ユースケースや要件によって変わってくるということです。

実験

サーバ側を

の2つ用意し、受信クライアントを接続断&セッションは残っている状態にします。
このとき、

  • QoS0のメッセージ(OPTIONALで届く)
  • QoS1のメッセージ(届くはず)

が届くかどうか確かめましょう

import paho.mqtt.client as mqtt
import time

BROKER = "localhost"
PORT = 1883
TOPIC = "test/topic"

SUB_CLIENT_ID = "sub-client"
PUB_CLIENT_ID = "pub-client"

# ======================
# Subscriber callback
# ======================
def on_message(client, userdata, msg):
    print(f"[受信] QoS{msg.qos} {msg.topic}: {msg.payload.decode()}")

# ======================
# Step 1: Subscribe → Disconnect (Session を残す)
# ======================
def subscriber_connect_and_disconnect():
    print("\n=== Step1: Subscriber connect & subscribe ===")

    sub = mqtt.Client(
        client_id=SUB_CLIENT_ID,
        protocol=mqtt.MQTTv5,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2
    )
    sub.on_message = on_message

    # ★ Session Expiry Interval を設定
    props = mqtt.Properties(mqtt.PacketTypes.CONNECT)
    props.SessionExpiryInterval = 30

    sub.connect(
        BROKER,
        PORT,
        keepalive=60,
        clean_start=True,
        properties=props
    )

    sub.subscribe(TOPIC, qos=1)
    sub.loop_start()

    time.sleep(2)

    print("[Subscriber] Disconnect (Session should remain)")
    sub.disconnect()
    sub.loop_stop()

# ======================
# Step 2: Publish while offline
# ======================
def publisher_send_messages():
    print("\n=== Step2: Publisher sends messages while offline ===")

    pub = mqtt.Client(
        client_id=PUB_CLIENT_ID,
        protocol=mqtt.MQTTv5,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2
    )

    pub.connect(BROKER, PORT, 60)
    pub.loop_start()

    print("[Publisher] Send QoS0")
    pub.publish(TOPIC, "QoS0 message", qos=0)

    print("[Publisher] Send QoS1")
    pub.publish(TOPIC, "QoS1 message", qos=1)

    time.sleep(1)
    pub.disconnect()
    pub.loop_stop()

# ======================
# Step 3: Subscriber reconnects (Session 再利用)
# ======================
def subscriber_reconnect():
    print("\n=== Step3: Subscriber reconnect ===")

    sub = mqtt.Client(
        client_id=SUB_CLIENT_ID,
        protocol=mqtt.MQTTv5,
        callback_api_version=mqtt.CallbackAPIVersion.VERSION2
    )
    sub.on_message = on_message

    # ★ clean_start=False が超重要
    sub.connect(
        BROKER,
        PORT,
        keepalive=60,
        clean_start=False
    )

    sub.loop_start()
    print("[Subscriber] Reconnected, waiting messages...")

    time.sleep(5)

    sub.disconnect()
    sub.loop_stop()

# ======================
# Main
# ======================
if __name__ == "__main__":
    subscriber_connect_and_disconnect()
    time.sleep(1)

    publisher_send_messages()
    time.sleep(1)

    subscriber_reconnect()

実行すると下記のようになりました。

mochi-co相手

[~/qiita/client]$uv run qos0_session.py 

=== Step1: Subscriber connect & subscribe ===
[Subscriber] Disconnect (Session should remain)

=== Step2: Publisher sends messages while offline ===
[Publisher] Send QoS0
[Publisher] Send QoS1

=== Step3: Subscriber reconnect ===
[Subscriber] Reconnected, waiting messages...
[受信] QoS1 test/topic: QoS1 message
[~/qiita/client]$

mosquitto相手

[~/qiita/client]$uv run qos0_session.py 

=== Step1: Subscriber connect & subscribe ===
[Subscriber] Disconnect (Session should remain)

=== Step2: Publisher sends messages while offline ===
[Publisher] Send QoS0
[Publisher] Send QoS1

=== Step3: Subscriber reconnect ===
[Subscriber] Reconnected, waiting messages...
[受信] QoS1 test/topic: QoS1 message

両方同じ結果になりました

  • QoS1は規格仕様通り届く
  • QoS0は今回のBroker実装では届かなかった

まとめ

MQTT5におけるSessionを確認し、Sessionが残っている際のQoS1とQoS0の配信の様子を確認しました。
mochi-coとmosquittoの2つのブローカーでQoS1が再接続時に配信され、QoS0では再配信されないという規格通りの挙動を確認しました。
実装する上でデータを正しく配送するには、セッションの設定を正しく選び、QoSを正しく選ぶことが大事であることがわかりました。

著作権情報

Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html

Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?