この記事について
AWS IoT Device SDK v2 for Pythonでmessageのpubsubをしたコードにshadowのupdate/getを追加します。
ラズパイに接続された仮想device_0の正常稼働とdevice停止をAWS IoTのdevice shadowに反映させます。
shadowのtopic
paho-mqttでshadow update/getした時と同様になります。
callback
今回はshadowのgetter(subscriber)を起動する際に呼び出す5種類のcallbackを準備します。awsiotでshadow用sample提供されているsourceを簡略化して使います。
custom callbackを作る必要なし、特にshadow deltaのsubscriberが準備されてるところは刺さりました、何といってもAWS IoT Device SDKですから!
on_update_shadow_accepted
subscribe_to_update_named_shadow_accepted()で呼び出されるcallbackです。
updateが成功したshadowをgetします。
subscribe_to_update_named_shadow_accepted()
Subscribes to the accepted topic for the UpdateNamedShadow operation
subの対象になるtopicはこちらです。
topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/update/accepted'
callbackの実態として以下のメソッドを準備します。
def on_update_shadow_accepted(response):
try:
if hasattr(response, 'state'):
print("shadow_on_update_accepted")
print(f"Update state: {response.state}")
if hasattr(response, 'metadata'):
print(f"Metadata: {response.metadata}")
except Exception as e:
print(f"Error processing update shadow response: {e}")
on_shadow_update_rejected
subscribe_to_update_named_shadow_rejected()で呼び出されるcallbackです。
shadow updateの失敗を通知します。
subscribe_to_update_named_shadow_rejected()
Subscribes to the rejected topic for the UpdateNamedShadow operation
subの対象になるtopicはこちらです。
topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/update/rejected'
callbackの実態として以下のメソッドを準備します。
def on_shadow_update_rejected(error):
print("Shadow Update Rejected:")
print(error)
on_get_shadow_accepted
subscribe_to_get_named_shadow_accepted()で呼び出されるcallbackです。
publish_get_named_shadow()でget topicがacceptされたときに出力先となるtopicのmessageを取得します。
何だか分かり難いですよね苦笑。
メソッド使い分けの説明と考察は後述します。
subscribe_to_get_named_shadow_accepted()
Subscribes to the accepted topic for the GetNamedShadow operation
subの対象になるtopicはこちらです。
topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/get/accepted'
callbackの実態として以下のメソッドを準備します。
def on_get_shadow_accepted(response):
try:
if hasattr(response, 'state'):
print("shadow_on_get_accepted")
print(f"Update state: {response.state}")
if hasattr(response, 'metadata'):
print(f"Metadata: {response.metadata}")
except Exception as e:
print(f"Error processing get shadow response: {e}")
on_shadow_update_rejected
subscribe_to_get_named_shadow_rejected()で呼び出されるcallbackです。
shadow get の失敗を通知します。
subscribe_to_get_named_shadow_rejected()
Subscribes to the rejected topic for the GetNamedShadow operation
subの対象になるtopicはこちらです。
topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/get/rejected'
callbackの実態として以下のメソッドを準備します。
def on_shadow_get_rejected(error):
print("Shadow Get Rejected:")
print(error)
on_shadow_delta_updated
subscribe_to_named_shadow_delta_updated_events()で呼び出されるcallbackです。
shadow deltaのupdateを通知します。
subscribe_to_named_shadow_delta_updated_events()
Subscribe to NamedShadowDelta events for a named shadow of an AWS IoT thing
subの対象になるtopicはこちらです。
topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/update/delta'
callbackの実態として以下のメソッドを準備します。
def on_shadow_delta_updated(delta):
try:
print(f"Received shadow delta event:: Delta State: {delta.state}")
except Exception as e:
print(f"Error processing shadow delta: {e}")
customメソッド
customメソッド名はpaho-mqtt版と揃えます。
shadow_update()
desiredとreportedの状態をjsonに設定してpubするメソッドです。desired stateを固定してreported stateを変化させて後ほどdelta発生時の動作を確認します。
stateは同じセットを使いますヨシッ👉
desired_state = "ヨシッ👉"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"
state = state_ok
#state = state_ng
def shadow_update(shadow, state):
update_shadow_future = shadow.publish_update_named_shadow(
request=iotshadow.UpdateNamedShadowRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
state=iotshadow.ShadowState(
# デバイスの現在の状態
reported = {
"device_no": device_no,
"state": state
},
# デバイスが達成すべき目標の状態
desired = {
"device_no": device_no,
"state": desired_state
},
),
),
qos=mqtt.QoS.AT_LEAST_ONCE
)
update_shadow_future.result()
print("Shadow updated!")
shadow_get_request()
このメソッドはpubメソッドとなります。
用途: 現在のshadowのstateを取得したい場合に使います。現在のshadowのstateを取得するためには以下のtopicに空のpayloadをpubします。
topic='$aws/things/{0.thing_name}/shadow/name/{0.shadow_name}/get'
その結果が/get/acceptedまたは/get/rejectedにpubされます。subscriberはいずれかのレスポンスを受け取ることになります。
つまりこのメソッドは現在のshadow state確認要求の空メールのようなものです。
def shadow_get_request(shadow):
# Getを送信する
get_shadow_future = shadow.publish_get_named_shadow(
request=iotshadow.GetNamedShadowRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
)
get_shadow_future.result()
shadow_get()
shadow_get_request()のレスポンスを受け取るsubメソッドとなります。
今回は4種類のshadow getのcallbackを準備してstateについて全てのgetする構成にしました。
customしなくて良いとはいえpahoでの説明と粒度に差がありますね、pahoごめんなさい。
def shadow_get(shadow):
# Updateの成功をサブスクライブする
update_accepted_future, _ = shadow.subscribe_to_update_named_shadow_accepted(
request=iotshadow.UpdateNamedShadowSubscriptionRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_update_shadow_accepted,
)
update_accepted_future.result()
print("Subscribed to update shadow accepted")
# Updateの失敗をサブスクライブする
update_rejected_future, _ = shadow.subscribe_to_update_named_shadow_rejected(
request=iotshadow.UpdateNamedShadowSubscriptionRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_shadow_update_rejected,
)
update_rejected_future.result()
print("Subscribed to update shadow rejected")
# Getの成功をサブスクライブする
get_accepted_future, _ = shadow.subscribe_to_get_named_shadow_accepted(
request=iotshadow.GetNamedShadowSubscriptionRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_get_shadow_accepted,
)
get_accepted_future.result()
print("Subscribed to get shadow accepted")
# Getの失敗をサブスクライブする
get_rejected_future, _ = shadow.subscribe_to_get_named_shadow_rejected(
request=iotshadow.GetNamedShadowSubscriptionRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_shadow_get_rejected,
)
get_rejected_future.result()
print("Subscribed to get shadow rejected")
shadowdelta_get()
shadow deltaのstateを取得するメソッドです。
def shadowdelta_get(shadow):
try:
delta_subscribed_future, _ = shadow.subscribe_to_named_shadow_delta_updated_events(
request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
thing_name=hostname,
shadow_name=SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_shadow_delta_updated
)
delta_subscribed_future.result()
print("Subscribed to shadow delta updates")
except Exception as e:
print(f"Error subscribing to shadow delta updates: {e}")
shadow_on_updateとshadow_on_getの違い
shadow_on_update
shadow_update()時のpublish_update_named_shadow()に対するレスポンスを取得します。
用途: shadow stateのupdate時に取得
制限: shadow_update()でdeltaを明示しないとdeltaに関する情報は帰ってこない
shadow_on_get
現在のshadowのstateを取得
用途: 知りたい時にshadow stateを取得
うれしさ: delta stateの情報が含まれる
個人的に、どちらか一つならshadow_on_getを設定します。コスト許容可なら保険で二つかけておくのもありだと思います。
main()
作成したメソッドをmainに追加します。
if __name__ == '__main__':
clientid = f"{hostname}"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"
topic = TOPIC
state = state_ng
#クライアント作成
mqtt_connection = mqtt_init()
mqtt_connect(mqtt_connection)
# Device shadowクライアント作成
shadow = iotshadow.IotShadowClient(mqtt_connection)
#sub開始
mqtt_subscribe(mqtt_connection, topic)
shadow_get(shadow)
shadowdelta_get(shadow)
try:
# publish massage
mqtt_publish(mqtt_connection, topic, msg)
shadow_update(shadow, state)
shadow_get_request(shadow)
time.sleep(3)
mqtt_disconnoect(mqtt_connection)
except KeyboardInterrupt:
print("Waiting for shadow updates...")
mqtt_disconnoect(mqtt_connection)
Demo code 全体
前回のコードに今回作成したcallbackとメソッドを追加すると以下のようになります。
#!/usr/bin/python
# -*- coding: utf-8 -*-
from awscrt import mqtt
from awsiot import mqtt_connection_builder, iotshadow
import os
import time
import json
endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'
device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT Device SDK v2接続ヨシッ👉"
def on_connection_interrupted(connection, error, **kwargs):
print(f"Connection interrupted. error: {error}")
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print(f"Connection resumed. return_code: {return_code} session_present: {return_code}")
def on_message(topic, payload, dup, qos, retain, **kwargs):
payload_str = payload.decode('utf-8')
payload_dict = json.loads(payload_str)
print(f"on_message payload dict: {payload_dict}")
return
def on_connection_success(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
print(f"Connection Successful with return code: {callback_data.return_code} session present: {callback_data.session_present}")
def on_connection_failure(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionFailureData)
print(f"Connection failed with error code: {callback_data.error}")
print(f"Error details: {callback_data}")
def on_connection_closed(connection, callback_data):
print("Connection closed")
def on_update_shadow_accepted(response):
try:
if hasattr(response, 'state'):
print("shadow_on_update_accepted")
print(f"Update state: {response.state}")
if hasattr(response, 'metadata'):
print(f"Metadata: {response.metadata}")
except Exception as e:
print(f"Error processing update shadow response: {e}")
def on_shadow_update_rejected(error):
print("Shadow Update Rejected:")
print(error)
def on_get_shadow_accepted(response):
try:
if hasattr(response, 'state'):
print("shadow_on_get_accepted")
print(f"Update state: {response.state}")
if hasattr(response, 'metadata'):
print(f"Metadata: {response.metadata}")
except Exception as e:
print(f"Error processing get shadow response: {e}")
def on_shadow_get_rejected(error):
print("Shadow Get Rejected:")
print(error)
def on_shadow_delta_updated(delta):
try:
print(f"Received shadow delta event:: Delta State: {delta.state}")
except Exception as e:
print(f"Error processing shadow delta: {e}")
def mqtt_init():
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=endpoint,
port=port,
cert_filepath=cert,
pri_key_filepath=key,
ca_filepath=ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=clientid,
clean_session=False,
keep_alive_secs=30,
on_connection_success=on_connection_success,
on_connection_failure=on_connection_failure,
on_connection_closed=on_connection_closed
)
print("Connecting to endpoint with client ID")
return mqtt_connection
def mqtt_connect(mqtt_connection):
connect_future = mqtt_connection.connect()
connect_future.result()
def mqtt_disconnoect(mqtt_connection):
print("Disconnecting...")
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
def mqtt_publish(mqtt_connection, topic, msg):
data = {}
print("topic:", topic)
data['Timestamp'] = int(time.time())
data['hostname'] = hostname
data['device_no'] = device_no
data['msg'] = msg
message_json = json.dumps(data)
mqtt_connection.publish(
topic=topic,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE,
)
print(f"published message payload dict: {data}")
time.sleep(1)
def mqtt_subscribe(mqtt_connection, topic):
print(f"Subscribing to topic '{topic}'...")
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message)
subscribe_result = subscribe_future.result()
print(f"Subscribed with {subscribe_result}")
time.sleep(1)
def shadow_update(shadow, state):
update_shadow_future = shadow.publish_update_named_shadow(
request=iotshadow.UpdateNamedShadowRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
state=iotshadow.ShadowState(
# デバイスの現在の状態
reported = {
"device_no": device_no,
"state": state
},
# デバイスが達成すべき目標の状態
desired = {
"device_no": device_no,
"state": desired_state
},
),
),
qos=mqtt.QoS.AT_LEAST_ONCE
)
update_shadow_future.result()
print("Shadow updated!")
def shadow_get_request(shadow):
# Getを送信する
get_shadow_future = shadow.publish_get_named_shadow(
request=iotshadow.GetNamedShadowRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
)
get_shadow_future.result()
def shadow_get(shadow):
# Updateの成功をサブスクライブする
update_accepted_future, _ = shadow.subscribe_to_update_named_shadow_accepted(
request=iotshadow.UpdateNamedShadowSubscriptionRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_update_shadow_accepted,
)
update_accepted_future.result()
print("Subscribed to update shadow accepted")
# Updateの失敗をサブスクライブする
update_rejected_future, _ = shadow.subscribe_to_update_named_shadow_rejected(
request=iotshadow.UpdateNamedShadowSubscriptionRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_shadow_update_rejected,
)
update_rejected_future.result()
print("Subscribed to update shadow rejected")
# Getの成功をサブスクライブする
get_accepted_future, _ = shadow.subscribe_to_get_named_shadow_accepted(
request=iotshadow.GetNamedShadowSubscriptionRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_get_shadow_accepted,
)
get_accepted_future.result()
print("Subscribed to get shadow accepted")
# Getの失敗をサブスクライブする
get_rejected_future, _ = shadow.subscribe_to_get_named_shadow_rejected(
request=iotshadow.GetNamedShadowSubscriptionRequest(
thing_name = hostname,
shadow_name = SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_shadow_get_rejected,
)
get_rejected_future.result()
print("Subscribed to get shadow rejected")
def shadowdelta_get(shadow):
try:
delta_subscribed_future, _ = shadow.subscribe_to_named_shadow_delta_updated_events(
request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
thing_name=hostname,
shadow_name=SHADOWNAME,
),
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_shadow_delta_updated
)
delta_subscribed_future.result()
print("Subscribed to shadow delta updates")
except Exception as e:
print(f"Error subscribing to shadow delta updates: {e}")
if __name__ == '__main__':
clientid = f"{hostname}"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"
topic = TOPIC
state = state_ng
mqtt_connection = mqtt_init()
mqtt_connect(mqtt_connection)
shadow = iotshadow.IotShadowClient(mqtt_connection)
mqtt_subscribe(mqtt_connection, topic)
shadow_get(shadow)
shadowdelta_get(shadow)
try:
mqtt_publish(mqtt_connection, topic, msg)
shadow_update(shadow, state)
shadow_get_request(shadow)
time.sleep(3)
mqtt_disconnoect(mqtt_connection)
except KeyboardInterrupt:
print("Waiting for shadow updates...")
mqtt_disconnoect(mqtt_connection)
実行結果
OK state
state_okをreported stateとしてupdateします。
state_ok = "ヨシッ👉"
下図の状態です。
正常にpubsubを実行してます。
topic: MQTTServer/MQTTServer1/0
published message payload dict: {'Timestamp': 1719734710, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
on_message payload dict: {'Timestamp': 1719734710, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
正常にshadowのupdateを実行してupdateをgetできました。
こちらにはdeltaの情報は入りません。
Shadow updated!
shadow_on_update_accepted
Update state: awsiot.iotshadow.ShadowState(desired={'device_no': 0, 'state': 'ヨシッ👉'}, desired_is_nullable=False, reported={'device_no': 0, 'state': 'ヨシッ👉'}, reported_is_nullable=False)
Metadata: awsiot.iotshadow.ShadowMetadata(desired={'device_no': {'timestamp': 1719734711}, 'state': {'timestamp': 1719734711}}, reported={'device_no': {'timestamp': 1719734711}, 'state': {'timestamp': 1719734711}})
正常にshadow_on_getもgetできました。
こちらにはdeltaの情報がnoneと入っています。
shadow_on_get_accepted
Update state: awsiot.iotshadow.ShadowStateWithDelta(delta=None, desired={'device_no': 0, 'state': 'ヨシッ👉'}, reported={'device_no': 0, 'state': 'ヨシッ👉'})
Metadata: awsiot.iotshadow.ShadowMetadata(desired={'device_no': {'timestamp': 1719734711}, 'state': {'timestamp': 1719734711}}, reported={'device_no': {'timestamp': 1719734711}, 'state': {'timestamp': 1719734711}})
存在しないのでメソッドはshadow deltaを取得していません。
AWS IoTのテストクライアント上での確認です。
NG state
異常検知を想定してstate_ngをreported stateの値に代入してshadow updateします。
state_ng = "ダメ🙅"
下図の状態です。
reportedを「ダメ」でupdateできています。
Shadow updated!
shadow_on_update_accepted
Update state: awsiot.iotshadow.ShadowState(desired={'device_no': 0, 'state': 'ヨシッ👉'}, desired_is_nullable=False, reported={'device_no': 0, 'state': 'ダメ🙅'}, reported_is_nullable=False)
Metadata: awsiot.iotshadow.ShadowMetadata(desired={'device_no': {'timestamp': 1719734895}, 'state': {'timestamp': 1719734895}}, reported={'device_no': {'timestamp': 1719734895}, 'state': {'timestamp': 1719734895}})
shadow updateをgetすると。
正常にshadow_on_getも「ダメ」が確認できます。
こちらにはdelta stateがnoneからdesiredのstateに変わっているのが分かります。
shadow_on_get_accepted
Update state: awsiot.iotshadow.ShadowStateWithDelta(delta={'state': 'ヨシッ👉'}, desired={'device_no': 0, 'state': 'ヨシッ👉'}, reported={'device_no': 0, 'state': 'ダメ🙅'})
Metadata: awsiot.iotshadow.ShadowMetadata(desired={'device_no': {'timestamp': 1719734895}, 'state': {'timestamp': 1719734895}}, reported={'device_no': {'timestamp': 1719734895}, 'state': {'timestamp': 1719734895}})
今度はshadow deltaのstateもgetできます。
delta更新を取得するメソッドの呼び方はgetではなくsubscribeのほうが正しい気がしますがこのまま行かせてくださいm(__)m
Received shadow delta event:: Delta State: {'state': 'ヨシッ👉'}
今回shadow update時にdeltaを明示的にupdateしてないのでAWS IoTのテストクライアント上では確認できません。
shadowのstatusには反映されています。
paho編で説明を端折りました(と言うか抜けました)が、
- 能動的にdelat発生を把握する為のget要求は準備されていないので
- Lambda発火のトリガーに使う等の場合にはshadowのupdateに紐づけてdelta発生の有無を/update/deltaをsubscribeして確認する必要がある
- shadow.subscribe_to_named_shadow_delta_updated_events()はその為のsub用メソッド
と言うことだと思います(結局pahoでやったことも同じです)。
いずれにしてもsubしたdeltaをトリガーとして自動実行ワークロードに組み込むことが可能です。
次回
shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑧awsiotsdkv2でretainとwill