0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow update

Last updated at Posted at 2024-06-27

この記事について

shadowとwillとretainを活用してAWS IoT CoreとPubSubする③pahoでpubsubで行ったmessageのpubsubにshadowのupdate/getを追加します。

今回はラズパイに接続された仮想device_0の正常稼働とdevice停止をAWS IoTのdevice shadowに反映させます。


shadowのtopic

Device shadowはAWS IoTで用意されているサービスの一種で、一般的なMQTTメソッドで準備されているライブラリーに専用メソッドは含まれていません。

AWS IoT Coreでshadowの各種イベントが発生する際に使われる11種類のtopicはシステム予約で決められています。

今回使っているhostnameの場合、shadowのtopicは下表のようになります。

この中の/update/documentsと/update/deltaを使ってshdowとshadow deltaのupdateをキャッチするcustom callbackを作成します。

名前 アクション MQTT トピック
/get 発行 $aws/things/MQTTServer1/shadow/name/device_0/get 
/get/accepted サブスクライブ $aws/things/MQTTServer1/shadow/name/device_0/get/accepted 
/get/rejected サブスクライブ $aws/things/MQTTServer1/shadow/name/device_0/get/rejected 
/update 発行 $aws/things/MQTTServer1/shadow/name/device_0/update 
/update/delta サブスクライブ $aws/things/MQTTServer1/shadow/name/device_0/update/delta 
/update/accepted サブスクライブ $aws/things/MQTTServer1/shadow/name/device_0/update/accepted 
/update/documents サブスクライブ $aws/things/MQTTServer1/shadow/name/device_0/update/documents 
/update/rejected サブスクライブ $aws/things/MQTTServer1/shadow/name/device_0/update/rejected 
/delete 発行 $aws/things/MQTTServer1/shadow/name/device_0/delete 
/delete/accepted サブスクライブ $aws/things/MQTTServer1/shadow/name/device_0/accepted 
/delete/rejected サブスクライブ $aws/things/MQTTServer1/shadow/name/device_0/rejected

callback

custom callback

shadow用のcallbackが用意されていないのでpaho-mqttのclient.message_callback_add関数を使いcustom callbackを作成します。

message_callback_add(sub : str , callback : Callable)
特定のトピックのメッセージ コールバックを登録します。 'sub' に一致するメッセージは 'callback' に渡されます。一致しないメッセージはデフォルトのon_messageコールバックに渡されます。

複数のトピック固有のコールバックを定義するには、異なる「sub」を使用して複数回呼び出します。

トピック固有のコールバックはmessage_callback_remove()で削除できます。

shdow updateのsubscribe用callback

/update/documents

AWS IoT は、シャドウの更新が正常に実行されるたびに、このトピックに状態ドキュメントを公開します。

shdow updateをキャッチするには/update/documentsが利用できます。

今回のホスト名でdevice_0について/update/documentsをsubする場合topicは次のようになります。

$aws/things/MQTTServer1/shadow/name/device_0/update/documents 

/update/documentsで呼び出すcallbackの中身としてon_shadow_updateメソッドを準備します。

機能はon_messageと同じですが、shadowがupdateされた時だけpayloadをdecodeして出力するのにメソッドを分けます。

def on_shadow_update(client, userdata, msg):
    payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
    print("Shadow Update detected: " + payload_decoded)

他のcallbackと同様に、接続時にon_shadow_update()を設定しておくことでupdate時にcallbackの出力が得られます。

def mqtt_connect(client):
    .
    .
    client.message_callback_add(f"$aws/things/MQTTServer1/shadow/name/device_0/update/documents", on_shadow_update)

shdow delta updateのsubscribe用callback

/update/delta

shdow delta updateをキャッチするには/update/deltaが利用できます。

AWS IoTは、デバイスのシャドウの変更を受け入れると、このトピックに応答状態ドキュメントを公開します。また、リクエスト状態ドキュメントには、desiredとreportedの値とさまざまな状態が含まれます。

とありますが、responseを見るとdesiredのみが返されます。当然ですがdeltaが発生(更新)していないときはこのresponseは返されません。

今回のホスト名でdevice_0について/update/deltaをsubする場合topicは次のようになります。

$aws/things/MQTTServer1/shadow/name/device_0/update/delta  

/update/deltaで呼び出すcallbackの中身としてon_shadowdelta_updateメソッドを準備します。

機能はon_messageと同じですが、shadowのdeltaがupdateされた時だけpayloadをdecodeして出力するのにメソッドを分けます。

def on_shadowdelta_update(client, userdata, msg):
    payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
    print("Shadow Delta Update detected: " + payload_decoded)

他のcallbackと同様に、接続時にon_shadowdelta_update()を設定しておくことでupdate時にcallbackの出力が得られます。

def mqtt_connect(client):
    .
    .
    .
    client.message_callback_add(f"$aws/things/MQTTServer1/shadow/name/device_0/update/delta", on_shadowdelta_update)

customメソッド

shadow_update()

desiredとreportedの状態をjsonに設定してpubするメソッドです。desired stateを固定してreported stateを変化させて後ほどdelta発生時の動作を確認します。

stateは…最後までこの路線で行きますヨシッ👉

desired_state = "ヨシッ👉"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"

def shadow_update(client, state):
    topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update"
    print(topic)
    payload = {
        "state": {
            "desired": {
                "device_no": device_no,
                "state": desired_state
            },
            "reported": {
                "device_no": device_no,
                "state": state_ok #state_ng
            }
        }
    }
    client.publish(topic, json.dumps(payload), qos=1)
    print(f"Shadow update sent: {payload}")

shadow_get()

updateされたshadowをgetします。今回は確認用で使います。

def shadow_get(client):
    shadow_update_topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update/documents"
    print("Subscribing to shadow update topic:", shadow_update_topic)
    client.subscribe(shadow_update_topic, qos=1)

shadowdelta_get()

shadowのdeltaがupdateされた場合にgetします。確認用で使います。

def shadowdelta_get(client):
    shadow_update_topic = f"$aws/things/MQTTServer1/shadow/name/device_0/update/delta"
    print("Subscribing to shadow delta update topic:", shadow_update_topic)
    client.subscribe(shadow_update_topic, qos=1)

main()

作成したメソッドをmainに追加します。

main()
if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    time.sleep(5)
    try:
        #wifi接続チェック
        get_ssid()
        #クライアント作成
        client = mqtt_init(clientid)
        #client idで接続
        mqtt_connect(client)

        #sub開始
        subscribe_pub(client)
        time.sleep(1)
        shadow_get(client)
        time.sleep(1)
        shadowdelta_get(client)
        time.sleep(1)
        
        #クライアント接続ループ開始
        client.loop_start()
        time.sleep(1)

        #message発行
        mqtt_publish(client, msg)
        #shadow update
        shadow_update(client, state_ng)
 
        #切断
        mqtt_disconnect(client)
    except KeyboardInterrupt:
        aws_disconnect(client)
        time.sleep(3)
        sys.exit()

Demo code 全体

前回のコードに今回作成したcallbackとメソッドを追加すると以下のようになります。

pubsub_shadow_paho.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import json
import ssl
import time
import subprocess
import paho.mqtt.client as mqtt


endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'

device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT 接続ヨシッ👉"
desired_state = "ヨシッ👉"

def on_connect(client, userdata, flags, respons_code):
    if respons_code != 0:
        print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}")
    print('Connected')

def on_disconnect(client, userdata, respons_code):
    if respons_code != 0:
        print(f"Unexpected disconnection.")
    else:
        print(f"Disconnected successfully.")

def on_message(client, userdata, msg):
    print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")
    return

def on_shadow_update(client, userdata, msg):
    payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
    print("Shadow Update detected: " + payload_decoded)

def on_shadowdelta_update(client, userdata, msg):
    payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
    print("Shadow Delta Update detected: " + payload_decoded)

def get_ssid():
    cmd = 'iwconfig wlan0|grep ESSID'
    r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip()
    idx = r.find('ESSID:')
    if r[idx + 7:-1] == "ff/an":
        print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}")
        time.sleep(120)
        subprocess.call(["sudo", "reboot"])

def mqtt_init(clientId):
    try:
        client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311)
        client.tls_set(
            ca,
            certfile=cert,
            keyfile=key,
            tls_version=ssl.PROTOCOL_TLSv1_2)
        client.tls_insecure_set(True)
    except Exception as e:
        print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}")  
    return client

def mqtt_connect(client):
    client.on_connect = on_connect
    client.on_message = on_message
    client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents", on_shadow_update)
    client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta", on_shadowdelta_update)
    client.connect(endpoint, port, keepalive=60)
    time.sleep(1)

def mqtt_disconnect(client):
    client.on_disconnect = on_disconnect
    client.disconnect()
    client.loop_stop()

def mqtt_publish(client, msg):
    data = {}
    _topic = topic + "/" + str(device_no)
    print("Publishing to topic:", _topic)
    data['Timestamp'] = int(time.time())
    data['hostname'] = os.uname()[1]
    data['device_no'] = device_no
    data['msg'] = msg
    client.publish(_topic, json.dumps(data, default=json_serial), qos=1)
    print(_topic, data)
    return

def mqtt_subscribe(client):
    _topic = topic + "/" + str(device_no)
    print("Subscribing to topic:", _topic)
    client.subscribe(_topic, qos=1)
    return

def json_serial(para):
    return para.isoformat()

def shadow_update(client, state):
    topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update"
    print(topic)
    payload = {
        "state": {
            "desired": {
                "device_no": device_no,
                "state": desired_state
            },
            "reported": {
                "device_no": device_no,
                "state": state
            }
        }
    }
    client.publish(topic, json.dumps(payload), qos=1)
    print(f"Shadow update sent: {payload}")

def shadow_get(client):
    shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents"
    print("Subscribing to shadow update topic:", shadow_update_topic)
    client.subscribe(shadow_update_topic, qos=1)

def shadowdelta_get(client):
    shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta"
    print("Subscribing to shadow delta update topic:", shadow_update_topic)
    client.subscribe(shadow_update_topic, qos=1)


if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    time.sleep(5)
    try:
        get_ssid()
        client = mqtt_init(clientid)
        mqtt_connect(client)
        subscribe_pub(client)
        time.sleep(1)
        shadow_get(client)
        time.sleep(1)
        shadowdelta_get(client)
        time.sleep(1)
        client.loop_start()
        time.sleep(1)

        publish_pub(client, msg)
        shadow_update(client, state_ng)

        time.sleep(1)

        mqtt_disconnect(client)
    except KeyboardInterrupt:
        mqtt_disconnect(client)
        time.sleep(3)
        sys.exit()

実行結果

OK state

state_okをreported stateとしてupdateします。

state_ok = "ヨシッ👉"

下図の状態です。

topicは正しく設定されています。

Subscribing to topic: MQTTServer/MQTTServer1/0
Subscribing to shadow update topic: $aws/things/MQTTServer1/shadow/name/device_0/update/documents
Subscribing to shadow delta update topic: $aws/things/MQTTServer1/shadow/name/device_0/update/delta
Publishing to topic: MQTTServer/MQTTServer1/0
Connected

正常にpubsubを実行してます。

Message pblished : {'Timestamp': 1719502382, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Received message: {"Timestamp": 1719502382, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}

正常にshadowのupdateを実行してます。

$aws/things/MQTTServer1/shadow/name/device_0/update
Shadow update sent: {'state': {'desired': {'device_no': 0, 'state': 'ヨシッ👉'}, 'reported': {'device_no': 0, 'state': 'ヨシッ👉'}}}

shadowのupdateをgetできました。

Shadow Update detected: {"previous": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ダメ🙅"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719502318}, "state": {"timestamp": 1719502318}}, "reported": {"device_no": {"timestamp": 1719502318}, "state": {"timestamp": 1719502318}}}, "version": 56}, "current": {"state": {"desired": {"device_no": 0, "state": " ヨシッ👉"}, "reported": {"device_no": 0, "state": "ヨシッ👉"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719502382}, "state": {"timestamp": 1719502382}}, "reported": {"device_no": {"timestamp": 1719502382}, "state": {"timestamp": 1719502382}}}, "version": 57}, "timestamp": 1719502382}

shadow deltaは出力されていません。

AWS IoTのテストクライアント上でも確認できました。

NG state

ラズパイで実行しているルーチンでdevice_0の異常停止を検知した、ような場面だと思ってください。

異常検知を受けてstate_ngをreported stateの値に代入してshadow updateします。

state_ng = "ダメ🙅"

下図の状態です。

reportedを「ダメ」でupdateできています。

$aws/things/MQTTServer1/shadow/name/device_0/update
Shadow update sent: {'state': {'desired': {'device_no': 0, 'state': 'ヨシッ👉'}, 'reported': {'device_no': 0, 'state': 'ダメ🙅'}}}

shadow updateをgetすると「ダメ」が確認できます。

Shadow Update detected: {"previous": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ヨシッ👉"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719507396}, "state": {"timestamp": 1719507396}}, "reported": {"device_no": {"timestamp": 1719507396}, "state": {"timestamp": 1719507396}}}, "version": 58}, "current": {"state": {"desired": {"device_no": 0, "state": "ヨシッ👉"}, "reported": {"device_no": 0, "state": "ダメ🙅"}}, "metadata": {"desired": {"device_no": {"timestamp": 1719508744}, "state": {"timestamp": 1719508744}}, "reported": {"device_no": {"timestamp": 1719508744}, "state": {"timestamp": 1719508744}}}, "version": 59}, "timestamp": 1719508744}

今度はshadow deltaのレスポンスを確認できます。

Shadow Delta Update detected: {"version": 59, "timestamp": 1719508744, "state": {"state": "ヨシッ👉"}, "metadata": {"state": {"timestamp": 1719508744}}}

AWS IoTのテストクライアント上でも確認できました。

このdeltaをトリガーとして自動実行ワークロードに組み込むことが可能です。

次回

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑤pahoでretainとwill

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?