はじめに
今回は基本的なことですが、Disconnectについて確認して、Reasonの送信が行われることを確認しましょう。
Disconnectについて
- サーバ側から閉じる場合とクライアント側から閉じる場合があります
- 閉じる際にはDisconnectのパケットを送信したあとに、下のレイヤ(TCP/Websocket)を閉じます
Disconnect Reason
Disconnectのパケットには切断理由を伝えるメタデータを同封できます。
規格書の[3.14.2.1 Disconnect Reason Code]にReasonが記載してありますので,
動作的に重要そうな箇所のみ記述してみます。
- 0x00 Normal disconnection :通常切断
- 0x04 Disconnect with Will Message : 通常Disconnectで停止した場合にはWillメッセージは送信されませんがクライアントからこのメッセージが送信されてDisconnectされた場合にはWillメッセージを処理します
- 0x8E Session taken over : MQTTは同一IDの複数接続を許可していないうえに、あとからの接続を優先します。このメッセージは他に新たに接続が来たことを示します
- 0x9C Use another server : サーバ側をクラスタ構成した場合に同一クラスタで他のクラスタに接続してほしい場合にサーバ側から切断した際のReasonです。一時的なものでHTTPでいうと302 Foundです
- 0x9D Server moved : これも他サーバに接続してほしいというメッセージです. HTTPでいうと301 Moved Permanently
にあたります。 - 0x98 : Administrative action : 管理者によって切断がされることが定義されています
実践 : Reasonを見てみよう
Session taken overを確認してみましょう。Subscriberと同じmqtt idで接続すると後に接続したクライアントが前に接続していたクライアントを奪い、前のクライアントが切断されます。切断されるときのReasonがSession take overであることを確認します。
import paho.mqtt.client as mqtt
import threading
import time
BROKER = "localhost"
TOPIC = "test/topic"
CLIENT_ID = "shared-client" # 同じ ClientID
# ================ Subscriber ==================
def on_disconnect(client, userdata, flags, reason_code, properties):
print(f"[切断] reason={reason_code} ({reason_code.getName()})")
def on_message(client, userdata, msg):
print(f"[受信] {msg.topic}: {msg.payload.decode()}")
def subscriber():
sub = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
sub.on_disconnect = on_disconnect
sub.on_message = on_message
sub.connect(BROKER, 1883, 60)
sub.subscribe(TOPIC)
print("[Subscriber] 接続完了、待機中…")
# ================ Attacker =====================
def attacker():
time.sleep(2)
atk = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
print("[Attacker] Session takeover を開始")
atk.connect(BROKER, 1883, 60)
time.sleep(1)
atk.disconnect()
# ================ Publisher ====================
def publisher():
time.sleep(1)
pub = mqtt.Client(
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
pub.connect(BROKER, 1883, 60)
for i in range(3):
msg = f"Hello {i}"
pub.publish(TOPIC, msg)
print(f"[送信] {msg}")
time.sleep(1)
pub.disconnect()
# ================ Main =========================
if __name__ == "__main__":
t1 = threading.Thread(target=subscriber, daemon=True)
t1.start()
t2 = threading.Thread(target=attacker, daemon=True)
t2.start()
publisher()
time.sleep(5)
切断されたあとSubscriberが切れたので受信されません。
[~/qiita/client]$uv run take_over.py
[Subscriber] 接続完了、待機中…
[送信] Hello 0
[受信] test/topic: Hello 0
[Attacker] Session takeover を開始
[切断] reason=Session taken over (Session taken over)
[送信] Hello 1
[送信] Hello 2
[~/qiita/client]$
著作権情報
Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html