はじめに
Sessionとそれに紐づくデータについて抑えて、QoS1/0でセッション存続時の配送の挙動を実際に確かめます。
Sessionとは
セッションはClient Identityに紐づき、接続断した際もSession Expiry Intervalが経過するまで存続します。セッションには下記状態を含みます。
- セッションの存在
- 購読(サブスクライブ)
- 送信待ちのメッセージ
- 未完了のQoS1/2のメッセージ, オプションでQoS0のメッセージ
- Willメッセージ及びWill Delay Interval
Retain MessageはTopicに紐づくものでありセッション状態に紐づくものではありません。
Sessionの保存
The Client and Server MUST NOT discard the Session State while the Network Connection is open [MQTT-4.1.0-1]. The Server MUST discard the Session State when the Network Connection is closed and the Session Expiry Interval has passed [MQTT-4.1.0-2].
訳:
クライアントとサーバは接続されている間はセッションは破棄してはいけない。サーバはネットワーク接続が閉じられてSession Expiry Intervalが過ぎた場合はセッション破棄しなければならない
サーバ側は保存しなければなりませんが、クライアント側については、切断後のセッション状態の保持方法は規定されておらず、実装や用途に委ねられていることが読み取れます。
規格書のその後に続く「4.1.1 Storing Session State
」「4.1.2 Session State non-normative examples
」には
- セッションが保存されているかどうか
- 保存するかどうか
- ストレージの可用性
は用途によって設計・評価する必要性があると読めます。
つまり、ここでSessionをどこに保存するか、その可用性は例えば
電力量計 → 保存先は揮発メモリでも許容
駐車料金 → 保存先は不揮発メモリ必須
など、ユースケースや要件によって変わってくるということです。
実験
サーバ側を
- mochi-co (https://qiita.com/heyanaohiro/items/3e38c348e1c9a84c7af5#mochi-co-mqtt-broker)
- mosquitto (https://qiita.com/heyanaohiro/items/3e38c348e1c9a84c7af5#mosquitto)
の2つ用意し、受信クライアントを接続断&セッションは残っている状態にします。
このとき、
- QoS0のメッセージ(OPTIONALで届く)
- QoS1のメッセージ(届くはず)
が届くかどうか確かめましょう
import paho.mqtt.client as mqtt
import time
BROKER = "localhost"
PORT = 1883
TOPIC = "test/topic"
SUB_CLIENT_ID = "sub-client"
PUB_CLIENT_ID = "pub-client"
# ======================
# Subscriber callback
# ======================
def on_message(client, userdata, msg):
print(f"[受信] QoS{msg.qos} {msg.topic}: {msg.payload.decode()}")
# ======================
# Step 1: Subscribe → Disconnect (Session を残す)
# ======================
def subscriber_connect_and_disconnect():
print("\n=== Step1: Subscriber connect & subscribe ===")
sub = mqtt.Client(
client_id=SUB_CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
sub.on_message = on_message
# ★ Session Expiry Interval を設定
props = mqtt.Properties(mqtt.PacketTypes.CONNECT)
props.SessionExpiryInterval = 30
sub.connect(
BROKER,
PORT,
keepalive=60,
clean_start=True,
properties=props
)
sub.subscribe(TOPIC, qos=1)
sub.loop_start()
time.sleep(2)
print("[Subscriber] Disconnect (Session should remain)")
sub.disconnect()
sub.loop_stop()
# ======================
# Step 2: Publish while offline
# ======================
def publisher_send_messages():
print("\n=== Step2: Publisher sends messages while offline ===")
pub = mqtt.Client(
client_id=PUB_CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
pub.connect(BROKER, PORT, 60)
pub.loop_start()
print("[Publisher] Send QoS0")
pub.publish(TOPIC, "QoS0 message", qos=0)
print("[Publisher] Send QoS1")
pub.publish(TOPIC, "QoS1 message", qos=1)
time.sleep(1)
pub.disconnect()
pub.loop_stop()
# ======================
# Step 3: Subscriber reconnects (Session 再利用)
# ======================
def subscriber_reconnect():
print("\n=== Step3: Subscriber reconnect ===")
sub = mqtt.Client(
client_id=SUB_CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
sub.on_message = on_message
# ★ clean_start=False が超重要
sub.connect(
BROKER,
PORT,
keepalive=60,
clean_start=False
)
sub.loop_start()
print("[Subscriber] Reconnected, waiting messages...")
time.sleep(5)
sub.disconnect()
sub.loop_stop()
# ======================
# Main
# ======================
if __name__ == "__main__":
subscriber_connect_and_disconnect()
time.sleep(1)
publisher_send_messages()
time.sleep(1)
subscriber_reconnect()
実行すると下記のようになりました。
mochi-co相手
[~/qiita/client]$uv run qos0_session.py
=== Step1: Subscriber connect & subscribe ===
[Subscriber] Disconnect (Session should remain)
=== Step2: Publisher sends messages while offline ===
[Publisher] Send QoS0
[Publisher] Send QoS1
=== Step3: Subscriber reconnect ===
[Subscriber] Reconnected, waiting messages...
[受信] QoS1 test/topic: QoS1 message
[~/qiita/client]$
mosquitto相手
[~/qiita/client]$uv run qos0_session.py
=== Step1: Subscriber connect & subscribe ===
[Subscriber] Disconnect (Session should remain)
=== Step2: Publisher sends messages while offline ===
[Publisher] Send QoS0
[Publisher] Send QoS1
=== Step3: Subscriber reconnect ===
[Subscriber] Reconnected, waiting messages...
[受信] QoS1 test/topic: QoS1 message
両方同じ結果になりました
- QoS1は規格仕様通り届く
- QoS0は今回のBroker実装では届かなかった
まとめ
MQTT5におけるSessionを確認し、Sessionが残っている際のQoS1とQoS0の配信の様子を確認しました。
mochi-coとmosquittoの2つのブローカーでQoS1が再接続時に配信され、QoS0では再配信されないという規格通りの挙動を確認しました。
実装する上でデータを正しく配送するには、セッションの設定を正しく選び、QoSを正しく選ぶことが大事であることがわかりました。
著作権情報
Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html