この記事について
shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow updateまでに実装したmessageのpubsub、shadowにretainとwillの設定を追加します。
retain
AWS IoTでsubした最新messageを、S3やDynamoDBに保存するのではなくIoT Coreで保持します。
保持された MQTT メッセージの使用例
意訳せずに素直に公式の説明を引用したいと思います。
初期設定メッセージとしての使用
保持されたMQTTメッセージは、クライアントがトピックにサブスクライブした後、クライアントに送信されます。トピックを購読しているすべてのクライアントに、購読後すぐに MQTT 保持メッセージを受信させたい場合は、RETAIN フラグを設定した設定メッセージを公開できます。サブスクライブしているクライアントはまた、新しい設定メッセージが発行されるたびに、その設定に対する更新が受信できます。
最新のメッセージとして
デバイスは、 AWS IoT Core が現在の状態メッセージを保存するように、それらにRETAIN フラッグを設定する事ができます。アプリケーションが接続または再接続すると、保持メッセージトピックを購読した直後に、このトピックを購読して、最後に報告された状態を取得できます。こうすることで、現在の状態を確認するために、デバイスからの次のメッセージを待つ必要がなくなります。
設定方法
topic発行(publish)メソッドの中で呼び出すclient.publish()にretain flagを付けることでAWS IoTに最後のmessageを保持することができます。
client.publish(_topic, json.dumps(data, default=json_serial), qos=1)
client.publish(_topic, json.dumps(data, default=json_serial), qos=1, retain=True)
もう一つ確認用にcallbackのon_message()に一文追加します。
on_message()のmsgはtopic、payload、qos、retainをメンバーとするクラスなのでmsg.retainでsubするmessageがretainか確認できます。
def on_message(client, userdata, msg):
# retain属性の表示を出力に追加
print(f"Retain: {msg.retain}")
print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")
return
実行結果
前回までのコードのmqtt_connect()の中のpublish()でretainありなしの違いを見ます。
retainなし
まず前回までのコードに、on_message()のmsg.retain出力だけ追加して実行します。
Message pblished : {'Timestamp': 1719581150, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Retain: False
Received message: {"Timestamp": 1719581150, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
retain flagはFalseなのでsubしたmessageがretain messageでは無いことが分かります。
AWS IoTのコーンソールでもretain messageは確認できません。
retainあり
retainありを設定したコードを実行します。
一度目のpubでは、subしたmessageにretainなしと同じレスポンスが戻ってきます。この時点でAWS IoT側にretain messageは保持されていないので正常なレスポンスです。
Message pblished : {'Timestamp': 1719581652, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Retain: False
Received message: {"Timestamp": 1719581652, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
on topic: MQTTServer/MQTTServer1/0
一度disconnectして再度接続するとmessageを2つsubします。
片方はRetain: Trueとなっているのが分かります。
Timestampを比べると、前回のpubメッセージと今回のRetain: Trueのsub messageの時刻が同じであることを確認できます。
Message pblished : {'Timestamp': 1719581692, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Retain: True
Received message: {"Timestamp": 1719581652, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
Retain: False
Received message: {"Timestamp": 1719581692, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
AWS IoTのコーンソールでもretain messageが確認できます。最後にpubした時刻のmessageに書き換えられています。
will
あらかじめ「異常を知らせるmessage」をAWS IoTに持たせておいてtimeoutしたら指定のtopicに「異常を知らせるmessage」をpubする機能です。
設備やプロセスの状態はshadowで監視、回線が接続出来てるかはwillで監視、という使い分けができると思います。
MQTT の Last Will and Testament (LWT) メッセージ
こちらもまず公式の説明を確認したいと思います。
Last Will and Testament (LWT) は MQTT の機能です。LWT を使用すると、クライアントはブローカーがクライアント定義のトピックに発行し、開始されていない切断が発生したときにそのトピックをサブスクライブしているすべてのクライアントに送信するメッセージを指定できます。クライアントが指定するメッセージは LWT メッセージまたは Will メッセージと呼ばれ、クライアントが定義するトピックは Will トピックと呼ばれます。デバイスがブローカーに接続するときに LWT メッセージを指定できます。これらのメッセージは、接続中に Connect Flag bits フィールドで Will Retain フラグを設定することで保持できます。例えば、Will Retain フラグが 1 に設定されている場合、Will メッセージはブローカーの関連する Will トピックに保存されます。詳細については、「Will メッセージ」を参照してください。
ブローカーは、開始されていない切断が発生するまで Will メッセージを保存します。その場合、ブローカーは Will トピックにサブスクライブしているすべてのクライアントにメッセージを発行して切断を通知します。MQTT DISCONNECT メッセージを使用してクライアントが開始した切断により、クライアントがブローカーから切断した場合、ブローカーは保存されている LWT メッセージを発行しません。それ以外の場合は、すべて LWT メッセージが送信されます。ブローカーが LWT メッセージを送信するときの切断シナリオの完全なリストについては、「接続/切断イベント」を参照してください。
設定方法
MQTT接続メソッド(mqtt_init())の中にwillを設定するclient.will_set()メソッドを追加します。呼び出すclient.will_set()にretain flagを付けることでwill messageを保持することもできます。
Client.will_set()
クライアントが予期せず切断された場合にブローカーが送信する Will を設定します。効果を得るには、connect() の前にこれを呼び出す必要があります。
WILL_TOPIC = f"MQTTServer/{hostname}/will"
.
.
client.will_set(WILL_TOPIC, json.dumps({"Hostname" : hostname, "status": "Communication lost"}), qos=1)
WILL_TOPIC = f"MQTTServer/{hostname}/will"
.
.
client.will_set(WILL_TOPIC, json.dumps({"Hostname" : hostname, "status": "Communication lost"}), qos=1, retain=True)
今までの動作確認では1回通信が終わると切断してきましたが、異常切断を判定してwillをpubさせるためにmain関数をloop設定します。
if __name__ == '__main__':
clientid = f"{hostname}"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"
state = state_ng
topic = TOPIC
time.sleep(5)
try:
get_ssid()
client = mqtt_init(clientid)
mqtt_connect(client)
time.sleep(1)
mqtt_subscribe(client, topic)
time.sleep(1)
shadow_get(client)
time.sleep(1)
shadowdelta_get(client)
client.loop_start()
while True:
mqtt_publish(client, topic, msg)
shadow_update(client, state)
time.sleep(30)
except KeyboardInterrupt:
aws_disconnect(client)
time.sleep(3)
sys.exit()
実行結果
初めに今まで同様pubsubが正常に行われることを確認します。
確認出来たらi-net接続している回線を遮断します。
~ $ nmcli c
NAME UUID TYPE DEVICE
w0-inet 2969d6f3-b144-4c5f-8bda-4b3e607c4203 wifi wlan0
lo 361ab6ac-26eb-48e1-b2a7-22fd5934a6de loopback lo
e0-lo e6efbdcb-8856-45a9-992b-3770afe45ec9 ethernet eth0
~ $
# i-netにつないでいるwlan0を切断
~ $ sudo nmcli c down w0-inet
Connection 'w0-inet' successfully deactivated (D-Bus active path: /org/freedesktop/NetworkManager/ActiveConnection/4)
~ $ nmcli c
NAME UUID TYPE DEVICE
w0-inet 2969d6f3-b144-4c5f-8bda-4b3e607c4203 wifi --
lo 361ab6ac-26eb-48e1-b2a7-22fd5934a6de loopback lo
e0-lo e6efbdcb-8856-45a9-992b-3770afe45ec9 ethernet eth0
keepaliveを60秒に設定しているので、それ以降でwillがpubされるはずです。
1分30秒程度かかりましたが、AWS IoTのMQTTテストクライアントでwillがpubされてることを確認できました。
deltaと同様にトリガーとして自動実行ワークロードに組み込むことが可能です。メールのアラート通知にhost、device、retain messageを付けることやwillが発生したタイミングのlogだけまとめておくなどの使い方も出来そうです。
Demo code 全体
前回までのコードにretainとwillの設定を追加すると以下のようになります。
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import json
import ssl
import time
import subprocess
import paho.mqtt.client as mqtt
endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'
device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT 接続ヨシッ👉"
desired_state = "ヨシッ👉"
def on_connect(client, userdata, flags, respons_code):
if respons_code != 0:
print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}")
print('Connected')
def on_disconnect(client, userdata, respons_code):
if respons_code != 0:
print(f"Unexpected disconnection.")
else:
print(f"Disconnected successfully.")
def on_message(client, userdata, msg):
print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")
return
def on_shadow_update(client, userdata, msg):
payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
print("Shadow Update detected: " + payload_decoded)
def on_shadowdelta_update(client, userdata, msg):
payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
print("Shadow Delta Update detected: " + payload_decoded)
def get_ssid():
cmd = 'iwconfig wlan0|grep ESSID'
r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip()
idx = r.find('ESSID:')
if r[idx + 7:-1] == "ff/an":
print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}")
time.sleep(120)
subprocess.call(["sudo", "reboot"])
def mqtt_init(clientId):
try:
client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311)
client.tls_set(
ca,
certfile=cert,
keyfile=key,
tls_version=ssl.PROTOCOL_TLSv1_2)
client.tls_insecure_set(True)
# retainありでwillを設定
client.will_set(WILL_TOPIC, json.dumps({"Hostname" : hostname, "status": "Communication lost"}), qos=1, retain=True)
except Exception as e:
print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}")
return client
def mqtt_connect(client):
client.on_connect = on_connect
client.on_message = on_message
client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents", on_shadow_update)
client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta", on_shadowdelta_update)
client.connect(endpoint, port, keepalive=60)
time.sleep(1)
def mqtt_disconnect(client):
client.on_disconnect = on_disconnect
client.disconnect()
client.loop_stop()
def mqtt_publish(client, msg):
data = {}
_topic = topic + "/" + str(device_no)
print("Publishing to topic:", _topic)
data['Timestamp'] = int(time.time())
data['hostname'] = os.uname()[1]
data['device_no'] = device_no
data['msg'] = msg
# retain=Trueを設定
client.publish(_topic, json.dumps(data, default=json_serial), qos=1, retain=True)
print(_topic, data)
return
def mqtt_subscribe(client):
_topic = topic + "/" + str(device_no)
print("Subscribing to topic:", _topic)
client.subscribe(_topic, qos=1)
return
def json_serial(para):
return para.isoformat()
def shadow_update(client, state):
topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update"
print(topic)
payload = {
"state": {
"desired": {
"device_no": device_no,
"state": desired_state
},
"reported": {
"device_no": device_no,
"state": state
}
}
}
client.publish(topic, json.dumps(payload), qos=1)
print(f"Shadow update sent: {payload}")
def shadow_get(client):
shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents"
print("Subscribing to shadow update topic:", shadow_update_topic)
client.subscribe(shadow_update_topic, qos=1)
def shadowdelta_get(client):
shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta"
print("Subscribing to shadow delta update topic:", shadow_update_topic)
client.subscribe(shadow_update_topic, qos=1)
if __name__ == '__main__':
clientid = f"{hostname}"
state_ok = "ヨシッ👉"
state_ng = "ダメ🙅"
time.sleep(5)
try:
get_ssid()
client = mqtt_init(clientid)
mqtt_connect(client)
subscribe_pub(client)
time.sleep(1)
shadow_get(client)
time.sleep(1)
shadowdelta_get(client)
time.sleep(1)
client.loop_start()
while True:
publish_pub(client, msg)
shadow_update(client, state_ok)
time.sleep(30)
except KeyboardInterrupt:
aws_disconnect(client)
time.sleep(3)
sys.exit()
次回
次回からAWS IoT Device SDK v2 for Python編に入ります。
shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑥awsiotsdkv2でpubsub