この記事について
shadowとwillとretainを活用してAWS IoT CoreとPubSubする②AWSリソースの構築で構築した環境を使ってラズパイGW broker <-> AWS IoT間でのMQTT通信を始めます。
ここから3回に分けてpaho-mqttライブラリーでmessageのpubsub、shadowのupdate/get、retainとwillの設定をします。
今回はラズパイに接続された仮想のdevice_0のシグナルをAWS IoTとpubsubします。
ライブラリー
GitHub
GitHub: eclipse/paho.mqtt.python
PyPi
開発者ガイド
Eclipse Paho™ MQTT Python クライアント
callback
コールバック(callback)とは、コンピュータプログラミングにおいて、あるサブルーチン(関数など)を呼び出す際に別のサブルーチンを途中で実行するよう指定する手法
コールバックの嬉しさの一つは、connect、disconnectなどのメソッド実行をトリガーにresponse、payload、errorなどを自分の好みに出力できることです。
paho-mqttで準備されているcallback関数のうち今回のpubsubでは3種類を使います。
client.on_connect
ブローカーが接続要求に応答したときに呼び出されるコールバック
client.on_connectインスタンスで呼び出す中身としてon_connectメソッドを準備します。
呼び出されたらrespons_codeをチェックして異常の場合に出力させます。
def on_connect(client, userdata, flags, respons_code):
if respons_code != 0:
print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}")
print('Connected')
client.on_message
クライアントがサブスクライブしているトピックでメッセージが受信されたときに呼び出されるコールバック
client.on_messageインスタンスで呼び出す中身としてon_messageメソッドを準備します。
呼び出されたらpayloadをdecodeして出力させます。
def on_message(client, userdata, msg):
print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")
on_disconnect
クライアントがブローカーから切断したときに呼び出されるコールバック
client.on_disconnectインスタンスで呼び出す中身としてon_disconnectメソッドを準備します。
切断の正常・異常を出力させます。
def on_disconnect(client, userdata, respons_code):
if respons_code != 0:
print(f"Unexpected disconnection.")
else:
print(f"Disconnected successfully.")
customメソッド
自分使い用にまとめ直したメソッドに再構築して使います。
wifi接続判定
初期のころから常用しているメソッドです。
そもそも本来備わった手段がありますが、連続運用する際に起動時wifi接続出来なかったら自律的にリトライするために今でも使っています。
def get_ssid():
cmd = 'iwconfig wlan0|grep ESSID'
r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip()
idx = r.find('ESSID:')
if r[idx + 7:-1] == "ff/an":
print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}")
time.sleep(120)
subprocess.call(["sudo", "reboot"])
MQTTクライアント作成
一意のclient idでAWS IoTに接続するクライアントobjectを作成します。
証明書と秘密鍵で認証されます。MQTTはversion 3.1.1、TLSは1.2を使用します。
def mqtt_init(clientId):
try:
client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311)
client.tls_set(
ca,
certfile=cert,
keyfile=key,
tls_version=ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
except Exception as e:
print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}")
return client
MQTT接続
mqtt_init()で作成したクライアントでAWS IoTに接続します。
1つのコード中でsubscriberを立ち上げておく為には、ループを作る前に、接続時subscriberとon_messageを呼び出しておく必要があります。
過去に他の方の記事でも散見した記憶がありますがはまりポイントだと思います。
【訂正:2024.6.30】試行錯誤の結果の思い込みでした。必要なのはclientの作成->接続の確立->subscriber起動の順番です。最初の組み合わせでも可能ですが、本稿での説明ではmain()の中でsub()メソッドを呼び出す構成に変更します。
通常はこのメソッドの中でループ開始が呼び出されることが多いですが、本稿では一つのフローでpubsubしたいのでこのメソッドから外します。
def mqtt_connect(client):
client.on_connect = on_connect
client.on_message = on_message
client.connect(endpoint, port, keepalive=60)
切断
client idによる接続をdisconnectメソッドで切断します。
disconnectメソッドでの送信ナシでの接続は異常終了と認識されます。この切断時の正常・異常の違いを使ってwillの送信が行われます。
def mqtt_disconnect(client):
client.on_disconnect = on_disconnect
client.disconnect()
client.loop_stop()
topic発行(publish)
いわゆるpubです。「topicを発行」「topicを購読」という言い方に慣れるまで相当時間がかかりました苦笑
今回は仮想のホスト名:MQTTServer1のラズパイGW/brokerから、device_0のシグナル:msgを以下のメソッドでAWS IoTにpubします。
説明を端折りましたがqos=1で。
def mqtt_publish(client, msg):
data = {}
_topic = topic + "/" + str(device_no)
print("Publishing to topic:", _topic)
data['Timestamp'] = int(time.time())
data['hostname'] = os.uname()[1]
data['device_no'] = device_no
data['msg'] = msg
client.publish(_topic, json.dumps(data, default=json_serial), qos=1)
print(_topic, data)
def json_serial(para):
return para.isoformat()
topic購読(subcrribe)
いわゆるsubです。
以下のメソッドで自分が接続しているMQTTネットワーク内でtopicにpubされたメッセージをsubします。
def mqtt_subscribe(client):
_topic = topic + "/" + str(device_no)
print("Subscribing to topic:", _topic)
client.subscribe(_topic, qos=1)
ラズパイGW <-> AWS IoT でpubsub
AWS IoTとport 8883で接続するための認証情報とラズパイの識別子(client id=hostname)を準備します。
topicはベースとなる階層を固定します。この下にdevice Noであったりprocess idであったりサブの階層をつなげます。
endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'
device_no = 0
topic = f'MQTTServer/MQTTServer1'
今回、device_0のシグナルは以下とします。
こんなシグナル出すdeviceって一体…ヨシッとして下さいm(__)m
msg = "AWS IoT 接続ヨシッ👉"
ここまでで準備した認証情報とメソッドを組み合わせるとラズパイとAWS IoTが接続されたNWでpubsubが可能になります。
以下を実行すると接続後にsubscriberが待ち受けした状態となり、pubされた後に自分自身でもmessageをsubして回線を切断します。
上手くいかないときは1~3秒くらいのtime.sleepを挟むとpubsubが出来ると思います。
clientid = f"{hostname}"
time.sleep(5)
try:
#wifi接続チェック
get_ssid()
#クライアント作成
client = mqtt_init(clientid)
#client idで接続
mqtt_connect(client)
#sub開始
mqtt_subscribe(client)
#クライアント接続ループ開始
client.loop_start()
#message発行
mqtt_publish(client, msg)
#切断
mqtt_disconnect(client)
except KeyboardInterrupt:
mqtt_disconnect(client)
time.sleep(3)
sys.exit()
Demo code 全体
コードを全てつなげると以下のようになります。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import json
import ssl
import time
import subprocess
import paho.mqtt.client as mqtt
endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'
device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT 接続ヨシッ👉"
def on_connect(client, userdata, flags, respons_code):
if respons_code != 0:
print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}")
print('Connected')
def on_disconnect(client, userdata, respons_code):
if respons_code != 0:
print(f"Unexpected disconnection.")
else:
print(f"Disconnected successfully.")
def on_message(client, userdata, msg):
print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")
return
def get_ssid():
cmd = 'iwconfig wlan0|grep ESSID'
r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip()
idx = r.find('ESSID:')
if r[idx + 7:-1] == "ff/an":
print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}")
time.sleep(120)
subprocess.call(["sudo", "reboot"])
def mqtt_init(clientId):
try:
client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311)
client.tls_set(
ca,
certfile=cert,
keyfile=key,
tls_version=ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
except Exception as e:
print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}")
return client
def mqtt_connect(client):
client.on_connect = on_connect
client.on_message = on_message
client.connect(endpoint, port, keepalive=60)
def mqtt_disconnect(client):
client.on_disconnect = on_disconnect
client.disconnect()
client.loop_stop()
def mqtt_publish(client, msg):
data = {}
_topic = topic + "/" + str(device_no)
print("Publishing to topic:", _topic)
data['Timestamp'] = int(time.time())
data['hostname'] = os.uname()[1]
data['device_no'] = device_no
data['msg'] = msg
client.publish(_topic, json.dumps(data, default=json_serial), qos=1)
print(_topic, data)
return
def mqtt_subscribe(client):
_topic = topic + "/" + str(device_no)
print("Subscribing to topic:", _topic)
client.subscribe(_topic, qos=1)
return
def json_serial(para):
return para.isoformat()
if __name__ == '__main__':
clientid = f"{hostname}"
time.sleep(5)
try:
get_ssid()
client = mqtt_init(clientid)
mqtt_connect(client)
mqtt_subscribe(client)
client.loop_start()
mqtt_publish(client, msg)
mqtt_disconnect(client)
except KeyboardInterrupt:
mqtt_disconnect(client)
time.sleep(3)
sys.exit()
実行結果
~ $ python pubsub_paho.py
・
・
Subscribing to topic: MQTTServer/MQTTServer1/0
Publishing to topic: MQTTServer/MQTTServer1/0
Connected
MQTTServer/MQTTServer1/0 {'Timestamp': 1719403174, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Received message: {"Timestamp": 1719403174, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
on topic: MQTTServer/MQTTServer1/0
with QoS: 1
Disconnected successfully.
~ $
AWS IoTのテストクライアント上でもsubscribeを確認できました。
次回
shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update