この記事について
shadowとwillとretainを活用してAWS IoT CoreとPubSubする③pahoでpubsubで行ったmessageのpubsubにshadowのupdate/getを追加します。
今回はラズパイに接続された仮想device_0の正常稼働とdevice停止をAWS IoTのdevice shadowに反映させます。
shadowのtopic
Device shadowはAWS IoTで用意されているサービスの一種で、一般的なMQTTメソッドで準備されているライブラリーに専用メソッドは含まれていません。
AWS IoT Coreでshadowの各種イベントが発生する際に使われる11種類のtopicはシステム予約で決められています。
今回使っているhostnameの場合、shadowのtopicは下表のようになります。
この中の/update/documentsと/update/deltaを使ってshdowとshadow deltaのupdateをキャッチするcustom callbackを作成します。
名前 | アクション | MQTT トピック |
---|---|---|
/get | 発行 | $aws/things/MQTTServer1/shadow/name/device_0/get |
/get/accepted | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/get/accepted |
/get/rejected | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/get/rejected |
/update | 発行 | $aws/things/MQTTServer1/shadow/name/device_0/update |
/update/delta | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/update/delta |
/update/accepted | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/update/accepted |
/update/documents | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/update/documents |
/update/rejected | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/update/rejected |
/delete | 発行 | $aws/things/MQTTServer1/shadow/name/device_0/delete |
/delete/accepted | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/accepted |
/delete/rejected | サブスクライブ | $aws/things/MQTTServer1/shadow/name/device_0/rejected |
callback
custom callback
shadow用のcallbackが用意されていないのでpaho-mqttのclient.message_callback_add関数を使いcustom callbackを作成します。
message_callback_add(sub : str , callback : Callable)
特定のトピックのメッセージ コールバックを登録します。 'sub' に一致するメッセージは 'callback' に渡されます。一致しないメッセージはデフォルトのon_messageコールバックに渡されます。複数のトピック固有のコールバックを定義するには、異なる「sub」を使用して複数回呼び出します。
トピック固有のコールバックはmessage_callback_remove()で削除できます。
shdow updateのsubscribe用callback
AWS IoT は、シャドウの更新が正常に実行されるたびに、このトピックに状態ドキュメントを公開します。
shdow updateをキャッチするには/update/documentsが利用できます。
今回のホスト名でdevice_0について/update/documentsをsubする場合topicは次のようになります。
$aws/things/MQTTServer1/shadow/name/device_0/update/documents
/update/documentsで呼び出すcallbackの中身としてon_shadow_updateメソッドを準備します。
機能はon_messageと同じですが、shadowがupdateされた時だけpayloadをdecodeして出力するのにメソッドを分けます。
def on_shadow_update(client, userdata, msg):
payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
print("Shadow Update detected: " + payload_decoded)
他のcallbackと同様に、接続時にon_shadow_update()を設定しておくことでupdate時にcallbackの出力が得られます。
def mqtt_connect(client):
.
.
client.message_callback_add(f"$aws/things/MQTTServer1/shadow/name/device_0/update/documents", on_shadow_update)
shdow delta updateのsubscribe用callback
shdow delta updateをキャッチするには/update/deltaが利用できます。
AWS IoTは、デバイスのシャドウの変更を受け入れると、このトピックに応答状態ドキュメントを公開します。また、リクエスト状態ドキュメントには、desiredとreportedの値とさまざまな状態が含まれます。
とありますが、responseを見るとdesiredのみが返されます。当然ですがdeltaが発生(更新)していないときはこのresponseは返されません。
今回のホスト名でdevice_0について/update/deltaをsubする場合topicは次のようになります。
$aws/things/MQTTServer1/shadow/name/device_0/update/delta
/update/deltaで呼び出すcallbackの中身としてon_shadowdelta_updateメソッドを準備します。
機能はon_messageと同じですが、shadowのdeltaがupdateされた時だけpayloadをdecodeして出力するのにメソッドを分けます。
def on_shadowdelta_update(client, userdata, msg):
payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
print("Shadow Delta Update detected: " + payload_decoded)
他のcallbackと同様に、接続時にon_shadowdelta_update()を設定しておくことでupdate時にcallbackの出力が得られます。
def mqtt_connect(client):
.
.
.
client.message_callback_add(f"$aws/things/MQTTServer1/shadow/name/device_0/update/delta", on_shadowdelta_update)
customメソッド
shadow_update()
desiredとreportedの状態をjsonに設定してpubするメソッドです。desired stateを固定してreported stateを変化させて後ほどdelta発生時の動作を確認します。
stateは…最後までこの路線で行きますヨシッ👉
desired_state = "ヨシッ👉"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"
def shadow_update(client, state):
topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update"
print(topic)
payload = {
"state": {
"desired": {
"device_no": device_no,
"state": desired_state
},
"reported": {
"device_no": device_no,
"state": state_ok #state_ng
}
}
}
client.publish(topic, json.dumps(payload), qos=1)
print(f"Shadow update sent: {payload}")
shadow_get()
updateされたshadowをgetします。今回は確認用で使います。
def shadow_get(client):
shadow_update_topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update/documents"
print("Subscribing to shadow update topic:", shadow_update_topic)
client.subscribe(shadow_update_topic, qos=1)
shadowdelta_get()
shadowのdeltaがupdateされた場合にgetします。確認用で使います。
def shadowdelta_get(client):
shadow_update_topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update/delta"
print("Subscribing to shadow delta update topic:", shadow_update_topic)
client.subscribe(shadow_update_topic, qos=1)
main()
作成したメソッドをmainに追加します。
if __name__ == '__main__':
clientid = f"{hostname}"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"
time.sleep(5)
try:
#wifi接続チェック
get_ssid()
#クライアント作成
client = mqtt_init(clientid)
#client idで接続
mqtt_connect(client)
#sub開始
subscribe_pub(client)
time.sleep(1)
shadow_get(client)
time.sleep(1)
shadowdelta_get(client)
time.sleep(1)
#クライアント接続ループ開始
client.loop_start()
time.sleep(1)
#message発行
mqtt_publish(client, msg)
#shadow update
shadow_update(client, state_ng)
#切断
mqtt_disconnect(client)
except KeyboardInterrupt:
aws_disconnect(client)
time.sleep(3)
sys.exit()
Demo code 全体
前回のコードに今回作成したcallbackとメソッドを追加すると以下のようになります。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import json
import ssl
import time
import subprocess
import paho.mqtt.client as mqtt
endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'
device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT 接続ヨシッ👉"
desired_state = "ヨシッ👉"
def on_connect(client, userdata, flags, respons_code):
if respons_code != 0:
print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}")
print('Connected')
def on_disconnect(client, userdata, respons_code):
if respons_code != 0:
print(f"Unexpected disconnection.")
else:
print(f"Disconnected successfully.")
def on_message(client, userdata, msg):
print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")
return
def on_shadow_update(client, userdata, msg):
payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
print("Shadow Update detected: " + payload_decoded)
def on_shadowdelta_update(client, userdata, msg):
payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
print("Shadow Delta Update detected: " + payload_decoded)
def get_ssid():
cmd = 'iwconfig wlan0|grep ESSID'
r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip()
idx = r.find('ESSID:')
if r[idx + 7:-1] == "ff/an":
print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}")
time.sleep(120)
subprocess.call(["sudo", "reboot"])
def mqtt_init(clientId):
try:
client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311)
client.tls_set(
ca,
certfile=cert,
keyfile=key,
tls_version=ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
except Exception as e:
print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}")
return client
def mqtt_connect(client):
client.on_connect = on_connect
client.on_message = on_message
client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents", on_shadow_update)
client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta", on_shadowdelta_update)
client.connect(endpoint, port, keepalive=60)
time.sleep(1)
def mqtt_disconnect(client):
client.on_disconnect = on_disconnect
client.disconnect()
client.loop_stop()
def mqtt_publish(client, msg):
data = {}
_topic = topic + "/" + str(device_no)
print("Publishing to topic:", _topic)
data['Timestamp'] = int(time.time())
data['hostname'] = os.uname()[1]
data['device_no'] = device_no
data['msg'] = msg
client.publish(_topic, json.dumps(data, default=json_serial), qos=1)
print(_topic, data)
return
def mqtt_subscribe(client):
_topic = topic + "/" + str(device_no)
print("Subscribing to topic:", _topic)
client.subscribe(_topic, qos=1)
return
def json_serial(para):
return para.isoformat()
def shadow_update(client, state):
topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update"
print(topic)
payload = {
"state": {
"desired": {
"device_no": device_no,
"state": desired_state
},
"reported": {
"device_no": device_no,
"state": state
}
}
}
client.publish(topic, json.dumps(payload), qos=1)
print(f"Shadow update sent: {payload}")
def shadow_get(client):
shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents"
print("Subscribing to shadow update topic:", shadow_update_topic)
client.subscribe(shadow_update_topic, qos=1)
def shadowdelta_get(client):
shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta"
print("Subscribing to shadow delta update topic:", shadow_update_topic)
client.subscribe(shadow_update_topic, qos=1)
if __name__ == '__main__':
clientid = f"{hostname}"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"
time.sleep(5)
try:
get_ssid()
client = mqtt_init(clientid)
mqtt_connect(client)
subscribe_pub(client)
time.sleep(1)
shadow_get(client)
time.sleep(1)
shadowdelta_get(client)
time.sleep(1)
client.loop_start()
time.sleep(1)
publish_pub(client, msg)
shadow_update(client, state_ng)
time.sleep(1)
mqtt_disconnect(client)
except KeyboardInterrupt:
mqtt_disconnect(client)
time.sleep(3)
sys.exit()
実行結果
OK state
state_okをreported stateとしてupdateします。
state_ok = "ヨシッ👉"
下図の状態です。
topicは正しく設定されています。
Subscribing to topic: MQTTServer/MQTTServer1/0
Subscribing to shadow update topic: $aws/things/MQTTServer1/shadow/name/device_0/update/documents
Subscribing to shadow delta update topic: $aws/things/MQTTServer1/shadow/name/device_0/update/delta
Publishing to topic: MQTTServer/MQTTServer1/0
Connected
正常にpubsubを実行してます。
Message pblished : {'Timestamp': 1719502382, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Received message: {"Timestamp": 1719502382, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
正常にshadowのupdateを実行してます。
$aws/things/MQTTServer1/shadow/name/device_0/update
Shadow update sent: {'state': {'desired': {'device_no': 0, 'state': 'ヨシッ👉'}, 'reported': {'device_no': 0, 'state': 'ヨシッ👉'}}}
shadowのupdateをgetできました。
Shadow Update detected: {"previous": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ダメ🙅"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719502318}, "state": {"timestamp": 1719502318}}, "reported": {"device_no": {"timestamp": 1719502318}, "state": {"timestamp": 1719502318}}}, "version": 56}, "current": {"state": {"desired": {"device_no": 0, "state": " ヨシッ👉"}, "reported": {"device_no": 0, "state": "ヨシッ👉"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719502382}, "state": {"timestamp": 1719502382}}, "reported": {"device_no": {"timestamp": 1719502382}, "state": {"timestamp": 1719502382}}}, "version": 57}, "timestamp": 1719502382}
shadow deltaは出力されていません。
AWS IoTのテストクライアント上でも確認できました。
NG state
ラズパイで実行しているルーチンでdevice_0の異常停止を検知した、ような場面だと思ってください。
異常検知を受けてstate_ngをreported stateの値に代入してshadow updateします。
state_ng = "ダメ🙅"
下図の状態です。
reportedを「ダメ」でupdateできています。
$aws/things/MQTTServer1/shadow/name/device_0/update
Shadow update sent: {'state': {'desired': {'device_no': 0, 'state': 'ヨシッ👉'}, 'reported': {'device_no': 0, 'state': 'ダメ🙅'}}}
shadow updateをgetすると「ダメ」が確認できます。
Shadow Update detected: {"previous": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ヨシッ👉"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719507396}, "state": {"timestamp": 1719507396}}, "reported": {"device_no": {"timestamp": 1719507396}, "state": {"timestamp": 1719507396}}}, "version": 58}, "current": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ダメ🙅"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719508744}, "state": {"timestamp": 1719508744}}, "reported": {"device_no": {"timestamp": 1719508744}, "state": {"timestamp": 1719508744}}}, "version": 59}, "timestamp": 1719508744}
今度はshadow deltaのレスポンスを確認できます。
Shadow Delta Update detected: {"version": 59, "timestamp": 1719508744, "state": {"state": "ヨシッ👉"}, "metadata": {"state": {"timestamp": 1719508744}}}
AWS IoTのテストクライアント上でも確認できました。
このdeltaをトリガーとして自動実行ワークロードに組み込むことが可能です。
次回
shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑤pahoでretainとwill