0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑦awsiotsdkv2でshadow update

Last updated at Posted at 2024-06-30

この記事について

AWS IoT Device SDK v2 for Pythonでmessageのpubsubをしたコードにshadowのupdate/getを追加します。

ラズパイに接続された仮想device_0の正常稼働とdevice停止をAWS IoTのdevice shadowに反映させます。


shadowのtopic

paho-mqttでshadow update/getした時と同様になります。

callback

今回はshadowのgetter(subscriber)を起動する際に呼び出す5種類のcallbackを準備します。awsiotでshadow用sample提供されているsourceを簡略化して使います。

custom callbackを作る必要なし、特にshadow deltaのsubscriberが準備されてるところは刺さりました、何といってもAWS IoT Device SDKですから!

on_update_shadow_accepted

subscribe_to_update_named_shadow_accepted()で呼び出されるcallbackです。

updateが成功したshadowをgetします。

subscribe_to_update_named_shadow_accepted()

Subscribes to the accepted topic for the UpdateNamedShadow operation

subの対象になるtopicはこちらです。

topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/update/accepted'

callbackの実態として以下のメソッドを準備します。

def on_update_shadow_accepted(response):
    try:
        if hasattr(response, 'state'):
            print("shadow_on_update_accepted")
            print(f"Update state: {response.state}")
        if hasattr(response, 'metadata'):
            print(f"Metadata: {response.metadata}")
    except Exception as e:
        print(f"Error processing update shadow response: {e}")

on_shadow_update_rejected

subscribe_to_update_named_shadow_rejected()で呼び出されるcallbackです。
shadow updateの失敗を通知します。

subscribe_to_update_named_shadow_rejected()

Subscribes to the rejected topic for the UpdateNamedShadow operation

subの対象になるtopicはこちらです。

topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/update/rejected'

callbackの実態として以下のメソッドを準備します。

def on_shadow_update_rejected(error):
    print("Shadow Update Rejected:")
    print(error)

on_get_shadow_accepted

subscribe_to_get_named_shadow_accepted()で呼び出されるcallbackです。
publish_get_named_shadow()でget topicがacceptされたときに出力先となるtopicのmessageを取得します。

何だか分かり難いですよね苦笑。

メソッド使い分けの説明と考察は後述します。

subscribe_to_get_named_shadow_accepted()

Subscribes to the accepted topic for the GetNamedShadow operation

subの対象になるtopicはこちらです。

topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/get/accepted'

callbackの実態として以下のメソッドを準備します。

def on_get_shadow_accepted(response):
    try:
        if hasattr(response, 'state'):
            print("shadow_on_get_accepted")
            print(f"Update state: {response.state}")
        if hasattr(response, 'metadata'):
            print(f"Metadata: {response.metadata}")
    except Exception as e:
        print(f"Error processing get shadow response: {e}")

on_shadow_update_rejected

subscribe_to_get_named_shadow_rejected()で呼び出されるcallbackです。
shadow get の失敗を通知します。

subscribe_to_get_named_shadow_rejected()

Subscribes to the rejected topic for the GetNamedShadow operation

subの対象になるtopicはこちらです。

topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/get/rejected'

callbackの実態として以下のメソッドを準備します。

def on_shadow_get_rejected(error):
    print("Shadow Get Rejected:")
    print(error)

on_shadow_delta_updated

subscribe_to_named_shadow_delta_updated_events()で呼び出されるcallbackです。
shadow deltaのupdateを通知します。

subscribe_to_named_shadow_delta_updated_events()

Subscribe to NamedShadowDelta events for a named shadow of an AWS IoT thing

subの対象になるtopicはこちらです。

topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/update/delta'

callbackの実態として以下のメソッドを準備します。

def on_shadow_delta_updated(delta):
    try:
        print(f"Received shadow delta event::  Delta State: {delta.state}")
    except Exception as e:
        print(f"Error processing shadow delta: {e}")

customメソッド

customメソッド名はpaho-mqtt版と揃えます。

shadow_update()

desiredとreportedの状態をjsonに設定してpubするメソッドです。desired stateを固定してreported stateを変化させて後ほどdelta発生時の動作を確認します。

stateは同じセットを使いますヨシッ👉

desired_state = "ヨシッ👉"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"
state = state_ok
#state = state_ng

def shadow_update(shadow, state):
    update_shadow_future = shadow.publish_update_named_shadow(
        request=iotshadow.UpdateNamedShadowRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
            state=iotshadow.ShadowState(
                # デバイスの現在の状態
                reported  = {
                    "device_no": device_no,
                    "state": state
                },
                # デバイスが達成すべき目標の状態
                desired =  {
                    "device_no": device_no,
                    "state": desired_state
                },
            ),
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE
    )
    update_shadow_future.result()
    print("Shadow updated!")

shadow_get_request()

このメソッドはpubメソッドとなります。

用途: 現在のshadowのstateを取得したい場合に使います。現在のshadowのstateを取得するためには以下のtopicに空のpayloadをpubします

topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/get'

その結果が/get/acceptedまたは/get/rejectedにpubされます。subscriberはいずれかのレスポンスを受け取ることになります。

つまりこのメソッドは現在のshadow state確認要求の空メールのようなものです。

def shadow_get_request(shadow):
    # Getを送信する
    get_shadow_future = shadow.publish_get_named_shadow(
        request=iotshadow.GetNamedShadowRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
    )
    get_shadow_future.result()

shadow_get()

shadow_get_request()のレスポンスを受け取るsubメソッドとなります。

今回は4種類のshadow getのcallbackを準備してstateについて全てのgetする構成にしました。

customしなくて良いとはいえpahoでの説明と粒度に差がありますね、pahoごめんなさい。

def shadow_get(shadow):
    # Updateの成功をサブスクライブする
    update_accepted_future, _ = shadow.subscribe_to_update_named_shadow_accepted(
        request=iotshadow.UpdateNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_update_shadow_accepted,
    )
    update_accepted_future.result()
    print("Subscribed to update shadow accepted")

    # Updateの失敗をサブスクライブする
    update_rejected_future, _ = shadow.subscribe_to_update_named_shadow_rejected(
        request=iotshadow.UpdateNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_shadow_update_rejected,
    )
    update_rejected_future.result()
    print("Subscribed to update shadow rejected")

    # Getの成功をサブスクライブする
    get_accepted_future, _ = shadow.subscribe_to_get_named_shadow_accepted(
        request=iotshadow.GetNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_get_shadow_accepted,
    )
    get_accepted_future.result()
    print("Subscribed to get shadow accepted")

    # Getの失敗をサブスクライブする
    get_rejected_future, _ = shadow.subscribe_to_get_named_shadow_rejected(
        request=iotshadow.GetNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_shadow_get_rejected,
    )
    get_rejected_future.result()
    print("Subscribed to get shadow rejected")

shadowdelta_get()

shadow deltaのstateを取得するメソッドです。

def shadowdelta_get(shadow):
    try:
        delta_subscribed_future, _ = shadow.subscribe_to_named_shadow_delta_updated_events(
            request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
                thing_name=hostname,
                shadow_name=SHADOWNAME,
            ),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_shadow_delta_updated
        )
        delta_subscribed_future.result()
        print("Subscribed to shadow delta updates")
    except Exception as e:
        print(f"Error subscribing to shadow delta updates: {e}")

shadow_on_updateとshadow_on_getの違い

shadow_on_update

shadow_update()時のpublish_update_named_shadow()に対するレスポンスを取得します。
用途: shadow stateのupdate時に取得
制限: shadow_update()でdeltaを明示しないとdeltaに関する情報は帰ってこない

shadow_on_get

現在のshadowのstateを取得
用途: 知りたい時にshadow stateを取得
うれしさ: delta stateの情報が含まれる

個人的に、どちらか一つならshadow_on_getを設定します。コスト許容可なら保険で二つかけておくのもありだと思います。

main()

作成したメソッドをmainに追加します。

main()
if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    topic = TOPIC
    state = state_ng

    #クライアント作成
    mqtt_connection = mqtt_init()
    mqtt_connect(mqtt_connection)
    # Device shadowクライアント作成
    shadow = iotshadow.IotShadowClient(mqtt_connection)

    #sub開始
    mqtt_subscribe(mqtt_connection, topic)
    shadow_get(shadow)
    shadowdelta_get(shadow)
 
    try:
        # publish massage
        mqtt_publish(mqtt_connection, topic, msg)
        shadow_update(shadow, state)
        shadow_get_request(shadow)
        time.sleep(3)
        mqtt_disconnoect(mqtt_connection)

    except KeyboardInterrupt:
        print("Waiting for shadow updates...")
        mqtt_disconnoect(mqtt_connection)

Demo code 全体

前回のコードに今回作成したcallbackとメソッドを追加すると以下のようになります。

pubsub_awssdk.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from awscrt import mqtt
from awsiot import mqtt_connection_builder, iotshadow
import os
import time
import json


endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'

device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT Device SDK v2接続ヨシッ👉"


def on_connection_interrupted(connection, error, **kwargs):
    print(f"Connection interrupted. error: {error}")

def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print(f"Connection resumed. return_code: {return_code} session_present: {return_code}")

def on_message(topic, payload, dup, qos, retain, **kwargs):
    payload_str = payload.decode('utf-8')
    payload_dict = json.loads(payload_str)
    print(f"on_message payload dict: {payload_dict}")
    return

def on_connection_success(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
    print(f"Connection Successful with return code: {callback_data.return_code} session present: {callback_data.session_present}")

def on_connection_failure(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionFailureData)
    print(f"Connection failed with error code: {callback_data.error}")
    print(f"Error details: {callback_data}")
    
def on_connection_closed(connection, callback_data):
    print("Connection closed")

def on_update_shadow_accepted(response):
    try:
        if hasattr(response, 'state'):
            print("shadow_on_update_accepted")
            print(f"Update state: {response.state}")
        if hasattr(response, 'metadata'):
            print(f"Metadata: {response.metadata}")
    except Exception as e:
        print(f"Error processing update shadow response: {e}")

def on_shadow_update_rejected(error):
    print("Shadow Update Rejected:")
    print(error)

def on_get_shadow_accepted(response):
    try:
        if hasattr(response, 'state'):
            print("shadow_on_get_accepted")
            print(f"Update state: {response.state}")
        if hasattr(response, 'metadata'):
            print(f"Metadata: {response.metadata}")
    except Exception as e:
        print(f"Error processing get shadow response: {e}")

def on_shadow_get_rejected(error):
    print("Shadow Get Rejected:")
    print(error)

def on_shadow_delta_updated(delta):
    try:
        print(f"Received shadow delta event::  Delta State: {delta.state}")
    except Exception as e:
        print(f"Error processing shadow delta: {e}")

def mqtt_init():
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        port=port,
        cert_filepath=cert,
        pri_key_filepath=key,
        ca_filepath=ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=clientid,
        clean_session=False,
        keep_alive_secs=30,
        on_connection_success=on_connection_success,
        on_connection_failure=on_connection_failure,
        on_connection_closed=on_connection_closed
        )
    print("Connecting to endpoint with client ID")
    return mqtt_connection

def mqtt_connect(mqtt_connection):
    connect_future = mqtt_connection.connect()
    connect_future.result()

def mqtt_disconnoect(mqtt_connection):
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()

def mqtt_publish(mqtt_connection, topic, msg):
    data = {}  
    print("topic:", topic)
    data['Timestamp'] = int(time.time())
    data['hostname'] = hostname
    data['device_no'] = device_no
    data['msg'] = msg
    message_json = json.dumps(data)
    mqtt_connection.publish(
        topic=topic,
        payload=message_json,
        qos=mqtt.QoS.AT_LEAST_ONCE,
    )
    print(f"published message payload dict: {data}")
    time.sleep(1)

def mqtt_subscribe(mqtt_connection, topic):
    print(f"Subscribing to topic '{topic}'...")
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message)
    subscribe_result = subscribe_future.result()
    print(f"Subscribed with {subscribe_result}")
    time.sleep(1)    

def shadow_update(shadow, state):
    update_shadow_future = shadow.publish_update_named_shadow(
        request=iotshadow.UpdateNamedShadowRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
            state=iotshadow.ShadowState(
                # デバイスの現在の状態
                reported  = {
                    "device_no": device_no,
                    "state": state
                },
                # デバイスが達成すべき目標の状態
                desired =  {
                    "device_no": device_no,
                    "state": desired_state
                },
            ),
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE
    )
    update_shadow_future.result()
    print("Shadow updated!")

def shadow_get_request(shadow):
    # Getを送信する
    get_shadow_future = shadow.publish_get_named_shadow(
        request=iotshadow.GetNamedShadowRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
    )
    get_shadow_future.result()

def shadow_get(shadow):
    # Updateの成功をサブスクライブする
    update_accepted_future, _ = shadow.subscribe_to_update_named_shadow_accepted(
        request=iotshadow.UpdateNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_update_shadow_accepted,
    )
    update_accepted_future.result()
    print("Subscribed to update shadow accepted")

    # Updateの失敗をサブスクライブする
    update_rejected_future, _ = shadow.subscribe_to_update_named_shadow_rejected(
        request=iotshadow.UpdateNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_shadow_update_rejected,
    )
    update_rejected_future.result()
    print("Subscribed to update shadow rejected")

    # Getの成功をサブスクライブする
    get_accepted_future, _ = shadow.subscribe_to_get_named_shadow_accepted(
        request=iotshadow.GetNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_get_shadow_accepted,
    )
    get_accepted_future.result()
    print("Subscribed to get shadow accepted")

    # Getの失敗をサブスクライブする
    get_rejected_future, _ = shadow.subscribe_to_get_named_shadow_rejected(
        request=iotshadow.GetNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_shadow_get_rejected,
    )
    get_rejected_future.result()
    print("Subscribed to get shadow rejected")

def shadowdelta_get(shadow):
    try:
        delta_subscribed_future, _ = shadow.subscribe_to_named_shadow_delta_updated_events(
            request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
                thing_name=hostname,
                shadow_name=SHADOWNAME,
            ),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_shadow_delta_updated
        )
        delta_subscribed_future.result()
        print("Subscribed to shadow delta updates")
    except Exception as e:
        print(f"Error subscribing to shadow delta updates: {e}")


if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    topic = TOPIC
    state = state_ng

    mqtt_connection = mqtt_init()
    mqtt_connect(mqtt_connection)
    shadow = iotshadow.IotShadowClient(mqtt_connection)

    mqtt_subscribe(mqtt_connection, topic)
    shadow_get(shadow)
    shadowdelta_get(shadow)
 
    try:
        mqtt_publish(mqtt_connection, topic, msg)
        shadow_update(shadow, state)
        shadow_get_request(shadow)
        time.sleep(3)
        mqtt_disconnoect(mqtt_connection)

    except KeyboardInterrupt:
        print("Waiting for shadow updates...")
        mqtt_disconnoect(mqtt_connection)

実行結果

OK state

state_okをreported stateとしてupdateします。

state_ok = "ヨシッ👉"

下図の状態です。

正常にpubsubを実行してます。

topic: MQTTServer/MQTTServer1/0
published message payload dict: {'Timestamp': 1719734710, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
on_message payload dict: {'Timestamp': 1719734710, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}

正常にshadowのupdateを実行してupdateをgetできました。
こちらにはdeltaの情報は入りません。

Shadow updated!
shadow_on_update_accepted
Update state: awsiot.iotshadow.ShadowState(desired={'device_no': 0, 'state': 'ヨシッ👉'}, desired_is_nullable=False, reported={'device_no': 0, 'state': 'ヨシッ👉'}, reported_is_nullable=False)
Metadata: awsiot.iotshadow.ShadowMetadata(desired={'device_no': {'timestamp': 1719734711}, 'state': {'timestamp': 1719734711}}, reported={'device_no': {'timestamp': 1719734711}, 'state': {'timestamp': 1719734711}})

正常にshadow_on_getもgetできました。
こちらにはdeltaの情報がnoneと入っています。

shadow_on_get_accepted
Update state: awsiot.iotshadow.ShadowStateWithDelta(delta=None, desired={'device_no': 0, 'state': 'ヨシッ👉'}, reported={'device_no': 0, 'state': 'ヨシッ👉'})
Metadata: awsiot.iotshadow.ShadowMetadata(desired={'device_no': {'timestamp': 1719734711}, 'state': {'timestamp': 1719734711}}, reported={'device_no': {'timestamp': 1719734711}, 'state': {'timestamp': 1719734711}})

存在しないのでメソッドはshadow deltaを取得していません。

AWS IoTのテストクライアント上での確認です。

NG state

異常検知を想定してstate_ngをreported stateの値に代入してshadow updateします。

state_ng = "ダメ🙅"

下図の状態です。

reportedを「ダメ」でupdateできています。

Shadow updated!
shadow_on_update_accepted
Update state: awsiot.iotshadow.ShadowState(desired={'device_no': 0, 'state': 'ヨシッ👉'}, desired_is_nullable=False, reported={'device_no': 0, 'state': 'ダメ🙅'}, reported_is_nullable=False)
Metadata: awsiot.iotshadow.ShadowMetadata(desired={'device_no': {'timestamp': 1719734895}, 'state': {'timestamp': 1719734895}}, reported={'device_no': {'timestamp': 1719734895}, 'state': {'timestamp': 1719734895}})

shadow updateをgetすると。

正常にshadow_on_getも「ダメ」が確認できます。
こちらにはdelta stateがnoneからdesiredのstateに変わっているのが分かります。

shadow_on_get_accepted
Update state: awsiot.iotshadow.ShadowStateWithDelta(delta={'state': 'ヨシッ👉'}, desired={'device_no': 0, 'state': 'ヨシッ👉'}, reported={'device_no': 0, 'state': 'ダメ🙅'})
Metadata: awsiot.iotshadow.ShadowMetadata(desired={'device_no': {'timestamp': 1719734895}, 'state': {'timestamp': 1719734895}}, reported={'device_no': {'timestamp': 1719734895}, 'state': {'timestamp': 1719734895}})

今度はshadow deltaのstateもgetできます。

delta更新を取得するメソッドの呼び方はgetではなくsubscribeのほうが正しい気がしますがこのまま行かせてくださいm(__)m

Received shadow delta event::  Delta State: {'state': 'ヨシッ👉'}

今回shadow update時にdeltaを明示的にupdateしてないのでAWS IoTのテストクライアント上では確認できません。


shadowのstatusには反映されています。

paho編で説明を端折りました(と言うか抜けました)が、

  • 能動的にdelat発生を把握する為のget要求は準備されていないので
  • Lambda発火のトリガーに使う等の場合にはshadowのupdateに紐づけてdelta発生の有無を/update/deltaをsubscribeして確認する必要がある
  • shadow.subscribe_to_named_shadow_delta_updated_events()はその為のsub用メソッド

と言うことだと思います(結局pahoでやったことも同じです)。

いずれにしてもsubしたdeltaをトリガーとして自動実行ワークロードに組み込むことが可能です。

次回

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑧awsiotsdkv2でretainとwill

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?