はじめに
MQTTは、パブリッシュ/サブスクライブモデルに基づくIoT向けの軽量メッセージングプロトコルで、最小限のコードと帯域幅で信頼性の高いリアルタイム通信を提供します。リソースや帯域幅が限られたデバイスに特に有利であり、IoT、モバイルインターネット、IoV、電力産業などで広く採用されています。
Pythonはその汎用性、使いやすさ、豊富なライブラリによりIoT分野で広く使用されています。大量のデータを扱う能力により、スマートホームオートメーション、環境モニタリング、産業制御に理想的です。Pythonはマイクロコントローラとも互換性があり、IoTデバイスの開発に有用なツールです。
この記事では、paho-mqttクライアントを使用してPythonプロジェクトでMQTTクライアントとMQTTブローカー間の接続、サブスクライブ、メッセージングなどの機能を実装する方法を主に紹介します。
なぜPaho MQTT Pythonクライアントを選ぶ理由
Paho Pythonクライアントは、Python 2.7または3.x上でMQTT v5.0、MQTT v3.1.1、およびv3.1をサポートするクライアントクラスを提供しています。また、MQTTサーバーにワンオフメッセージを簡単にパブリッシュするためのヘルパー関数も提供しています。
Pythonコミュニティで最も人気のあるMQTTクライアントライブラリであるPaho MQTT Pythonクライアントには、以下のような利点があります:
- オープンソースでコミュニティサポートされている。
- MQTTサーバーへの接続やMQTTメッセージのパブリッシュ/サブスクライブに対して使いやすいAPI。
- 様々なセキュリティメカニズムをサポート。
- IoTの急速な進化に合わせて積極的に開発・保守されている。
PythonのMQTTクライアントライブラリをさらに探求したい場合は、Python MQTTクライアントの比較ブログ記事をチェックしてください。
Python MQTTプロジェクトの準備
Pythonバージョン
このプロジェクトはPython 3.6を使用して開発およびテストされています。インストールされているPythonのバージョンを確認するには、次のコマンドを使用できます。
$ python3 --version
Python 3.6.7
Paho MQTTクライアントのインストール
Pipを使用してpaho-mqttライブラリをインストールします。
pip3 install paho-mqtt
Pipのインストールに関するヘルプが必要な場合は、公式ドキュメント https://pip.pypa.io/en/stable/installation/ を参照してください。このリソースには、異なるオペレーティングシステムと環境でPipをインストールするための詳細な手順が記載されています。
MQTTブローカーの準備
EMQX Cloud は、大量のIoTデバイスとさまざまなデータベースやビジネスシステムを接続できる完全に管理されたクラウドネイティブのMQTTサービスです。EMQX Cloudを使用すると、数分でMQTTサービスを開始し、AWS、Google Cloud、Microsoft Azureの20以上のリージョンで実行することができ、全世界での可用性と高速な接続を保証します。
この記事では、プロセスを簡素化するために無料の公開MQTTブローカーを使用します:
- サーバー:
broker.emqx.io
- TCPポート:
1883
- WebSocketポート:
8083
- SSL/TLSポート:
8883
- セキュアWebSocketポート:
8084
Paho MQTT Pythonクライアントの使用方法
Paho MQTTクライアントのインポート
from paho.mqtt import client as mqtt_client
MQTT接続の作成
TCP接続
MQTT接続には、ブローカーのアドレス、ポート、トピックを指定する必要があります。さらに、Pythonのrandom.randint関数を使用して接続用のランダムなクライアントIDを生成できます。
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
# username = 'emqx'
# password = 'public'
詳細は、ブログ記事MQTT接続を確立する際のパラメータ設定方法をご覧ください。
次に、ブローカーへの接続に使用するon_connect
コールバック関数を記述します。この関数はクライアントが正常に接続した後に呼び出され、rc
パラメータを使用して接続状態を確認します。通常、同時にbroker.emqx.io
へのクライアントオブジェクトを作成します。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("MQTTブローカーに接続しました!")
else:
print("接続に失敗しました、リターンコード %d\n", rc)
# 接続するクライアントIDを設定
client = mqtt_client.Client(client_id)
# client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
自動再接続
MQTTクライアントライブラリの自動再接続機能は、不安定なネットワーク状況下でのデバイスとブローカー間の信頼性のある通信を人間の介入なしに保証します。これにより、ネットワーク接続が中断された場合やブローカーが一時的に利用できない場合に、クライアントがトピックのパブリッシュやサブスクライブを再開できるようになります。これは自動車システムや医療機器などの高信頼性が求められるアプリケーションにとって重要です。
Paho MQTTクライアントの自動再接続コードは以下の通りです:
FIRST_RECONNECT_DELAY = 1
RECONNECT_RATE = 2
MAX_RECONNECT_COUNT = 12
MAX_RECONNECT_DELAY = 60
def on_disconnect(client, userdata, rc):
logging.info("切断されました、結果コード: %s", rc)
reconnect_count, reconnect_delay = 0, FIRST_RECONNECT_DELAY
while reconnect_count < MAX_RECONNECT_COUNT:
logging.info("%d秒後に再接続します...", reconnect_delay)
time.sleep(reconnect_delay)
try:
client.reconnect()
logging.info("再接続に成功しました!")
return
except Exception as err:
logging.error("%s。再接続に失敗しました。再試行します...", err)
reconnect_delay *= RECONNECT_RATE
reconnect_delay = min(reconnect_delay, MAX_RECONNECT_DELAY)
reconnect_count += 1
logging.info("%s回の試行後に再接続に失敗しました。終了します...", reconnect_count)
その後、クライアントオブジェクトのon_disconnect
として設定します。
client.on_disconnect = on_disconnect
クライアントの自動再接続の完全なコードは、GitHubで見ることができます。
TLS/SSL
MQTTでTLSを使用することで、情報の機密性と完全性を保ち、情報漏洩や改ざんを防ぐことができます。TLS認証は、片方向認証と双方向認証に分類されます。
単方向認証
Paho MQTTクライアントの片方向認証コードは以下の通りです:
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.tls_set(ca_certs='./broker.emqx.io-ca.crt')
双方向認証
Paho MQTTクライアントの双方向認証コードは以下の通りです:
def connect_mqtt():
client = mqtt_client.Client(CLIENT_ID)
client.tls_set(
ca_certs='./server-ca.crt',
certfile='./client.crt',
keyfile='./client.key'
)
メッセージのパブリッシュ
トピック/python/mqtt
に毎秒メッセージを送信し、5つのメッセージを送信した後にループを終了するwhileループを作成します。
def publish(client):
msg_count = 1
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
if msg_count > 5:
break
サブスクライブ
MQTTブローカーからメッセージを受信した際にトリガーされるメッセージコールバック関数on_message
を作成します。この関数では、サブスクライブしたトピック名と受信したメッセージを出力します。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
Python MQTTの完全なコード例
MQTTメッセージのパブリッシュ用コード
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
client_id = f'publish-{random.randint(0, 1000)}'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 1
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
if msg_count > 5:
break
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
client.loop_stop()
if __name__ == '__main__':
run()
MQTTサブスクリプション用コード
# python3.6
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
client_id = f'subscribe-{random.randint(0, 100)}'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
テスト
サブスクライブ
MQTTサブスクリプションスクリプトsub.py
を実行すると、クライアントが成功裏に接続され、パブリッシャーがメッセージをパブリッシュするのを待ち始めたことが確認できます。
python3 sub.py
メッセージのパブリッシュ
MQTTメッセージパブリッシュスクリプトpub.py
を実行すると、クライアントが成功裏に接続し、5つのメッセージをパブリッシュすることが確認できます。同時に、sub.py
も5つのメッセージを正常に受信します。
python3 pub.py
Paho MQTT PythonクライアントについてのQ&A
loop_stop()が実行されない場合、どうなりますか?
loop_stop()
メソッドは、MQTTクライアントのメッセージループを停止し、停止状態としてマークするために使用されます。このプロセスにより、クライアントの優雅なシャットダウンを保証し、メッセージの損失、接続リーク、異常なプログラム動作などのリスクを減少させます。
例えば、この記事で提供されているpub.pyの例で、client.loop_stop()
メソッドを削除すると、sub.py
スクリプトが5つ未満のメッセージを受信する可能性があります。
したがって、MQTTクライアントの優雅なシャットダウンを保証し、未閉じの接続によって生じる可能性のある問題を防ぐために、loop_stop()メソッドを適切に使用することが重要です。
connect_async()はどのような場合に使用されますか?
connect_async()
は、MQTTクライアントアプリケーションが長期間のMQTT接続を必要とする場合や、メインスレッドをブロックせずにMQTT接続をバックグラウンドで維持する必要がある場合に役立ちます。主な使用例は以下の通りです:
-
長期間のMQTT接続:
connect_async()
は、長期間のMQTT接続が必要な産業用アプリケーションなどで、MQTTクライアントアプリケーションの停止や反応不良を防ぎます。 -
不安定なネットワーク接続:不安定なネットワーク接続環境で
connect_async()
を使用すると、再接続や遅延で接続を確立することでアプリケーションの信頼性が向上します。 -
頻繁な接続とパラメータ変更:接続パラメータや他の設定が頻繁に変更される場合、
connect_async()
はアプリケーションの応答性を向上させ、スタッタリングを防ぎます。 -
バックグラウンドでのMQTT接続:
connect_async()
は、アプリケーションが他のプロセスを実行している間にMQTT接続行します。
続けると、connect_async()
は、アプリケーションが他のプロセスを実行している間にMQTT接続をバックグラウンドで確立することを可能にし、ユーザーエクスペリエンスを向上させます。
まとめ
これまでに、paho-mqttクライアントを使用して無料の公開MQTTブローカーに接続する方法を説明しました。テストクライアントからブローカーへのpublish()
メソッドを使用したメッセージ送信および、ブローカーからのメッセージをsubscribe()
メソッドを使用してサブスクライブするプロセスを正常に実装しました。
次に、EMQが提供するMQTTガイド:初心者から上級者までシリーズをチェックして、MQTTプロトコルの特徴を学び、MQTTのより高度なアプリケーションを探求し、MQTTアプリケーションおよびサービス開発を始めることができます。