はじめに
今回はmqtt3のConnect時に設定するCleanSessionについて確認します。
CleanSessionについて
Connect時の挙動として、Variable HeaderのCleanSession のフラグを 1 にするとCONNECTした時点で既存セッションが破棄され、その接続中に作られたセッションは接続の終了と同時に破棄されます。
- 0 → セッションを残す
- 1 → セッションを削除する
維持/破棄されるSessionの中身は下記です。
- Subscribe
- QoS1の未達メッセージ(未ACKのPUBLISH)
- QoS2の未処理メッセージ(PUBREC/PUBRELの途中状態)
下記はSessionには含まれません
- Retain:トピックに紐づいているので、セッションには紐づかない
- Will:切断したら破棄される、再登録にはConnect時に維持が必要
重要なのはCleanSessionが当該接続時にセッションを破棄するということです。つまり例えば、0で接続していたにも関わらず気が変わって、その接続をCleanSessionしたい場合には、一度切断しCleanSessionを1にして再接続するというオペレーションが必要です。このことは規格のNon normative commentにも記載されています。
Clients should only connect with CleanSession set to 0, if they intend to reconnect to the Server at some later point in time. When a Client has determined that it has no further use for the session it should do a final connect with CleanSession set to 1 and then disconnect.
クライアントは、将来的にサーバーに再接続する予定がある場合に限り、CleanSessionを0に設定して接続する必要があります。クライアントがセッションをこれ以上必要としないと判断した場合、CleanSessionを1に設定した最終接続を行った後、切断する必要があります。
接続A(CleanSession=0) ← セッション作成(永続)
↓ 切断(セッションはサーバ側に保持される)
接続B(CleanSession=1) ← ← ← ここで "既存セッションを破棄"(サーバが即時破棄)
↓ 切断(Bで作られたセッションは接続のみ有効だったので消える)
筆者にはあまり現実的に使用される仕様だとは思われませんが、どうしてもデータを残したくない場合は使用することになるかと思います。またブローカーやIoTプラットフォームサーバ側を実装している場合はSessionを削除するような機能があれば親切だと思います
実験
ここでは一旦Sessionを維持してサブスクライブが維持される様子を確認してみましょう。
import threading
import time
import paho.mqtt.client as mqtt
BROKER = "localhost"
TOPIC = "test/topic"
CLIENT_ID = "my-session-client"
# =========================================
# Subscriber function (controlled by stop_event)
# =========================================
def subscriber(name: str, stop_event: threading.Event):
client = mqtt.Client(
client_id=CLIENT_ID,
clean_session=False, # ★ セッションを保持
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
def on_connect(c, u, flags, rc, properties=None):
print(f"[{name}] CONNECT rc={rc} session_present={flags.session_present}")
def on_message(c, u, msg):
print(f"[{name}] RECV {msg.payload.decode()}")
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, 1883, 60)
client.loop_start()
# t1 のみ subscribe させる(t2 は subscribe しない)
if name == "t1":
print(f"[{name}] サブスクライブします!")
client.subscribe(TOPIC, qos=1)
else:
print(f"[{name}] サブスクライブしません!")
# 停止合図まで待つ
stop_event.wait()
print(f"[{name}] STOP")
client.disconnect()
client.loop_stop()
# =========================================
# Publisher
# =========================================
def publisher(label: str):
pub = mqtt.Client(callback_api_version=mqtt.CallbackAPIVersion.VERSION2)
pub.connect(BROKER, 1883, 60)
for i in range(3):
msg = f"{label} {i}"
pub.publish(TOPIC, msg, qos=1)
print(f"[PUB] SEND {msg}")
time.sleep(1)
pub.disconnect()
# =========================================
# Main
# =========================================
if __name__ == "__main__":
print("=== Start subscriber t1 ===")
stop1 = threading.Event()
t1 = threading.Thread(target=subscriber, args=("t1", stop1))
t1.start() # 1回目のSubscribeスタート
time.sleep(2)
print("=== Publish messages for t1 ===")
publisher("Hello_t1")
# ---- kill t1 ----
print("=== Disconnect t1 ===")
stop1.set() # 1回目のSubscribe終了
t1.join()
time.sleep(1)
print("=== Start subscriber t2 (same ClientID) ===")
stop2 = threading.Event() # 2回目のSubscribeスタート
t2 = threading.Thread(target=subscriber, args=("t2", stop2))
t2.start()
time.sleep(2)
print("=== Publish messages for t2 ===")
publisher("Hello_t2")
stop2.set()
t2.join()
print("=== END ===")
1回目の接続でCleanSessionを0にしているので2回目の接続では接続後にサブスクライブしなくても1回目のサブスクライブが有効になって受信ができていることが確認できました。以前の接続が残っているためConnackのsession_presentフラグはtrueになっています。
[~/qiita/client]$uv run session_sub.py
=== Start subscriber t1 ===
[t1] サブスクライブします!
[t1] CONNECT rc=Success session_present=True
=== Publish messages for t1 ===
[PUB] SEND Hello_t1 0
[t1] RECV Hello_t1 0
[PUB] SEND Hello_t1 1
[t1] RECV Hello_t1 1
[PUB] SEND Hello_t1 2
[t1] RECV Hello_t1 2
=== Disconnect t1 ===
[t1] STOP
=== Start subscriber t2 (same ClientID) ===
[t2] サブスクライブしません!
[t2] CONNECT rc=Success session_present=True
=== Publish messages for t2 ===
[PUB] SEND Hello_t2 0
[t2] RECV Hello_t2 0
[PUB] SEND Hello_t2 1
[t2] RECV Hello_t2 1
[PUB] SEND Hello_t2 2
[t2] RECV Hello_t2 2
[t2] STOP
=== END ===
まとめ
mqtt3のConnect時に使用されるCleanSessionについて確認しました。mqtt5では異なるフラグが用いられますのでそれは別の記事で書こうと思います。
著作権情報
Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html