0
0
お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑧awsiotsdkv2でretainとwill

Last updated at Posted at 2024-06-30

この記事について

AWS IoT Device SDK v2 for Pythonでmessageのpubsubをしたコードにretainとwillの設定を追加します。

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑤pahoでretainとwillで詳細に説明した内容は割愛します。

retain

AWS IoTでsubした最新messageを、S3やDynamoDBに保存するのではなくIoT Coreで保持します。

設定方法

topic発行(publish)メソッドの中で呼び出すmqtt_connection.publish()にretain flagを付けることでAWS IoTに最後のmessageを保持することができます。

retainなし(前回までのコード)
    mqtt_connection.publish(
        topic=topic,
        payload=message_json,
        qos=mqtt.QoS.AT_LEAST_ONCE
    )
retainあり
    mqtt_connection.publish(
        topic=topic,
        payload=message_json,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        retain=True
    )

実行結果

前回までのコードのmqtt_connect()の中のpublish()でretainありなしの違いを見ます。

retainなし

まず前回までのコードに、on_message()のmsg.retain出力だけ追加して実行します。

published message payload dict: {'Timestamp': 1719748369, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
Retain : False
on_message payload dict: {'Timestamp': 1719748369, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}

retain flagはFalseなのでsubしたmessageがretain messageでは無いことが分かります。

AWS IoTのコーンソールでもretain messageは確認できません。

retainあり

retainありを設定したコードを実行します。

一度目のpubでは、subしたmessageにretainなしと同じレスポンスが戻ってきます。この時点でAWS IoT側にretain messageは保持されていないので正常なレスポンスです。

topic: MQTTServer/MQTTServer1/0
published message payload dict: {'Timestamp': 1719749715, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
Retain : False
on_message payload dict: {'Timestamp': 1719749715, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}

一度disconnectして再度接続するとmessageを2つsubします。
片方はRetain: Trueとなっているのが分かります。

Timestampを比べると、前回のpubメッセージと今回のRetain: Trueのsub messageの時刻が同じであることを確認できます。

Subscribing to topic 'MQTTServer/MQTTServer1/0'...
<QoS.AT_LEAST_ONCE: 1>}
Retain : True
on_message payload dict: {'Timestamp': 1719749715, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
topic: MQTTServer/MQTTServer1/0
published message payload dict: {'Timestamp': 1719749726, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
Retain : False
on_message payload dict: {'Timestamp': 1719749726, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}

AWS IoTのコーンソールでもretain messageが確認できます。最後にpubした時刻のmessageに書き換えられています。

will

あらかじめ「異常を知らせるmessage」をAWS IoTに持たせておいてtimeoutしたら指定のtopicに「異常を知らせるmessage」をpubする機能です。

設定方法

MQTT接続メソッド(mqtt_init())の中にwillを設定するpropertyを追加します。

引数の渡し方以外にpahoと異なることとして、AWS IoT Device SDK v2 for Pythonではwill messageにretain flagを付けることが出来ません。

なのでretain=Falseがmustです。

LWT (Will) with retain=True causes AWS_ERROR_MQTT_UNEXPECTED_HANGUP
How to call MQTT Last Will and Testament

retainなし
mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        port=port,
        cert_filepath=cert,
        pri_key_filepath=key,
        ca_filepath=ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=clientid,
        clean_session=False,
        keep_alive_secs=30,
        on_connection_success=on_connection_success,
        on_connection_failure=on_connection_failure,
        on_connection_closed=on_connection_closed
        )
retainあり
WILL_PAYLOAD = json.dumps({
                            "hostname" : hostname,
                            "device_no" : device_no,
                            "error_status" : "The connection was closed unexpectedly"
                        }).encode('utf-8')

mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        port=port,
        cert_filepath=cert,
        pri_key_filepath=key,
        ca_filepath=ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=clientid,
        clean_session=False,
        keep_alive_secs=30,
        will=mqtt.Will(
            topic=WILL_TOPIC,
           qos=mqtt.QoS.AT_LEAST_ONCE,
            payload=WILL_PAYLOAD,
            retain=False
            ),
        on_connection_success=on_connection_success,
        on_connection_failure=on_connection_failure,
        on_connection_closed=on_connection_closed
        )

今までの動作確認では1回通信が終わると切断してきましたが、異常切断を判定してwillをpubさせるためにmain関数をloop設定します。

if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    topic = TOPIC
    state = state_ng

    mqtt_connection = mqtt_init()
    mqtt_connect(mqtt_connection)
    shadow = iotshadow.IotShadowClient(mqtt_connection)

    mqtt_subscribe(mqtt_connection, topic)
    shadow_get(shadow)
    shadowdelta_get(shadow)
 
    try:
        while True:
            mqtt_publish(mqtt_connection, topic, msg)
            shadow_update(shadow, state)
            shadow_get_request(shadow)

            time.sleep(30)

    except KeyboardInterrupt:
        print("Waiting for shadow updates...")
        mqtt_disconnoect(mqtt_connection)

実行結果

初めに今まで同様pubsubが正常に行われることを確認します。
確認出来たらi-net接続している回線を遮断します。

~ $ nmcli c
NAME     UUID                                  TYPE      DEVICE
w0-inet  2969d6f3-b144-4c5f-8bda-4b3e607c4203  wifi      wlan0
lo       d851e136-18e8-43cc-b010-d682e8ec2674  loopback  lo
e0-lo    e6efbdcb-8856-45a9-992b-3770afe45ec9  ethernet  eth0
~ $ 
# i-netにつないでいるwlan0を切断
~ $ sudo nmcli c down w0-inet
Connection 'w0-inet' successfully deactivated (D-Bus active path: /org/freedesktop/NetworkManager/ActiveConnection/9)
~ $ nmcli c
NAME     UUID                                  TYPE      DEVICE
lo       d851e136-18e8-43cc-b010-d682e8ec2674  loopback  lo
e0-lo    e6efbdcb-8856-45a9-992b-3770afe45ec9  ethernet  eth0
w0-inet  2969d6f3-b144-4c5f-8bda-4b3e607c4203  wifi      --

前回長かったのでkeepaliveを30秒に設定しました()

1分以内にAWS IoTのMQTTテストクライアントでwillがpubされてることを確認できました。

deltaと同様にトリガーとして自動実行ワークロードに組み込むことが可能です。

Demo code 全体

前回までのコードにretainとwillの設定を追加すると以下のようになります。

pubsub_awssdk.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from awscrt import mqtt
from awsiot import mqtt_connection_builder, iotshadow
import os
import time
import json


endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'

device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT Device SDK v2接続ヨシッ👉"


def on_connection_interrupted(connection, error, **kwargs):
    print(f"Connection interrupted. error: {error}")

def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print(f"Connection resumed. return_code: {return_code} session_present: {return_code}")

def on_message(topic, payload, dup, qos, retain, **kwargs):
    payload_str = payload.decode('utf-8')
    payload_dict = json.loads(payload_str)
    print(f"on_message payload dict: {payload_dict}")
    return

def on_connection_success(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
    print(f"Connection Successful with return code: {callback_data.return_code} session present: {callback_data.session_present}")

def on_connection_failure(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionFailureData)
    print(f"Connection failed with error code: {callback_data.error}")
    print(f"Error details: {callback_data}")
    
def on_connection_closed(connection, callback_data):
    print("Connection closed")

def on_update_shadow_accepted(response):
    try:
        if hasattr(response, 'state'):
            print("shadow_on_update_accepted")
            print(f"Update state: {response.state}")
        if hasattr(response, 'metadata'):
            print(f"Metadata: {response.metadata}")
    except Exception as e:
        print(f"Error processing update shadow response: {e}")

def on_shadow_update_rejected(error):
    print("Shadow Update Rejected:")
    print(error)

def on_get_shadow_accepted(response):
    try:
        if hasattr(response, 'state'):
            print("shadow_on_get_accepted")
            print(f"Update state: {response.state}")
        if hasattr(response, 'metadata'):
            print(f"Metadata: {response.metadata}")
    except Exception as e:
        print(f"Error processing get shadow response: {e}")

def on_shadow_get_rejected(error):
    print("Shadow Get Rejected:")
    print(error)

def on_shadow_delta_updated(delta):
    try:
        print(f"Received shadow delta event::  Delta State: {delta.state}")
    except Exception as e:
        print(f"Error processing shadow delta: {e}")

def mqtt_init():
    WILL_PAYLOAD = json.dumps({
                                "hostname" : hostname,
                                "device_no" : device_no,
                                "error_status" : "The connection was closed unexpectedly"
                            }).encode('utf-8')

    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        port=port,
        cert_filepath=cert,
        pri_key_filepath=key,
        ca_filepath=ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=clientid,
        clean_session=False,
        keep_alive_secs=30,
        will=mqtt.Will(
            topic=WILL_TOPIC,
           qos=mqtt.QoS.AT_LEAST_ONCE,
            payload=WILL_PAYLOAD,
            retain=False
            ),
        on_connection_success=on_connection_success,
        on_connection_failure=on_connection_failure,
        on_connection_closed=on_connection_closed
        )
    print("Connecting to endpoint with client ID")
    return mqtt_connection

def mqtt_connect(mqtt_connection):
    connect_future = mqtt_connection.connect()
    connect_future.result()

def mqtt_disconnoect(mqtt_connection):
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()

def mqtt_publish(mqtt_connection, topic, msg):
    data = {}  
    print("topic:", topic)
    data['Timestamp'] = int(time.time())
    data['hostname'] = hostname
    data['device_no'] = device_no
    data['msg'] = msg
    message_json = json.dumps(data)
    mqtt_connection.publish(
        topic=topic,
        payload=message_json,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        retain=True
    )
    print(f"published message payload dict: {data}")
    time.sleep(1)

def mqtt_subscribe(mqtt_connection, topic):
    print(f"Subscribing to topic '{topic}'...")
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message)
    subscribe_result = subscribe_future.result()
    print(f"Subscribed with {subscribe_result}")
    time.sleep(1)    

def shadow_update(shadow, state):
    update_shadow_future = shadow.publish_update_named_shadow(
        request=iotshadow.UpdateNamedShadowRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
            state=iotshadow.ShadowState(
                # デバイスの現在の状態
                reported  = {
                    "device_no": device_no,
                    "state": state
                },
                # デバイスが達成すべき目標の状態
                desired =  {
                    "device_no": device_no,
                    "state": desired_state
                },
            ),
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE
    )
    update_shadow_future.result()
    print("Shadow updated!")

def shadow_get_request(shadow):
    # Getを送信する
    get_shadow_future = shadow.publish_get_named_shadow(
        request=iotshadow.GetNamedShadowRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
    )
    get_shadow_future.result()

def shadow_get(shadow):
    # Updateの成功をサブスクライブする
    update_accepted_future, _ = shadow.subscribe_to_update_named_shadow_accepted(
        request=iotshadow.UpdateNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_update_shadow_accepted,
    )
    update_accepted_future.result()
    print("Subscribed to update shadow accepted")

    # Updateの失敗をサブスクライブする
    update_rejected_future, _ = shadow.subscribe_to_update_named_shadow_rejected(
        request=iotshadow.UpdateNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_shadow_update_rejected,
    )
    update_rejected_future.result()
    print("Subscribed to update shadow rejected")

    # Getの成功をサブスクライブする
    get_accepted_future, _ = shadow.subscribe_to_get_named_shadow_accepted(
        request=iotshadow.GetNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_get_shadow_accepted,
    )
    get_accepted_future.result()
    print("Subscribed to get shadow accepted")

    # Getの失敗をサブスクライブする
    get_rejected_future, _ = shadow.subscribe_to_get_named_shadow_rejected(
        request=iotshadow.GetNamedShadowSubscriptionRequest(
            thing_name = hostname,
            shadow_name = SHADOWNAME,
        ),
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_shadow_get_rejected,
    )
    get_rejected_future.result()
    print("Subscribed to get shadow rejected")

def shadowdelta_get(shadow):
    try:
        delta_subscribed_future, _ = shadow.subscribe_to_named_shadow_delta_updated_events(
            request=iotshadow.NamedShadowDeltaUpdatedSubscriptionRequest(
                thing_name=hostname,
                shadow_name=SHADOWNAME,
            ),
            qos=mqtt.QoS.AT_LEAST_ONCE,
            callback=on_shadow_delta_updated
        )
        delta_subscribed_future.result()
        print("Subscribed to shadow delta updates")
    except Exception as e:
        print(f"Error subscribing to shadow delta updates: {e}")


if __name__ == '__main__':
    clientid = f"{hostname}"
    state_ok = "ヨシッ👉"
    state_ng = "ダメ🙅"
    topic = TOPIC
    state = state_ng

    mqtt_connection = mqtt_init()
    mqtt_connect(mqtt_connection)
    shadow = iotshadow.IotShadowClient(mqtt_connection)

    mqtt_subscribe(mqtt_connection, topic)
    shadow_get(shadow)
    shadowdelta_get(shadow)
 
    try:
        while True:
            mqtt_publish(mqtt_connection, topic, msg)
            shadow_update(shadow, state)
            shadow_get_request(shadow)

            time.sleep(30)

    except KeyboardInterrupt:
        print("Waiting for shadow updates...")
        mqtt_disconnoect(mqtt_connection)

次回

次回からmosquitto編に入ります。

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑨mosquittoでpubsub

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0