0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PythonでMQTT非同期フレームワーク - HBMQTT

Posted at

非同期とは

CPUの速度はディスク、ネットワーク、その他のIO操作よりもはるかに速いです。しかし、スレッド内では、いくらCPUが高速に実行されていても、IO操作に遭遇すると、読み書きが完了するまで停止して待たなければならず、多くの時間が無駄になります。

この問題を解決するために、Pythonには非同期IOの機能が追加されました。Python 3.4ではasyncioが標準ライブラリに正式に含まれ、Python 3.5ではasync/awaitキーワードが追加されました。関数の前にasyncキーワードを追加することで、ユーザーは簡単に関数を非同期関数にすることができます。

PythonのMQTTクライアントライブラリでは、HBMQTTが非同期IOをサポートする最初のPython MQTTライブラリでした。

注意

HBMQTTはもはやメンテナンスされていません。別の開発者によってアクティブに開発されているフォーク aMQTT があります。詳細はこちらを参照してください。

HBMQTTライブラリ

HBMQTTは、Python上で書かれたオープンソースのライブラリで、MQTT 3.1.1プロトコルを実装しています。特徴は以下の通りです:

  • QoS 0、QoS 1、QoS 2メッセージのサポート
  • クライアントは自動的に再接続する
  • TCPとWebSocketのサポート
  • SSLのサポート
  • プラグインシステムのサポート

この記事では、Python MQTT非同期フレームワーク - HBMQTTを使用して、MQTTのパブリッシュおよびサブスクライブ機能を備えた非同期デモを簡単に実装する方法を紹介します。

プロジェクトの初期化

Pythonバージョンの確認

このプロジェクトはPython 3.6を使用して開発およびテストされています。Pythonのバージョンを確認するには、以下のコマンドを使用できます。

Python 3.5以上でなければなりません。asyncキーワードを使用する必要があるためです。

➜  ~ python3 --version
Python 3.6.7

Pipを使ってHBMQTTライブラリをインストール

PipはPythonパッケージの管理ツールです。このツールは、Pythonパッケージを見つける、ダウンロードする、インストールする、アンインストールする機能を提供します。

pip3 install -i https://pypi.doubanio.com/simple hbmqtt

MQTTブローカーへの接続

この記事では、EMQXが提供する無料の公開MQTTブローカーを使用します。このサービスは、EMQXのIoTクラウドプラットフォームに基づいて作成されています。ブローカーへのアクセス情報は以下の通りです:

  • ブローカー: broker.emqx.io
  • TCPポート: 1883
  • Websocketポート: 8083

まず、HBMQTTクライアントライブラリをインポートします。

from hbmqtt.client import MQTTClient

client = MQTTClient()
# ブローカーに接続
client.connect('mqtt://broker.emqx.io/')
# 切断
client.disconnect()

非同期関数は次のようです:

async def test_pub():
    client = MQTTClient()
    await client.connect('mqtt://broker.emqx.io/')
    await client.disconnect()

メッセージのパブリッシュ

パブリッシュ関数は、MQTTClientクラスのpublish関数です。

client = MQTTClient()
# 関数の3つのパラメータはトピック、メッセージ内容、QoSです
client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0)

非同期関数は次のようです:

async def test_pub():
    client = MQTTClient()
    await Client.connect('mqtt://broker.emqx.io/')
    await asyncio.gather(
        client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
    )
    logging.info("messages published")
    await Client.disconnect()

このコードでは、これら3つのメッセージ送信関数をasyncioのタスクリストに入れ、順番に実行されます。すべてのタスクが完了したら、接続が切断されます。

サブスクライブ

サブスクライブ関数は、MQTTClientクラスのsubscribe関数です。

client = MQTTClient()
# サブスクライブ
client.subscribe([
  ('topic/0', QOS_0),
  ('topic/1', QOS_1),  
])
# アンサブスクライブ
client.unsubscribe([
  ('topic/0', QOS_0),
]

非同期関数は次のようです:

async def test_sub():
    client = MQTTClient()
    await client.connect('mqtt://broker.emqx.io/')
    await client.subscribe([
            ('a/b', QOS_1),
         ])
    for i in range(0, 10):
        message = await client.deliver_message()
        packet = message.publish_packet
        print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
    await client.disconnect

このコードでは、メッセージを受信する際にawaitを設定しているので、コードが次の位置に到達したとき、CPUはまず他のタスクを実行し、メッセージが配信された後でそれを印刷します。

message = await client.deliver_message()

最終的に、プログラムは10回メッセージの受信を待ち、その後接続を閉じます。

完全なサンプルコード

メッセージのサブスクライブ用コード

# sub.py
# python 3.6+

import asyncio
import logging

from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_1

async def test_sub():
    client = MQTTClient()
    await client.connect('mqtt://broker.emqx.io/')
    await client.subscribe([
        ('a/b', QOS_1),
    ])
    for i in range(0, 10):
        message = await client.deliver_message()
        packet = message.publish_packet
        print(f"{i}:  {packet.variable_header.topic_name} => {packet.payload.data}")
    await client.disconnect()

if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=formatter)
    asyncio.run(test_sub())

メッセージのパブリッシュ用コード

# pub.py
# python 3.6+

import asyncio
import logging

from hbmqtt.client import MQTTClient
from hbmqtt.mqtt.constants import QOS_0, QOS_1, QOS_2

async def test_pub():
    client = MQTTClient()

    await client.connect('mqtt://broker.emqx.io/')
    await asyncio.gather(
        client.publish('a/b', b'TEST MESSAGE WITH QOS_0', qos=QOS_0),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_1', qos=QOS_1),
        client.publish('a/b', b'TEST MESSAGE WITH QOS_2', qos=QOS_2)
    )
    logging.info("messages published")
    await client.disconnect()

if __name__ == '__main__':
    formatter = "[%(asctime)s] %(name)s {%(filename)s:%(lineno)d} %(levelname)s - %(message)s"
    logging.basicConfig(level=logging.INFO, format=formatter)
    asyncio.run(test_pub())

テスト

メッセージのパブリッシュ

MQTTメッセージパブリッシュコードを実行すると、クライアントが正常に接続され、メッセージが正常にパブリッシュされたことが確認できます。

hbmqtt_pub.png

以下は、MQTTXクライアントがHBMQTTクライアントによってパブリッシュされたメッセージを正常に受信したことを示しています。

mqttx_sub.png

サブスクライブ

MQTTメッセージサブスクリプションコードを実行すると、クライアントが正常に接続され、メッセージが届くのを待っていることが確認できます。

running_sub_py.png

MQTTXクライアントをbroker.emqx.ioに接続し、トピックa/bに10回メッセージを送信します。

pub_from_mqttx.png

ターミナルに戻ると、クライアントがメッセージを受信して印刷することが確認できます。また、10回のメッセージを受信した後、プログラムは自動的に終了します。

finished_sub_py.png

まとめ

これまでのところ、HBMQTTライブラリを公開MQTTブローカーに接続し、テストクライアントとMQTTブローカー間の接続、メッセージのパブリッシュ、サブスクライブを実装しました。Pythonの非同期IOを使用してメッセージの送受信を行うことで、より効果的なMQTTクライアントを実装することができます。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?