はじめに
今回はReceive Maximumについてみていきます。
QoS1はPubを送ってからPubackを返すまで、QoS2はPubcompでシーケンスが
終了するまで、パケットを保持する必要がありますが、大量かつ高速になってくるとメモリリソースを食い潰す懸念があります。そこで、QoS1/2において双方で滞留するパケット数に制限をもたせることでリソースを守ることができる仕組みが仕様として用意されています。
Receive Maximumとは
Connect/Connackの際にクライアントとサーバそれぞれがReceive Maximumを宣言することで滞留メッセージにクォータ制限を設定できます。この宣言に応じてサーバとクライアントはQoS1/2でメッセージを送信する際に、滞留するメッセージ数に応じて送信を実行しないという実装になります。
双方で独立の値を持ちます。サーバはConnectでクライアントから宣言された値を使用し、
The Server uses this value to limit the number of QoS 1 and QoS 2 publications that it is willing to process concurrently for the Client. - Connect
クライアントはConnackでサーバから宣言された値を利用します。
The Client uses this value to limit the number of QoS 1 and QoS 2 publications that it is willing to process concurrently. - Connack
何も設定しないと65533になります。
実際のサーバの実装上はバッファーに上限を持たせて、ある一定数以上はデータロストを割り切るか適切にReceive Maximumを設定するのがよりよいです。超えた値はエラー処理をしないと無限にリソースを食い潰すことになりアプリケーションやサーバ丸ごと落とすおそれもあります。
back-pressureとReceive Maximum
滞留するメッセージ数に応じて送信を実行しないということは、"back-pressure(背圧)"のアプリケーション層の実装ということになります。
一般に、ストリーミング処理では上流の処理速度 > 下流の処理速度になると、そのままでは処理が滞留しデータで溢れてしまうことになります。そのため、対策としては上流の速度に制限を加える仕組みが必要となり、それをbackpressureと呼びます。
backpressureはストリーミング処理だけでなく非同期の処理の至る所で実装されていて例を上げればキリがありませんが、思いつくものだけ挙げておきます。
- Apache Kafka (https://www.designandexecute.com/designs/how-to-manage-backpressure-in-kafka/)
- Rust / tokio (https://biriukov.dev/docs/async-rust-tokio-io/2-io-loop/#backpressure-propagation)
- Amazon OpenSearch Service (https://aws.amazon.com/jp/blogs/news/improved-resiliency-with-backpressure-and-admission-control-for-amazon-opensearch-service/ )
MQTTの話に戻りますと、システム全体でReceive Maximumを正しく設定し、実装することでクライアント・サーバの両方のリソースをコントロールすることが必要です。また、ブローカーを実装する際はクライアントの大量パケット送信に対して安全にエラーを返すような実装が必要です。
実践
- サーバ側は1日目のものを利用します
- クライアントは受信にRecieveMaximumを設定し、受信の挙動を見ます
- 受信はWiresharkでも確認します。Receiveの値によって受信する際のPub/Pubackのタイミングが変化
import paho.mqtt.client as mqtt
from paho.mqtt.packettypes import PacketTypes
from paho.mqtt.properties import Properties
import threading
import time
import queue
BROKER = "localhost"
TOPIC = "test/topic"
# クライアントがメッセージが1になっていればブロッキングになる()
msg_queue = queue.Queue(maxsize=1)
# ======================
# Subscriber
# ======================
def on_message(client, userdata, msg):
msg_queue.put(msg)
def worker():
while True:
msg = msg_queue.get()
print(f"[受信] {msg.topic}: {msg.payload.decode()}")
time.sleep(2) # 重い処理
msg_queue.task_done()
def subscriber():
sub = mqtt.Client(
client_id="sub-client",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
props = Properties(PacketTypes.CONNECT)
props.ReceiveMaximum = 10 # or 1 # backpressure を可視化
sub.on_message = on_message
sub.connect(BROKER, 1883, 60, properties=props)
sub.subscribe(TOPIC, qos=1)
sub.loop_forever()
# ======================
# Publisher
# ======================
def publisher():
pub = mqtt.Client(
client_id="pub-client",
protocol=mqtt.MQTTv5,
callback_api_version=mqtt.CallbackAPIVersion.VERSION2,
)
pub.connect(BROKER, 1883, 60)
for i in range(5):
msg = f"Hello {i}"
pub.publish(TOPIC, msg, qos=1)
print(f"[送信] {msg}")
time.sleep(0.2)
pub.disconnect()
# ======================
# Main
# ======================
if __name__ == "__main__":
threading.Thread(target=worker, daemon=True).start()
t = threading.Thread(target=subscriber, daemon=True)
t.start()
time.sleep(1)
publisher()
time.sleep(15) # ← ここは十分長く
Recieve Maximumが10の場合
Recieve Maximumが1の場合
まとめ
Recieve Maximumの挙動を確認しました