LoginSignup
0
0
お題は不問!Qiita Engineer Festa 2024で記事投稿!
Qiita Engineer Festa20242024年7月17日まで開催中!

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑥awsiotsdkv2でpubsub

Last updated at Posted at 2024-06-30

この記事について

shadowとwillとretainを活用してAWS IoT CoreとPubSubする②AWSリソースの構築で構築した環境を使ったラズパイGW broker <-> AWS IoT間でのMQTT通信、ここから3回はAWS IoT Device SDK v2 for Pythonでmessageのpubsub、shadowのupdate/get、retainとwillの設定をします。

今回はラズパイに接続された仮想のdevice_0のシグナルをAWS IoTとpubsubします。


ライブラリー

GitHub

GitHub: aws/aws-iot-device-sdk-python-v2
GitHub: awslabs/aws-crt-python

PyPi

pypi: awsiotsdk 1.21.5

APIリファレンス

Python 用 AWS IoT デバイス SDK v2

開発者ガイド

AWS IoT Device SDKs

callback

awsiotとawscrtで準備されているcallback関数のうち今回のpubsubでは6種類を設定します。

on_connection_interrupted

Callback invoked whenever the MQTT connection is lost.
The MQTT client will automatically attempt to reconnect.
The function should take the following arguments return nothing

接続が失われたときに再接続すると呼び出されるcallback関数です。MQTT接続のクライアントobjectを作成する時に呼び出しておきます。

def on_connection_interrupted(connection, error, **kwargs):
    print(f"Connection interrupted. error: {error}")

on_connection_resumed

Callback invoked whenever the MQTT connection
is automatically resumed. Function should take the following arguments and return nothing

再接続時に自動的にMQTTを再開するたびに呼び出されるcallback関数です。MQTT接続のクライアントobjectを作成する時に呼び出しておきます。

def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print(f"Connection resumed. return_code: {return_code} session_present: {return_code}")

on_message

callback: Callback to invoke when message received, or None to disable.
Function should take the following arguments and return nothing

sub時に呼び出されるcallback関数です。sub時のpayloadやstatusを読み出すのに使用します。

def on_message(topic, payload, dup, qos, retain, **kwargs):
    payload_str = payload.decode('utf-8')
    payload_dict = json.loads(payload_str)
    print(f"on_message_received payload dict: {payload_dict}")

on_connection_success

Optional callback invoked whenever the connection successfully connects.
The function should take the following arguments and return nothing

MQTT接続が成功するたびに呼び出されるcallback関数です。MQTT接続のクライアントのインスタンスを作成する時に呼び出しておきます。Optionalと書かれていますが動作を見るために入れておきます。

def on_connection_success(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
    print(f"Connection Successful with return code: {callback_data.return_code} session present: {callback_data.session_present}")

on_connection_failure

Optional callback invoked whenever the connection fails to connect.
The function should take the following arguments and return nothing

MQTT接続が失敗するたびに呼び出されるcallback関数です。MQTT接続のクライアントのインスタンスを作成する時に呼び出しておきます。Optionalと書かれていますが動作を見るために入れておきます。

def on_connection_failure(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionFailureData)
    print(f"Connection failed with error code: {callback_data.error}")
    print(f"Error details: {callback_data}")

on_connection_closed

Optional callback invoked whenever the connection has been disconnected and shutdown successfully.
The function should take the following arguments and return nothing

MQTT接続が正常に切断・終了すると呼び出されるcallback関数です。MQTT接続のクライアントobjectを作成する時に呼び出しておきます。Optionalと書かれていますが動作を見るために入れておきます。

def on_connection_closed(connection, callback_data):
    print("Connection closed")

customメソッド

自分使い用にまとめ直したメソッドに再構築して使います。今回新たに作るメソッド以下です。

MQTTクライアント作成

一意のclient idでAWS IoTに接続するクライアントobjectを作成してます。
ここで使うawsiot.mqtt_connection_builder.mtls_from_path()ではクライアントをインスタンス化する際に、paho-mqttと以下の点が異なります。

  • endpointとportも紐づける
  • 接続に関わる5種類のcallbackを呼び出せる
def mqtt_init():
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        port=port,
        cert_filepath=cert,
        pri_key_filepath=key,
        ca_filepath=ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=clientid,
        clean_session=False,
        keep_alive_secs=30,
        on_connection_success=on_connection_success,
        on_connection_failure=on_connection_failure,
        on_connection_closed=on_connection_closed
        )
    print("Connecting to endpoint with client ID")
    return mqtt_connection

MQTT接続

mqtt_init()で作成したクライアントでAWS IoTに接続します。

def mqtt_connect(mqtt_connection):
    connect_future = mqtt_connection.connect()
    connect_future.result()

切断

client idによる接続をdisconnectメソッドで切断します。

def mqtt_disconnoect(mqtt_connection):
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()

topic発行(publish)

仮想のホスト名:MQTTServer1のラズパイGW/brokerから、device_0のシグナル:msgを以下のメソッドでAWS IoTにpubします。

def mqtt_publish(mqtt_connection, topic, msg):
    data = {}  
    print("topic:", topic)
    data['Timestamp'] = int(time.time())
    data['hostname'] = hostname
    data['device_no'] = device_no
    data['msg'] = msg
    message_json = json.dumps(data)
    mqtt_connection.publish(
        topic=topic,
        payload=message_json,
        qos=mqtt.QoS.AT_LEAST_ONCE,
    )
    print(f"published message payload dict: {data}")
    time.sleep(1)

topic購読(subcrribe)

以下のメソッドで自分が接続しているMQTTネットワーク内でtopicにpubされたメッセージをsubします。

def mqtt_subscribe(mqtt_connection, topic):
    print(f"Subscribing to topic '{topic}'...")
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message)
    subscribe_result = subscribe_future.result()
    print(f"Subscribed with {subscribe_result}")
    time.sleep(1)   

ラズパイGW <-> AWS IoT でpubsub

AWS IoTとport 8883で接続するための認証情報とラズパイの識別子(client id=hostname)を準備します。

topicはpahoの説明と同様のものとします。

接続情報
endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'

device_no = 0
topic = f'MQTTServer/MQTTServer1'

device_0のシグナルもこれで行きますヨシッ👉

デバイスシグナル
msg = "AWS IoT Device SDK v2接続ヨシッ👉"

ここまでで準備した認証情報とメソッドを組み合わせるとラズパイとAWS IoTが接続されたNWでpubsubが可能になります。

以下を実行すると接続後にsubscriberが待ち受けした状態となり、pubされた後に自分自身でもmessageをsubして回線を切断します。

上手くいかないときは1~3秒くらいのtime.sleepを挟むとpubsubが出来ると思います。

clientid = f"{hostname}"
topic = TOPIC

#クライアント作成
mqtt_connection = mqtt_init()
#client idで接続
mqtt_connect(mqtt_connection)
#sub開始
mqtt_subscribe(mqtt_connection, topic)

try:
    #message発行
    mqtt_publish(mqtt_connection, topic, msg)

except KeyboardInterrupt:
    print("Waiting for shadow updates...")
    mqtt_disconnoect(mqtt_connection)

Demo code 全体

コードを全てつなげると以下のようになります。

pubsub_awssdk.py
#!/usr/bin/python
# -*- coding: utf-8 -*-
from awscrt import mqtt
from awsiot import mqtt_connection_builder, iotshadow
import os
import time
import json


endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'

device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT Device SDK v2接続ヨシッ👉"


def on_connection_interrupted(connection, error, **kwargs):
    print(f"Connection interrupted. error: {error}")

def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print(f"Connection resumed. return_code: {return_code} session_present: {return_code}")

def on_message(topic, payload, dup, qos, retain, **kwargs):
    payload_str = payload.decode('utf-8')
    payload_dict = json.loads(payload_str)
    print(f"on_message payload dict: {payload_dict}")
    return

def on_connection_success(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
    print(f"Connection Successful with return code: {callback_data.return_code} session present: {callback_data.session_present}")

def on_connection_failure(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionFailureData)
    print(f"Connection failed with error code: {callback_data.error}")
    print(f"Error details: {callback_data}")
    
def on_connection_closed(connection, callback_data):
    print("Connection closed")

def mqtt_init():
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=endpoint,
        port=port,
        cert_filepath=cert,
        pri_key_filepath=key,
        ca_filepath=ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=clientid,
        clean_session=False,
        keep_alive_secs=30,
        on_connection_success=on_connection_success,
        on_connection_failure=on_connection_failure,
        on_connection_closed=on_connection_closed
        )
    print("Connecting to endpoint with client ID")
    return mqtt_connection

def mqtt_connect(mqtt_connection):
    connect_future = mqtt_connection.connect()
    connect_future.result()

def mqtt_disconnoect(mqtt_connection):
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()

def mqtt_publish(mqtt_connection, topic, msg):
    data = {}  
    print("topic:", topic)
    data['Timestamp'] = int(time.time())
    data['hostname'] = hostname
    data['device_no'] = device_no
    data['msg'] = msg
    message_json = json.dumps(data)
    mqtt_connection.publish(
        topic=topic,
        payload=message_json,
        qos=mqtt.QoS.AT_LEAST_ONCE,
    )
    print(f"published message payload dict: {data}")
    time.sleep(1)

def mqtt_subscribe(mqtt_connection, topic):
    print(f"Subscribing to topic '{topic}'...")
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=topic,
        qos=mqtt.QoS.AT_LEAST_ONCE,
        callback=on_message)
    subscribe_result = subscribe_future.result()
    print(f"Subscribed with {subscribe_result}")
    time.sleep(1)    


if __name__ == '__main__':
    clientid = f"{hostname}"
    topic = TOPIC
    
    mqtt_connection = mqtt_init()
    mqtt_connect(mqtt_connection)
    mqtt_subscribe(mqtt_connection, topic)
    
    try:
        mqtt_publish(mqtt_connection, topic, msg)    
    except KeyboardInterrupt:
        print("Waiting for shadow updates...")
        mqtt_disconnoect(mqtt_connection)

実行結果

~ $ python pubsub_sdk.py
Connecting to endpoint with client ID
Connection Successful with return code: 0 session present: True
Subscribing to topic 'MQTTServer/MQTTServer1/0'...
Subscribed with {'packet_id': 1, 'topic': 'MQTTServer/MQTTServer1/0', 'qos': <QoS.AT_LEAST_ONCE: 1>}
topic: MQTTServer/MQTTServer1/0
published message payload dict: {'Timestamp': 1719715304, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
on_message payload dict: {'Timestamp': 1719715304, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
Disconnecting...
Connection closed
~ $ 

AWS IoTのテストクライアント上でもsubscribeを確認できました。


次回

shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑦awsiotsdkv2でshadow update

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0