Will Delay Intervalとは
Willとは、クライアントが異常切断した場合にサーバ(ブローカー)が代わりに発行する予約済PUBLISHです。MQTT3まではクライアントが切断されるとWillは必ず即時発行されていました。MQTT5からは新たにWill Delay Intervalが導入され、クライアントは「切断されてからWillを発行するまでの猶予時間(秒)」を指定できるようになりました。
これにより、
- 一時的なネットワーク切断
- 短時間での再接続
といったケースで、不要なWillの発行を防ぐことができます。
嬉しい点としては、一時的な通信断が頻発する環境において、
「Broker側の不要なPublishを減らしSubscriber側の負荷を下げる」という効果があります。
Session Expiry Intervalとの関係
わかりづらい点として、WillはSessionと結びついています。
仕様上、サーバは次のどちらかが先に起きた時点でWillを発効させます。
- Will Delay Intervalが経過
- Sessionが終了
つまり、
- Sessionが存続している間だけWill Delayは有効
- Sessionが終了した瞬間、WillはDelayを待たずに即時発効
です。そのため、Session Expiry Interval = 0の場合は
「切断と同時にSessionが終了⇒Will Delay Intervalが設定されていてもWillは即時発効」となります。
| 事象 | Session の状態 | Will Delay Interval | Will は発行されるか | 理由 |
|---|---|---|---|---|
| 異常切断 | 継続 | 0 | ✅ 即時 | Will Delay が無い |
| 異常切断 | 継続 | > 0 | ⏳ Delay 待ち | Session が生きている |
| Will Delay 経過 | 継続 | > 0 | ✅ 発行 | 規定時間経過 |
| Delay 中に再接続(Clean Start = 0) | 継続 | > 0 | ❌ 抑止 | Session が継続 |
| Delay 中に再接続(Clean Start = 1) | 終了 | > 0 | ✅ 即時 | Session 終了が先 |
| 異常切断 | 終了 | > 0 | ✅ 即時 | Session 終了が Delay より先 |
| 正常 DISCONNECT | 終了 | 任意 | ❌ 発行されない | Will 条件を満たさない |
実験
サーバ側は1日目のmochi-coを利用しました。
4パターンを試します。
- Will Delay only(通常発行)
異常切断⇒Will Delay経過⇒Will発行 - Will Delay + Clean Start = 0
異常切断後も再接続でSession引き継ぐ⇒Will抑止 - Will Delay + Clean Start = 1
異常切断後、再接続でSessionを破棄⇒再接続時にWill発行 - "Will Delay + Session Expiry = 0
異常切断後、セッションが破棄され即時発効
import paho.mqtt.client as mqtt
import threading
import time
import socket
from typing import Optional
BROKER = "localhost"
PORT = 1883
TOPIC = "test/topic"
CLIENT_ID = "will-test-client"
# ======================
# Subscriber
# ======================
# Will の PUBLISH を受信するための Subscriber
def on_sub_message(client, userdata, msg):
print(f"[SUB受信] {msg.topic}: {msg.payload.decode()}")
def subscriber():
sub = mqtt.Client(
client_id="sub-client",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
sub.on_message = on_sub_message
sub.connect(BROKER, PORT)
sub.subscribe(TOPIC)
sub.loop_forever()
# ======================
# Will Client
# ======================
def connect_with_will(
clean_start: bool,
label: str,
session_expiry: Optional[int] = None,
):
print(f"\n=== {label} (clean_start={clean_start}) ===")
connected = threading.Event()
def on_connect(client, userdata, flags, reason_code, properties):
print("-> on_connect (CONNACK received)")
connected.set()
client = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
client.on_connect = on_connect
# ---- CONNECT Properties ----
# Session Expiry Interval を指定する場合のみ CONNECT Properties を付与
conn_props = None
if session_expiry is not None:
conn_props = mqtt.Properties(mqtt.PacketTypes.CONNECT)
setattr(conn_props, "Session Expiry Interval", session_expiry)
# ---- Will Properties ----
# Will Delay Interval = 5 秒
# → 異常切断後、Session が存続していれば 5 秒後に Will が発行される
will_props = mqtt.Properties(mqtt.PacketTypes.WILLMESSAGE)
setattr(will_props, "Will Delay Interval", 5)
# Will メッセージを設定
client.will_set(
TOPIC,
payload=f"WILL FIRED ({label})",
qos=0,
retain=False,
properties=will_props
)
# CONNECT
client.connect(
BROKER,
PORT,
clean_start=clean_start,
properties=conn_props
)
client.loop_start()
connected.wait(timeout=5)
# 正常 DISCONNECT ではなく、TCP を直接 close して異常切断を発生させる
print("-> 強制切断(TCP close)")
sock: socket.socket = client._sock
sock.close()
client.loop_stop()
# ======================
# Main
# ======================
if __name__ == "__main__":
# Subscriber 起動(Will 受信用)
threading.Thread(target=subscriber, daemon=True).start()
time.sleep(1)
# --------------------------------------------------
# ケース0:
# Will Delay Interval 経過後、通常どおり Will が発行される
# --------------------------------------------------
connect_with_will(
clean_start=False,
label="Will Delay only(通常発行)"
)
print("-> 6 秒待機(Will Delay 経過)")
time.sleep(6)
# --------------------------------------------------
# ケース1:
# Clean Start = 0 で再接続すると Session が復活し、
# Will Delay Interval 内であれば Will は抑止される
# --------------------------------------------------
connect_with_will(
clean_start=False,
label="Will Delay + Clean Start = 0"
)
time.sleep(2)
# 同一 ClientID で再接続(Session 継続)
reconnect = mqtt.Client(
client_id=CLIENT_ID,
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2
)
reconnect.connect(BROKER, PORT, clean_start=False)
reconnect.disconnect()
print("-> 再接続完了(Will は送信されない)")
time.sleep(6)
# --------------------------------------------------
# ケース2:
# Clean Start = 1 は既存 Session を終了させるため、
# Will Delay Interval に関係なく Will は即時発行される
# --------------------------------------------------
connect_with_will(
clean_start=True,
label="Will Delay + Clean Start = 1"
)
time.sleep(6)
# --------------------------------------------------
# ケース3:
# Session Expiry Interval = 0 のため、切断と同時に Session が終了
# → Will Delay Interval は適用されず、Will は即時発行される
# --------------------------------------------------
connect_with_will(
clean_start=False,
label="Will Delay + Session Expiry = 0",
session_expiry=0
)
time.sleep(6)
[~/qiita/client]$uv run will_delay_interval.py
=== Will Delay only(通常発行) (clean_start=False) ===
-> on_connect (CONNACK received)
-> 強制切断(TCP close)
-> 6 秒待機(Will Delay 経過)
[SUB受信] test/topic: WILL FIRED (Will Delay only(通常発行))
=== Will Delay + Clean Start = 0 (clean_start=False) ===
-> on_connect (CONNACK received)
-> 強制切断(TCP close)
-> 再接続完了(Will は送信されない)
=== Will Delay + Clean Start = 1 (clean_start=True) ===
-> on_connect (CONNACK received)
-> 強制切断(TCP close)
[SUB受信] test/topic: WILL FIRED (Will Delay + Clean Start = 1)
=== Will Delay + Session Expiry = 0 (clean_start=False) ===
-> on_connect (CONNACK received)
-> 強制切断(TCP close)
[SUB受信] test/topic: WILL FIRED (Will Delay + Session Expiry = 0)
[~/qiita/client]$
まとめ
Will Delay Interval は、異常切断が起きてもすぐに Willを送らず、しばらく様子を見るための仕組みです。
その待ち時間のあいだに Sessionが再接続で引き継がれれば Willは送られず、逆にSessionが終了してしまった場合はWillが発行されます。
著作権情報
Copyright © OASIS Open 2014. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html
Copyright © OASIS Open 2019. All Rights Reserved.
Available at: https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html