LoginSignup
0
0
お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑤pahoでretainとwill

Last updated at Posted at 2024-06-28

この記事について

shadowとwillとretainを活用してAWS IoT CoreとPubSubする④pahoでshadow updateまでに実装したmessageのpubsub、shadowにretainとwillの設定を追加します。

retain

AWS IoTでsubした最新messageを、S3やDynamoDBに保存するのではなくIoT Coreで保持します。

保持された MQTT メッセージの使用例

意訳せずに素直に公式の説明を引用したいと思います。

初期設定メッセージとしての使用

保持されたMQTTメッセージは、クライアントがトピックにサブスクライブした後、クライアントに送信されます。トピックを購読しているすべてのクライアントに、購読後すぐに MQTT 保持メッセージを受信させたい場合は、RETAIN フラグを設定した設定メッセージを公開できます。サブスクライブしているクライアントはまた、新しい設定メッセージが発行されるたびに、その設定に対する更新が受信できます。

最新のメッセージとして

デバイスは、 AWS IoT Core が現在の状態メッセージを保存するように、それらにRETAIN フラッグを設定する事ができます。アプリケーションが接続または再接続すると、保持メッセージトピックを購読した直後に、このトピックを購読して、最後に報告された状態を取得できます。こうすることで、現在の状態を確認するために、デバイスからの次のメッセージを待つ必要がなくなります。

設定方法

topic発行(publish)メソッドの中で呼び出すclient.publish()にretain flagを付けることでAWS IoTに最後のmessageを保持することができます。

retainなし(前回までのコード)
client.publish(_topic, json.dumps(data, default=json_serial), qos=1)
retainあり
client.publish(_topic, json.dumps(data, default=json_serial), qos=1, retain=True)

もう一つ確認用にcallbackのon_message()に一文追加します。
on_message()のmsgはtopic、payload、qos、retainをメンバーとするクラスなのでmsg.retainでsubするmessageがretainか確認できます。

oo_message()
def on_message(client, userdata, msg):
    # retain属性の表示を出力に追加
    print(f"Retain: {msg.retain}")
    print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")
    return

実行結果

前回までのコードのmqtt_connect()の中のpublish()でretainありなしの違いを見ます。

retainなし

まず前回までのコードに、on_message()のmsg.retain出力だけ追加して実行します。

Message pblished : {'Timestamp': 1719581150, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Retain: False
Received message: {"Timestamp": 1719581150, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}

retain flagはFalseなのでsubしたmessageがretain messageでは無いことが分かります。

AWS IoTのコーンソールでもretain messageは確認できません。

retainあり

retainありを設定したコードを実行します。

一度目のpubでは、subしたmessageにretainなしと同じレスポンスが戻ってきます。この時点でAWS IoT側にretain messageは保持されていないので正常なレスポンスです。

Message pblished : {'Timestamp': 1719581652, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Retain: False
Received message: {"Timestamp": 1719581652, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
on topic: MQTTServer/MQTTServer1/0

一度disconnectして再度接続するとmessageを2つsubします。
片方はRetain: Trueとなっているのが分かります。

Timestampを比べると、前回のpubメッセージと今回のRetain: Trueのsub messageの時刻が同じであることを確認できます。

Message pblished : {'Timestamp': 1719581692, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT 接続ヨシッ👉'}
Retain: True
Received message: {"Timestamp": 1719581652, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}
Retain: False
Received message: {"Timestamp": 1719581692, "hostname": "MQTTServer1", "device_no": 0, "msg": "AWS IoT 接続ヨシッ👉"}

AWS IoTのコーンソールでもretain messageが確認できます。最後にpubした時刻のmessageに書き換えられています。

will

あらかじめ「異常を知らせるmessage」をAWS IoTに持たせておいてtimeoutしたら指定のtopicに「異常を知らせるmessage」をpubする機能です。

設備やプロセスの状態はshadowで監視、回線が接続出来てるかはwillで監視、という使い分けができると思います。

MQTT の Last Will and Testament (LWT) メッセージ

こちらもまず公式の説明を確認したいと思います。

Last Will and Testament (LWT) は MQTT の機能です。LWT を使用すると、クライアントはブローカーがクライアント定義のトピックに発行し、開始されていない切断が発生したときにそのトピックをサブスクライブしているすべてのクライアントに送信するメッセージを指定できます。クライアントが指定するメッセージは LWT メッセージまたは Will メッセージと呼ばれ、クライアントが定義するトピックは Will トピックと呼ばれます。デバイスがブローカーに接続するときに LWT メッセージを指定できます。これらのメッセージは、接続中に Connect Flag bits フィールドで Will Retain フラグを設定することで保持できます。例えば、Will Retain フラグが 1 に設定されている場合、Will メッセージはブローカーの関連する Will トピックに保存されます。詳細については、「Will メッセージ」を参照してください。

ブローカーは、開始されていない切断が発生するまで Will メッセージを保存します。その場合、ブローカーは Will トピックにサブスクライブしているすべてのクライアントにメッセージを発行して切断を通知します。MQTT DISCONNECT メッセージを使用してクライアントが開始した切断により、クライアントがブローカーから切断した場合、ブローカーは保存されている LWT メッセージを発行しません。それ以外の場合は、すべて LWT メッセージが送信されます。ブローカーが LWT メッセージを送信するときの切断シナリオの完全なリストについては、「接続/切断イベント」を参照してください。

設定方法

MQTT接続メソッド(mqtt_init())の中にwillを設定するclient.will_set()メソッドを追加します。呼び出すclient.will_set()にretain flagを付けることでwill messageを保持することもできます。

Client.will_set()

クライアントが予期せず切断された場合にブローカーが送信する Will を設定します。効果を得るには、connect() の前にこれを呼び出す必要があります。

retainなし
WILL_TOPIC = f"MQTTServer/{hostname}/will"
.
.
client.will_set(WILL_TOPIC, json.dumps({"Hostname" : hostname, "status": "Communication lost"}), qos=1)
retainあり
WILL_TOPIC = f"MQTTServer/{hostname}/will"
.
.
client.will_set(WILL_TOPIC, json.dumps({"Hostname" : hostname, "status": "Communication lost"}), qos=1, retain=True)

今までの動作確認では1回通信が終わると切断してきましたが、異常切断を判定してwillをpubさせるためにmain関数をloop設定します。

if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    state = state_ng
    topic = TOPIC
    time.sleep(5)
    try:
        get_ssid()
        client = mqtt_init(clientid)
        mqtt_connect(client)
        time.sleep(1)
        mqtt_subscribe(client, topic)
        time.sleep(1)
        shadow_get(client)
        time.sleep(1)
        shadowdelta_get(client)
        client.loop_start()

        while True:
            mqtt_publish(client, topic, msg)
            shadow_update(client, state)

            time.sleep(30)

    except KeyboardInterrupt:
        aws_disconnect(client)
        time.sleep(3)
        sys.exit()

実行結果

初めに今まで同様pubsubが正常に行われることを確認します。
確認出来たらi-net接続している回線を遮断します。

~ $ nmcli c
NAME     UUID                                  TYPE      DEVICE
w0-inet  2969d6f3-b144-4c5f-8bda-4b3e607c4203  wifi      wlan0
lo       361ab6ac-26eb-48e1-b2a7-22fd5934a6de  loopback  lo
e0-lo    e6efbdcb-8856-45a9-992b-3770afe45ec9  ethernet  eth0
~ $ 
# i-netにつないでいるwlan0を切断
~ $ sudo nmcli c down w0-inet
Connection 'w0-inet' successfully deactivated (D-Bus active path: /org/freedesktop/NetworkManager/ActiveConnection/4)
~ $ nmcli c
NAME     UUID                                  TYPE      DEVICE
w0-inet  2969d6f3-b144-4c5f-8bda-4b3e607c4203  wifi      --
lo       361ab6ac-26eb-48e1-b2a7-22fd5934a6de  loopback  lo
e0-lo    e6efbdcb-8856-45a9-992b-3770afe45ec9  ethernet  eth0

keepaliveを60秒に設定しているので、それ以降でwillがpubされるはずです。

1分30秒程度かかりましたが、AWS IoTのMQTTテストクライアントでwillがpubされてることを確認できました。

deltaと同様にトリガーとして自動実行ワークロードに組み込むことが可能です。メールのアラート通知にhost、device、retain messageを付けることやwillが発生したタイミングのlogだけまとめておくなどの使い方も出来そうです。

Demo code 全体

前回までのコードにretainとwillの設定を追加すると以下のようになります。

pubsub_shadow_paho.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
import os
import sys
import json
import ssl
import time
import subprocess
import paho.mqtt.client as mqtt


endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'

device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT 接続ヨシッ👉"
desired_state = "ヨシッ👉"

def on_connect(client, userdata, flags, respons_code):
    if respons_code != 0:
        print(f"Restart after 120 secs due to the connection cannot be established: respons_code: {respons_code} flags: {flags}")
    print('Connected')

def on_disconnect(client, userdata, respons_code):
    if respons_code != 0:
        print(f"Unexpected disconnection.")
    else:
        print(f"Disconnected successfully.")

def on_message(client, userdata, msg):
    print(f"Received message: {json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)}\non topic: {msg.topic}\nwith QoS: {msg.qos}")
    return

def on_shadow_update(client, userdata, msg):
    payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
    print("Shadow Update detected: " + payload_decoded)

def on_shadowdelta_update(client, userdata, msg):
    payload_decoded = json.dumps(json.loads(msg.payload.decode('utf-8')), ensure_ascii=False)
    print("Shadow Delta Update detected: " + payload_decoded)

def get_ssid():
    cmd = 'iwconfig wlan0|grep ESSID'
    r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE).stdout.decode().rstrip()
    idx = r.find('ESSID:')
    if r[idx + 7:-1] == "ff/an":
        print(f"Restart after 120 secs due to the connection cannot be determined: ESSID: {r[idx + 7:-1]}")
        time.sleep(120)
        subprocess.call(["sudo", "reboot"])

def mqtt_init(clientId):
    try:
        client = mqtt.Client(client_id=clientId, protocol=mqtt.MQTTv311)
        client.tls_set(
            ca,
            certfile=cert,
            keyfile=key,
            tls_version=ssl.PROTOCOL_TLSv1_2)
        client.tls_insecure_set(True)
        # retainありでwillを設定
        client.will_set(WILL_TOPIC, json.dumps({"Hostname" : hostname, "status": "Communication lost"}), qos=1, retain=True)
    except Exception as e:
        print(f"Restart after 120 secs due to launches an MQTT client and creates an object instance failed of: {e}")  
    return client

def mqtt_connect(client):
    client.on_connect = on_connect
    client.on_message = on_message
    client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents", on_shadow_update)
    client.message_callback_add(f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta", on_shadowdelta_update)
    client.connect(endpoint, port, keepalive=60)
    time.sleep(1)

def mqtt_disconnect(client):
    client.on_disconnect = on_disconnect
    client.disconnect()
    client.loop_stop()

def mqtt_publish(client, msg):
    data = {}
    _topic = topic + "/" + str(device_no)
    print("Publishing to topic:", _topic)
    data['Timestamp'] = int(time.time())
    data['hostname'] = os.uname()[1]
    data['device_no'] = device_no
    data['msg'] = msg
    # retain=Trueを設定
    client.publish(_topic, json.dumps(data, default=json_serial), qos=1, retain=True)
    print(_topic, data)
    return

def mqtt_subscribe(client):
    _topic = topic + "/" + str(device_no)
    print("Subscribing to topic:", _topic)
    client.subscribe(_topic, qos=1)
    return

def json_serial(para):
    return para.isoformat()

def shadow_update(client, state):
    topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update"
    print(topic)
    payload = {
        "state": {
            "desired": {
                "device_no": device_no,
                "state": desired_state
            },
            "reported": {
                "device_no": device_no,
                "state": state
            }
        }
    }
    client.publish(topic, json.dumps(payload), qos=1)
    print(f"Shadow update sent: {payload}")

def shadow_get(client):
    shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/documents"
    print("Subscribing to shadow update topic:", shadow_update_topic)
    client.subscribe(shadow_update_topic, qos=1)

def shadowdelta_get(client):
    shadow_update_topic = f"$aws/things/{hostname}/shadow/name/device_{device_no}/update/delta"
    print("Subscribing to shadow delta update topic:", shadow_update_topic)
    client.subscribe(shadow_update_topic, qos=1)


if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    time.sleep(5)
    try:
        get_ssid()
        client = mqtt_init(clientid)
        mqtt_connect(client)
        subscribe_pub(client)
        time.sleep(1)
        shadow_get(client)
        time.sleep(1)
        shadowdelta_get(client)
        time.sleep(1)
        client.loop_start()

        while True:
            publish_pub(client, msg)
            shadow_update(client, state_ok)
            time.sleep(30)

    except KeyboardInterrupt:
        aws_disconnect(client)
        time.sleep(3)
        sys.exit()

次回

次回からAWS IoT Device SDK v2 for Python編に入ります。

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑥awsiotsdkv2でpubsub

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0