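About this article
This article adds retain and will settings to the code that published and subscribed to messages with the AWS IoT Device SDK v2 for Python.
Details already explained in "Pub/Sub with AWS IoT Core using shadow, will, and retain (5): retain and will with paho" are omitted here.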
retain
The latest message that AWS IoT receives is held by IoT Core itself, rather than being saved to S3 or DynamoDB.
Configuration
Adding the retain flag to the mqtt_connection.publish() call inside the topic publish method makes AWS IoT keep the last message.
# Before: no retain flag
mqtt_connection.publish(
    topic=topic,
    payload=message_json,
    qos=mqtt.QoS.AT_LEAST_ONCE
)

# After: retain=True added
mqtt_connection.publish(
    topic=topic,
    payload=message_json,
    qos=mqtt.QoS.AT_LEAST_ONCE,
    retain=True
)
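Results
We compare the behavior with and without retain in the publish() call inside mqtt_connect() of the previous code.
Without retain
First, run the previous code with a single change: on_message() also prints the retain flag it receives.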
published message payload dict: {'Timestamp': 1719748369, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
Retain : False
on_message payload dict: {'Timestamp': 1719748369, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
The retain flag is False, so the subscribed message is not a retained message.
No retained message appears in the AWS IoT console either.
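The "Retain :" line in the output above could be produced by a callback along these lines. This is a minimal sketch that assumes the callback signature used in the full demo code (topic, payload, dup, qos, retain); the sample payload and the direct call at the end are only there so the function can be tried without a broker.

```python
import json


def on_message(topic, payload, dup, qos, retain, **kwargs):
    """Print the retain flag, then the decoded JSON payload."""
    # retain is expected to be True only when the broker replays a stored message.
    print(f"Retain : {retain}")
    payload_dict = json.loads(payload.decode("utf-8"))
    print(f"on_message payload dict: {payload_dict}")
    return payload_dict


# Local check without a broker: call the callback directly with sample bytes.
sample = json.dumps({"Timestamp": 1719748369, "device_no": 0}).encode("utf-8")
on_message("MQTTServer/MQTTServer1/0", sample, dup=False, qos=1, retain=False)
```

Returning the decoded dict is an addition for easier testing; the SDK ignores the return value of the callback.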
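With retain
Now run the code with retain enabled.
On the first publish, the subscribed message comes back exactly as it did without retain. This is the expected response, because AWS IoT was not yet holding a retained message at that point.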
topic: MQTTServer/MQTTServer1/0
published message payload dict: {'Timestamp': 1719749715, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
Retain : False
on_message payload dict: {'Timestamp': 1719749715, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
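After disconnecting once and connecting again, the client receives two messages.
One of them is marked Retain: True.
Comparing the Timestamp values shows that the Retain: True message carries the same time as the message published in the previous run.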
Subscribing to topic 'MQTTServer/MQTTServer1/0'...
<QoS.AT_LEAST_ONCE: 1>}
Retain : True
on_message payload dict: {'Timestamp': 1719749715, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
topic: MQTTServer/MQTTServer1/0
published message payload dict: {'Timestamp': 1719749726, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
Retain : False
on_message payload dict: {'Timestamp': 1719749726, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
The retained message is now visible in the AWS IoT console as well. It has been overwritten with the message from the most recent publish.
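The sequence observed above (False, then True after reconnecting, then False again) can be reproduced with a toy in-memory model. The sketch below is only an illustration of retained-message semantics, not the AWS IoT implementation; ToyBroker and record are hypothetical names.

```python
class ToyBroker:
    """In-memory stand-in for a broker; NOT the AWS IoT implementation."""

    def __init__(self):
        self.retained = {}     # topic -> last payload published with retain=True
        self.subscribers = {}  # topic -> list of callbacks

    def subscribe(self, topic, callback):
        self.subscribers.setdefault(topic, []).append(callback)
        # A stored message is replayed to the new subscriber with retain=True.
        if topic in self.retained:
            callback(topic, self.retained[topic], retain=True)

    def publish(self, topic, payload, retain=False):
        if retain:
            self.retained[topic] = payload  # overwrites the previous retained message
        # Live delivery to existing subscribers carries retain=False.
        for callback in self.subscribers.get(topic, []):
            callback(topic, payload, retain=False)

    def disconnect_all(self):
        self.subscribers.clear()


received = []


def record(topic, payload, retain):
    received.append((payload, retain))


broker = ToyBroker()
topic = "MQTTServer/MQTTServer1/0"

# First run: subscribe, then publish with retain=True.
broker.subscribe(topic, record)
broker.publish(topic, {"Timestamp": 1719749715}, retain=True)

# Second run: reconnect, subscribe again, publish again.
broker.disconnect_all()
broker.subscribe(topic, record)
broker.publish(topic, {"Timestamp": 1719749726}, retain=True)

for payload, retain in received:
    print(f"Retain : {retain} payload: {payload}")
```

In this model the stored message is replaced on every retained publish, which corresponds to the console showing only the most recent one.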
will
will registers a "something went wrong" message with AWS IoT in advance; when the connection times out, AWS IoT publishes that message to the specified topic.
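As a rough mental model, the broker stores the will at connect time and decides later whether to publish it. The following toy sketch assumes that a clean disconnect discards the will and an abnormal loss publishes it; ToyWillBroker and its method names are hypothetical and do not correspond to any SDK API.

```python
class ToyWillBroker:
    """Toy model of Last Will handling; not a real broker."""

    def __init__(self):
        self.wills = {}      # client_id -> (topic, payload)
        self.published = []  # (topic, payload) messages the broker sent out

    def connect(self, client_id, will_topic, will_payload):
        # The will is handed over at connect time and only stored.
        self.wills[client_id] = (will_topic, will_payload)

    def disconnect(self, client_id):
        # Clean disconnect: the will is discarded and nothing is published.
        self.wills.pop(client_id, None)

    def connection_lost(self, client_id):
        # Abnormal loss (for example a keep-alive timeout): publish the will.
        will = self.wills.pop(client_id, None)
        if will is not None:
            self.published.append(will)


broker = ToyWillBroker()

broker.connect("device-a", "example/will", b"device-a lost")
broker.disconnect("device-a")       # clean shutdown, no will

broker.connect("device-b", "example/will", b"device-b lost")
broker.connection_lost("device-b")  # network cut, will is published

print(broker.published)
```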
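Configuration
Add the will property inside the MQTT connection method (mqtt_init()).
Besides the way arguments are passed, one difference from paho is that with the AWS IoT Device SDK v2 for Python the will message cannot carry the retain flag.
retain=False is therefore mandatory.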
LWT (Will) with retain=True causes AWS_ERROR_MQTT_UNEXPECTED_HANGUP
How to call MQTT Last Will and Testament
# Before: no will
mqtt_connection_builder.mtls_from_path(
    endpoint=endpoint,
    port=port,
    cert_filepath=cert,
    pri_key_filepath=key,
    ca_filepath=ca,
    on_connection_interrupted=on_connection_interrupted,
    on_connection_resumed=on_connection_resumed,
    client_id=clientid,
    clean_session=False,
    keep_alive_secs=30,
    on_connection_success=on_connection_success,
    on_connection_failure=on_connection_failure,
    on_connection_closed=on_connection_closed
)

# After: will added
WILL_PAYLOAD = json.dumps({
    "hostname" : hostname,
    "device_no" : device_no,
    "error_status" : "The connection was closed unexpectedly"
}).encode('utf-8')

mqtt_connection_builder.mtls_from_path(
    endpoint=endpoint,
    port=port,
    cert_filepath=cert,
    pri_key_filepath=key,
    ca_filepath=ca,
    on_connection_interrupted=on_connection_interrupted,
    on_connection_resumed=on_connection_resumed,
    client_id=clientid,
    clean_session=False,
    keep_alive_secs=30,
    will=mqtt.Will(
        topic=WILL_TOPIC,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        payload=WILL_PAYLOAD,
        retain=False  # must stay False
    ),
    on_connection_success=on_connection_success,
    on_connection_failure=on_connection_failure,
    on_connection_closed=on_connection_closed
)
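Since the payload encoding and the retain=False constraint are both easy to get wrong, one option is to collect them in a small helper. The sketch below is an assumption-laden illustration: build_will_kwargs is a hypothetical function, "example/will" is a placeholder topic, and the guard simply encodes the constraint described in this article.

```python
import json


def build_will_kwargs(topic, hostname, device_no, retain=False):
    """Return topic/payload/retain for a will message (hypothetical helper)."""
    if retain:
        # Per the constraint described in this article, refuse retain=True early.
        raise ValueError("the will message must use retain=False in this setup")
    payload = json.dumps({
        "hostname": hostname,
        "device_no": device_no,
        "error_status": "The connection was closed unexpectedly",
    }).encode("utf-8")
    return {"topic": topic, "payload": payload, "retain": False}


kwargs = build_will_kwargs("example/will", "MQTTServer1", 0)
print(kwargs["topic"], json.loads(kwargs["payload"]))
```

The result could then be passed on as mqtt.Will(qos=mqtt.QoS.AT_LEAST_ONCE, **kwargs); the QoS is left out of the helper so that it has no dependency on awscrt.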
So far each test disconnected after a single exchange. To let AWS IoT detect an abnormal disconnect and publish the will, the main routine now runs in a loop.
if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    topic = TOPIC
    state = state_ng

    mqtt_connection = mqtt_init()
    mqtt_connect(mqtt_connection)
    shadow = iotshadow.IotShadowClient(mqtt_connection)
    mqtt_subscribe(mqtt_connection, topic)
    shadow_get(shadow)
    shadowdelta_get(shadow)

    try:
        while True:
            mqtt_publish(mqtt_connection, topic, msg)
            shadow_update(shadow, state)
            shadow_get_request(shadow)
            time.sleep(30)
    except KeyboardInterrupt:
        print("Waiting for shadow updates...")
        mqtt_disconnoect(mqtt_connection)
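The loop-and-disconnect pattern above can be tried without a broker by passing the actions in as callables. This is a sketch under assumptions: run_loop, max_iterations, and the injectable sleep are hypothetical additions for testing and are not part of the article's code.

```python
import time


def run_loop(publish_once, disconnect, interval_secs, max_iterations=None, sleep=time.sleep):
    """Call publish_once repeatedly; disconnect cleanly on Ctrl+C or when done."""
    count = 0
    try:
        while max_iterations is None or count < max_iterations:
            publish_once()
            count += 1
            sleep(interval_secs)
    except KeyboardInterrupt:
        print("Interrupted, disconnecting...")
    # Reached only on a normal exit or Ctrl+C; other exceptions propagate
    # without a clean disconnect, so the broker can still publish the will.
    disconnect()
    return count


# Dry run with stand-in callables and no real waiting.
log = []
run_loop(lambda: log.append("publish"), lambda: log.append("disconnect"),
         interval_secs=30, max_iterations=2, sleep=lambda secs: None)
print(log)
```

A deliberate choice here is not to use finally: a clean disconnect suppresses the will, so an unexpected crash is left to surface as an abnormal disconnect.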
Results
First, confirm that pub/sub works normally, as before.
Once that is confirmed, cut the line that provides the internet connection.
~ $ nmcli c
NAME UUID TYPE DEVICE
w0-inet 2969d6f3-b144-4c5f-8bda-4b3e607c4203 wifi wlan0
lo d851e136-18e8-43cc-b010-d682e8ec2674 loopback lo
e0-lo e6efbdcb-8856-45a9-992b-3770afe45ec9 ethernet eth0
~ $
# Disconnect wlan0, which provides the internet connection
~ $ sudo nmcli c down w0-inet
Connection 'w0-inet' successfully deactivated (D-Bus active path: /org/freedesktop/NetworkManager/ActiveConnection/9)
~ $ nmcli c
NAME UUID TYPE DEVICE
lo d851e136-18e8-43cc-b010-d682e8ec2674 loopback lo
e0-lo e6efbdcb-8856-45a9-992b-3770afe45ec9 ethernet eth0
w0-inet 2969d6f3-b144-4c5f-8bda-4b3e607c4203 wifi --
The keep-alive was long last time, so this time it is set to 30 seconds.
Within one minute, the will message could be seen in the AWS IoT MQTT test client.
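The "within one minute" observation is consistent with the MQTT 3.1.1 rule that a server drops a client when nothing arrives within one and a half times the keep-alive interval. The arithmetic is sketched below, assuming that rule applies here; the exact timing on AWS IoT may differ, and will_deadline_secs is a hypothetical helper.

```python
def will_deadline_secs(keep_alive_secs, factor=1.5):
    """Approximate time until the broker treats a silent client as lost."""
    return keep_alive_secs * factor


# With keep_alive_secs=30 the estimated deadline is 45 seconds.
print(will_deadline_secs(30))
```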
As with delta, the will message can serve as a trigger in an automated workload.
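As one illustration of using the will as a trigger, a consumer could decode the payload and turn it into an alert record. The sketch below assumes the will payload format defined in this article; handle_will and the shape of the returned dict are hypothetical.

```python
import json


def handle_will(payload_bytes):
    """Convert a will payload into a simple alert record (hypothetical format)."""
    data = json.loads(payload_bytes.decode("utf-8"))
    return {
        "device": f"{data['hostname']}/{data['device_no']}",
        "alert": data["error_status"],
    }


sample_will = json.dumps({
    "hostname": "MQTTServer1",
    "device_no": 0,
    "error_status": "The connection was closed unexpectedly",
}).encode("utf-8")

print(handle_will(sample_will))
```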
Full demo code
Adding the retain and will settings to the previous code gives the following.
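#!/usr/bin/python
# -*- coding: utf-8 -*-
from awscrt import mqtt
from awsiot import mqtt_connection_builder, iotshadow
import os
import time
import json

endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = './cert/MQTTServer1-certificate.pem.crt'
key = './cert/MQTTServer1-private.pem.key'
ca = './cert/AmazonRootCA1.pem'
device_no = 0
topic = 'MQTTServer/MQTTServer1'
msg = "AWS IoT Device SDK v2接続ヨシッ👉"

# The names below are used later but are not defined in this listing.
# The values are placeholders (assumptions); replace them with your own.
TOPIC = f'MQTTServer/{hostname}/{device_no}'  # matches the topic in the logs above
WILL_TOPIC = 'MQTTServer/will'                # assumed will topic
SHADOWNAME = 'MQTTServer_shadow'              # assumed named-shadow name
desired_state = "ヨシッ👉"                     # assumed desired state
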
def on_connection_interrupted(connection, error, **kwargs):
    print(f"Connection interrupted. error: {error}")


def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print(f"Connection resumed. return_code: {return_code} session_present: {session_present}")


def on_message(topic, payload, dup, qos, retain, **kwargs):
    # Print the retain flag so retained messages can be told apart from live ones
    print(f"Retain : {retain}")
    payload_str = payload.decode('utf-8')
    payload_dict = json.loads(payload_str)
    print(f"on_message payload dict: {payload_dict}")
    return


def on_connection_success(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
    print(f"Connection Successful with return code: {callback_data.return_code} session present: {callback_data.session_present}")


def on_connection_failure(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionFailureData)
    print(f"Connection failed with error code: {callback_data.error}")
    print(f"Error details: {callback_data}")


def on_connection_closed(connection, callback_data):
    print("Connection closed")

def on_update_shadow_accepted(response):
    try:
        if hasattr(response, 'state'):
            print("shadow_on_update_accepted")
            print(f"Update state: {response.state}")
        if hasattr(response, 'metadata'):
            print(f"Metadata: {response.metadata}")
    except Exception as e:
        print(f"Error processing update shadow response: {e}")


def on_shadow_update_rejected(error):
    print("Shadow Update Rejected:")
    print(error)


def on_get_shadow_accepted(response):
    try:
        if hasattr(response, 'state'):
            print("shadow_on_get_accepted")
            print(f"Update state: {response.state}")
        if hasattr(response, 'metadata'):
            print(f"Metadata: {response.metadata}")
    except Exception as e:
        print(f"Error processing get shadow response: {e}")


def on_shadow_get_rejected(error):
    print("Shadow Get Rejected:")
    print(error)


def on_shadow_delta_updated(delta):
    try:
        print(f"Received shadow delta event:: Delta State: {delta.state}")
    except Exception as e:
        print(f"Error processing shadow delta: {e}")

def mqtt_init():
    WILL_PAYLOAD = json.dumps({
        "hostname" : hostname,
        "device_no" : device_no,
        "error_status" : "The connection was closed unexpectedly"
    }).encode('utf-8')

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        port=port,
        cert_filepath=cert,
        pri_key_filepath=key,
        ca_filepath=ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=clientid,
        clean_session=False,
        keep_alive_secs=30,
        will=mqtt.Will(
            topic=WILL_TOPIC,
            qos=mqtt.QoS.AT_LEAST_ONCE,
            payload=WILL_PAYLOAD,
            retain=False
        ),
        on_connection_success=on_connection_success,
        on_connection_failure=on_connection_failure,
        on_connection_closed=on_connection_closed
    )
    print("Connecting to endpoint with client ID")
    return mqtt_connection


def mqtt_connect(mqtt_connection):
    connect_future = mqtt_connection.connect()
    connect_future.result()


def mqtt_disconnoect(mqtt_connection):
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()

def mqtt_publish(mqtt_connection, topic, msg):
    data = {}
    print("topic:", topic)
    data['Timestamp'] = int(time.time())
    data['hostname'] = hostname
    data['device_no'] = device_no
    data['msg'] = msg
    message_json = json.dumps(data)
    mqtt_connection.publish(
        topic=topic,
        payload=message_json,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        retain=True
    )
    print(f"published message payload dict: {data}")
    time.sleep(1)


def mqtt_subscribe(mqtt_connection, topic):
    print(f"Subscribing to topic '{topic}'...")
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message)
    subscribe_result = subscribe_future.result()
    print(f"Subscribed with {subscribe_result}")
    time.sleep(1)

def shadow_update(shadow, state):
    update_shadow_future = shadow.publish_update_named_shadow(
        request=iotshadow.UpdateNamedShadowRequest(
            thing_name=hostname,
            shadow_name=SHADOWNAME,
            state=iotshadow.ShadowState(
                # Current state of the device
                reported={
                    "device_no": device_no,
                    "state": state
                },
                # Target state the device should reach
                desired={
                    "device_no": device_no,
                    "state": desired_state
                },
            ),
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE
    )
    update_shadow_future.result()
    print("Shadow updated!")


def shadow_get_request(shadow):
    # Send a Get request
    get_shadow_future = shadow.publish_get_named_shadow(
        request=iotshadow.GetNamedShadowRequest(
            thing_name=hostname,
            shadow_name=SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
    )
    get_shadow_future.result()

def shadow_get(shadow):
    # Subscribe to Update accepted
    update_accepted_future, _ = shadow.subscribe_to_update_named_shadow_accepted(
        request=iotshadow.UpdateNamedShadowSubscriptionRequest(
            thing_name=hostname,
            shadow_name=SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_update_shadow_accepted,
    )
    update_accepted_future.result()
    print("Subscribed to update shadow accepted")

    # Subscribe to Update rejected
    update_rejected_future, _ = shadow.subscribe_to_update_named_shadow_rejected(
        request=iotshadow.UpdateNamedShadowSubscriptionRequest(
            thing_name=hostname,
            shadow_name=SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_shadow_update_rejected,
    )
    update_rejected_future.result()
    print("Subscribed to update shadow rejected")

    # Subscribe to Get accepted
    get_accepted_future, _ = shadow.subscribe_to_get_named_shadow_accepted(
        request=iotshadow.GetNamedShadowSubscriptionRequest(
            thing_name=hostname,
            shadow_name=SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_get_shadow_accepted,
    )
    get_accepted_future.result()
    print("Subscribed to get shadow accepted")

    # Subscribe to Get rejected
    get_rejected_future, _ = shadow.subscribe_to_get_named_shadow_rejected(
        request=iotshadow.GetNamedShadowSubscriptionRequest(
            thing_name=hostname,
            shadow_name=SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_shadow_get_rejected,
    )
    get_rejected_future.result()
    print("Subscribed to get shadow rejected")


def shadowdelta_get(shadow):
    try:
        delta_subscribed_future, _ = shadow.subscribe_to_named_shadow_delta_updated_events(
            request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
                thing_name=hostname,
                shadow_name=SHADOWNAME,
            ),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_shadow_delta_updated
        )
        delta_subscribed_future.result()
        print("Subscribed to shadow delta updates")
    except Exception as e:
        print(f"Error subscribing to shadow delta updates: {e}")

if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    topic = TOPIC
    state = state_ng

    mqtt_connection = mqtt_init()
    mqtt_connect(mqtt_connection)
    shadow = iotshadow.IotShadowClient(mqtt_connection)
    mqtt_subscribe(mqtt_connection, topic)
    shadow_get(shadow)
    shadowdelta_get(shadow)

    try:
        while True:
            mqtt_publish(mqtt_connection, topic, msg)
            shadow_update(shadow, state)
            shadow_get_request(shadow)
            time.sleep(30)
    except KeyboardInterrupt:
        print("Waiting for shadow updates...")
        mqtt_disconnoect(mqtt_connection)
Next time
The next article starts the mosquitto series.
Pub/Sub with AWS IoT Core using shadow, will, and retain (9): pubsub with mosquitto