非同期とは
CPUの速度はディスク、ネットワーク、その他のIO操作よりもはるかに速いです。しかし、スレッド内では、いくらCPUが高速に実行されていても、IO操作に遭遇すると、読み書きが完了するまで停止して待たなければならず、多くの時間が無駄になります。
この問題を解決するために、Pythonには非同期IOの機能が追加されました。Python 3.4ではasyncioが標準ライブラリに正式に含まれ、Python 3.5ではasync/awaitキーワードが追加されました。関数の前にasyncキーワードを追加することで、ユーザーは簡単に関数を非同期関数にすることができます。
PythonのMQTTクライアントライブラリでは、HBMQTTが非同期IOをサポートする最初のPython MQTTライブラリでした。
注意
HBMQTTはもはやメンテナンスされていません。別の開発者によってアクティブに開発されているフォーク aMQTT があります。詳細はこちらを参照してください。
HBMQTTライブラリ
HBMQTTは、Python上で書かれたオープンソースのライブラリで、MQTT 3.1.1プロトコルを実装しています。特徴は以下の通りです:
- QoS 0、QoS 1、QoS 2メッセージのサポート
- クライアントは自動的に再接続する
- TCPとWebSocketのサポート
- SSLのサポート
- プラグインシステムのサポート
この記事では、Python MQTT非同期フレームワーク - HBMQTTを使用して、MQTTのパブリッシュおよびサブスクライブ機能を備えた非同期デモを簡単に実装する方法を紹介します。
プロジェクトの初期化
Pythonバージョンの確認
このプロジェクトはPython 3.6を使用して開発およびテストされています。Pythonのバージョンを確認するには、以下のコマンドを使用できます。
Python 3.5以上でなければなりません。asyncキーワードを使用する必要があるためです。
➜ ~ python3 --version
Python 3.6.7
Pipを使ってHBMQTTライブラリをインストール
PipはPythonパッケージの管理ツールです。このツールは、Pythonパッケージを見つける、ダウンロードする、インストールする、アンインストールする機能を提供します。
pip3 install -i https://pypi.doubanio.com/simple hbmqtt
MQTTブローカーへの接続
この記事では、EMQXが提供する無料の公開MQTTブローカーを使用します。このサービスは、EMQXのIoTクラウドプラットフォームに基づいて作成されています。ブローカーへのアクセス情報は以下の通りです:
- ブローカー: broker.emqx.io
- TCPポート: 1883
- Websocketポート: 8083
まず、HBMQTTクライアントライブラリをインポートします。
from hbmqtt.client import MQTTClient
client = MQTTClient()
# ブローカーに接続
client.connect('mqtt://broker.emqx.io/')
# 切断
client.disconnect()
非同期関数は次のようです:
async def test_pub():
client = MQTTClient()
await client.connect('mqtt://broker.emqx.io/')
await client.disconnect()
メッセージのパブリッシュ
パブリッシュ関数は、MQTTClientクラスのpublish関数です。
client = MQTTClient()
# 関数の3つのパラメータはトピック、メッセージ内容、QoSです
client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)
非同期関数は次のようです:
async def test_pub():
client = MQTTClient()
await Client.connect('mqtt://broker.emqx.io/')
await asyncio.gather(
client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),
client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),
client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
)
logging.info("messages published")
await Client.disconnect()
このコードでは、これら3つのメッセージ送信関数をasyncioのタスクリストに入れ、順番に実行されます。すべてのタスクが完了したら、接続が切断されます。
サブスクライブ
サブスクライブ関数は、MQTTClientクラスのsubscribe関数です。
client = MQTTClient()
# サブスクライブ
client.subscribe([
('topic/0', QOS_0),
('topic/1', QOS_1),
])
# アンサブスクライブ
client.unsubscribe([
('topic/0', QOS_0),
]
非同期関数は次のようです:
async def test_sub():
client = MQTTClient()
await client.connect('mqtt://broker.emqx.io/')
await client.subscribe([
('a/b', QOS_1),
])
for i in range(0, 10):
message = await client.deliver_message()
packet = message.publish_packet
print(f"{i}: {packet.variable_header.topic_name} => {packet.payload.data}")
await client.disconnect
このコードでは、メッセージを受信する際にawaitを設定しているので、コードが次の位置に到達したとき、CPUはまず他のタスクを実行し、メッセージが配信された後でそれを印刷します。
message = await client.deliver_message()
最終的に、プログラムは10回メッセージの受信を待ち、その後接続を閉じます。
完全なサンプルコード
メッセージのサブスクライブ用コード
# sub.py
# python 3.6+
import asyncio
import logging
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1
async def test_sub():
client = MQTTClient()
await client.connect('mqtt://broker.emqx.io/')
await client.subscribe([
('a/b', QOS_1),
])
for i in range(0, 10):
message = await client.deliver_message()
packet = message.publish_packet
print(f"{i}: {packet.variable_header.topic_name} => {packet.payload.data}")
await client.disconnect()
if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.run(test_sub())
メッセージのパブリッシュ用コード
# pub.py
# python 3.6+
import asyncio
import logging
from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2
async def test_pub():
client = MQTTClient()
await client.connect('mqtt://broker.emqx.io/')
await asyncio.gather(
client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),
client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),
client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
)
logging.info("messages published")
await client.disconnect()
if __name__ == '__main__':
formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
logging.basicConfig(level=logging.INFO, format=formatter)
asyncio.run(test_pub())
テスト
メッセージのパブリッシュ
MQTTメッセージパブリッシュコードを実行すると、クライアントが正常に接続され、メッセージが正常にパブリッシュされたことが確認できます。
以下は、MQTTXクライアントがHBMQTTクライアントによってパブリッシュされたメッセージを正常に受信したことを示しています。
サブスクライブ
MQTTメッセージサブスクリプションコードを実行すると、クライアントが正常に接続され、メッセージが届くのを待っていることが確認できます。
MQTTXクライアントをbroker.emqx.ioに接続し、トピックa/bに10回メッセージを送信します。
ターミナルに戻ると、クライアントがメッセージを受信して印刷することが確認できます。また、10回のメッセージを受信した後、プログラムは自動的に終了します。
まとめ
これまでのところ、HBMQTTライブラリを公開MQTTブローカーに接続し、テストクライアントとMQTTブローカー間の接続、メッセージのパブリッシュ、サブスクライブを実装しました。Pythonの非同期IOを使用してメッセージの送受信を行うことで、より効果的なMQTTクライアントを実装することができます。