pythonにおけるmqttのクライアントであるpaho-mqttの使い方、そしてクライアントとMQTTブローカー間の接続、publish/subscribeなどの機能をPythonで実装する方法を紹介します。
準備
今回はPython 3.6を使います。以下のコマンドでPythonのバージョンを確認できます。
➜ ~ python3 --version
Python 3.6.7
paho (mqttクライアント) について
Paho Python Clientには、Python 2.7または3.x上でMQTT v3.1とv3.1.1をサポートするクライアントのクラスが用意されています。また、MQTTサーバーに単発でpublishするようなヘルパー関数も用意されています。
pahoのインストール
pip3 install paho-mqtt
pipは、Pythonパッケージの管理ツールです。pipを使って、Pythonパッケージの検索、ダウンロード、インストール、アンインストールを行えます。
PythonでMQTTを使う
MQTTブローカーに接続
EMQ Xが提供しているテスト用のMQTTブローカーを使用します(このブローカーは、EMQ X Cloudをベースに運用されています)。ブローカーのアクセス情報は以下の通りです。接続先は以下の通りです。
- Host: broker.emqx.io
- TCP Port: 1883
- Websocket Port: 8083
pahoをimport
from paho.mqtt import client as mqtt_client
接続のパラメータ設定
ブローカーに接続するためのアドレス、ポート、トピックを設定します。今回は、Pythonの関数random.randint
を使って、Client IDをランダムに生成します。
broker = 'broker.emqx.io'
port = 1883
topic = "/python/mqtt"
client_id = f'python-mqtt-{random.randint(0, 1000)}'
# username = 'emqx'
# password = 'public'
接続する関数を書く
on_connect
はコールバック関数です。この関数は、クライアントが接続した後に呼び出されます。この関数にあるrc
(return code) によって、クライアントが正常に接続したかどうかを判断することができます。このコールバック関数を使ってMQTTクライアントを作成し、broker.emqx.io
に接続するクライアントを作ります。
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
# Set Connecting Client ID
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
publish (送信)
上記のクライアントを引数に取るpublish関数を作ります。今回は、while
ループを使って1秒ごとにトピック/python/mqtt
にメッセージを送信するようにします。
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
subscribe (受信)
コールバック関数on_message
を書きます。この関数は、クライアントがブローカーからメッセージを受信したときに呼び出されます。今回は、subscribeしたトピックの名前と受信したメッセージを出力します。
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
スクリプトを完成させる
メッセージをpublishするスクリプト
# python 3.6
import random
import time
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 1000)}'
# username = 'emqx'
# password = 'public'
def connect_mqtt():
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def publish(client):
msg_count = 0
while True:
time.sleep(1)
msg = f"messages: {msg_count}"
result = client.publish(topic, msg)
# result: [0, 1]
status = result[0]
if status == 0:
print(f"Send `{msg}` to topic `{topic}`")
else:
print(f"Failed to send message to topic {topic}")
msg_count += 1
def run():
client = connect_mqtt()
client.loop_start()
publish(client)
if __name__ == '__main__':
run()
メッセージをsubscribeするスクリプト
# python3.6
import random
from paho.mqtt import client as mqtt_client
broker = 'broker.emqx.io'
port = 1883
topic = "python/mqtt"
# generate client ID with pub prefix randomly
client_id = f'python-mqtt-{random.randint(0, 100)}'
# username = 'emqx'
# password = 'public'
def connect_mqtt() -> mqtt_client:
def on_connect(client, userdata, flags, rc):
if rc == 0:
print("Connected to MQTT Broker!")
else:
print("Failed to connect, return code %d\n", rc)
client = mqtt_client.Client(client_id)
client.username_pw_set(username, password)
client.on_connect = on_connect
client.connect(broker, port)
return client
def subscribe(client: mqtt_client):
def on_message(client, userdata, msg):
print(f"Received `{msg.payload.decode()}` from `{msg.topic}` topic")
client.subscribe(topic)
client.on_message = on_message
def run():
client = connect_mqtt()
subscribe(client)
client.loop_forever()
if __name__ == '__main__':
run()
実行
publish
python3 pub.py
subscibe
python3 sub.py
おわりに
この記事は How to use MQTT in Python (Paho)の翻訳です。