はじめに
Paho MQTT ClientはメジャーなMQTTクライアントです。MQTTは常時接続が前提であるのせいで、というか、Pythonで非同期が慣れていないせいで、ライブラリ自体がひと癖あると感じました。そこでPython Paho MQTT Clientのネットワークループについて整理しようと思います。
ネットワークループ処理
3種類ループ関数が用意されています。受信したパケットはループ関数で処理されます。
loop_start() / loop_stop()
loop_start()はバックグラウンドでループスレッドを起動し、そのスレッドは内部でloop() を繰り返し実行します。loop_start() のスレッドは loop_stop() を呼ぶまで動作します。自動再接続が有効な場合、接続断があってもスレッドは動き続けます。
mqttc.loop_start()
while True:
temperature = sensor.blocking_read()
mqttc.publish("paho/temperature", temperature)
mqttc.loop_stop()
loop_forever()
最も単純な blocking APIです。ここで永遠にブロックされ、別スレッドから停止するようなAPIがないのでコントロールがしずらいことに注意です。
mqttc.loop_forever(retry_first_connection=False)
loop()
一番プリミティブです。これはloop_forever()のように永遠にはブロックされませんが(タイムアウトする)、ブロッキングの呼び出しです。手動でループを組みたい場合に使用されます。
It is strongly recommended that you use `loop_start()`, or `loop_forever()`, or if you are using an external > event loop using `loop_read()`, `loop_write()`, and `loop_misc()`. >Using loop() on it's own is no longer recommended.
run = True
while run:
rc = mqttc.loop(timeout=1.0)
if rc != 0:
# need to handle error, possible reconnecting or stopping the application
loop()の内部実装
一番プリミティブな関数であるので、内部の実装をみてみましょう。
内部でselectを呼んでいます。selectはReadのTCPソケットとWriteのTCPソケットとこちら側で用意した叩き起こす用のソケットを監視して書き込み/読み込みを待機します。
try:
socklist = select.select(rlist, wlist, [], timeout)
- 書き込み可能になったら、内部からキューを取り出してPublishする
def _packet_write - 読み込み可能になったら
def _packet_read(self)で読み出す -
_packet_readではステートマシンに基づいてfixed header, variable header, payloadを読み出す - 読み取ってそれぞれパケットに応じてCallbackの処理を行う
もっと高度な使い方をする場合は、read/write/keepaliveやQoSそれぞれの用途にloop_read(), loop_write(), loop_misc()が用意されています。いずれも、自らループを管理したい場合に使用されます
-
loop_read(): ソケットから受信しMQTTパケットをdecodeします -
loop_write(): 未送信のキューからパケットを取り出して送信する -
loop_misc(): 1度呼ぶとkeepaliveなどの必要な雑用タスク(misc)を一通り行います。 実装部分:https://github.com/eclipse-paho/paho.mqtt.python/blob/d45de3737879cfe7a6acc361631fa5cb1ef584bb/src/paho/mqtt/client.py#L2140
まとめ
ネットワークループについて見てみました。楽する使い方と色々コントロールできる使い方の2パターンのAPIを用意されているのはありがたいですね。
参考資料
Paho自体の使い方