この記事について
shadowとwillとretainを活用してAWS IoT CoreとPubSubする②AWSリソースの構築で構築した環境を使ったラズパイGW broker <-> AWS IoT間でのMQTT通信、ここから3回はAWS IoT Device SDK v2 for Pythonでmessageのpubsub、shadowのupdate/get、retainとwillの設定をします。
今回はラズパイに接続された仮想のdevice_0のシグナルをAWS IoTとpubsubします。
ライブラリー
GitHub
GitHub: aws/aws-iot-device-sdk-python-v2
GitHub: awslabs/aws-crt-python
PyPi
APIリファレンス
開発者ガイド
callback
awsiotとawscrtで準備されているcallback関数のうち今回のpubsubでは6種類を設定します。
on_connection_interrupted
Callback invoked whenever the MQTT connection is lost.
The MQTT client will automatically attempt to reconnect.
The function should take the following arguments return nothing
接続が失われたときに再接続すると呼び出されるcallback関数です。MQTT接続のクライアントobjectを作成する時に呼び出しておきます。
def on_connection_interrupted(connection, error, **kwargs):
print(f"Connection interrupted. error: {error}")
on_connection_resumed
Callback invoked whenever the MQTT connection
is automatically resumed. Function should take the following arguments and return nothing
再接続時に自動的にMQTTを再開するたびに呼び出されるcallback関数です。MQTT接続のクライアントobjectを作成する時に呼び出しておきます。
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print(f"Connection resumed. return_code: {return_code} session_present: {return_code}")
on_message
callback: Callback to invoke when message received, or None to disable.
Function should take the following arguments and return nothing
sub時に呼び出されるcallback関数です。sub時のpayloadやstatusを読み出すのに使用します。
def on_message(topic, payload, dup, qos, retain, **kwargs):
payload_str = payload.decode('utf-8')
payload_dict = json.loads(payload_str)
print(f"on_message_received payload dict: {payload_dict}")
on_connection_success
Optional callback invoked whenever the connection successfully connects.
The function should take the following arguments and return nothing
MQTT接続が成功するたびに呼び出されるcallback関数です。MQTT接続のクライアントのインスタンスを作成する時に呼び出しておきます。Optionalと書かれていますが動作を見るために入れておきます。
def on_connection_success(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
print(f"Connection Successful with return code: {callback_data.return_code} session present: {callback_data.session_present}")
on_connection_failure
Optional callback invoked whenever the connection fails to connect.
The function should take the following arguments and return nothing
MQTT接続が失敗するたびに呼び出されるcallback関数です。MQTT接続のクライアントのインスタンスを作成する時に呼び出しておきます。Optionalと書かれていますが動作を見るために入れておきます。
def on_connection_failure(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionFailureData)
print(f"Connection failed with error code: {callback_data.error}")
print(f"Error details: {callback_data}")
on_connection_closed
Optional callback invoked whenever the connection has been disconnected and shutdown successfully.
The function should take the following arguments and return nothing
MQTT接続が正常に切断・終了すると呼び出されるcallback関数です。MQTT接続のクライアントobjectを作成する時に呼び出しておきます。Optionalと書かれていますが動作を見るために入れておきます。
def on_connection_closed(connection, callback_data):
print("Connection closed")
customメソッド
自分使い用にまとめ直したメソッドに再構築して使います。今回新たに作るメソッド以下です。
MQTTクライアント作成
一意のclient idでAWS IoTに接続するクライアントobjectを作成してます。
ここで使うawsiot.mqtt_connection_builder.mtls_from_path()ではクライアントをインスタンス化する際に、paho-mqttと以下の点が異なります。
- endpointとportも紐づける
- 接続に関わる5種類のcallbackを呼び出せる
def mqtt_init():
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=endpoint,
port=port,
cert_filepath=cert,
pri_key_filepath=key,
ca_filepath=ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=clientid,
clean_session=False,
keep_alive_secs=30,
on_connection_success=on_connection_success,
on_connection_failure=on_connection_failure,
on_connection_closed=on_connection_closed
)
print("Connecting to endpoint with client ID")
return mqtt_connection
MQTT接続
mqtt_init()で作成したクライアントでAWS IoTに接続します。
def mqtt_connect(mqtt_connection):
connect_future = mqtt_connection.connect()
connect_future.result()
切断
client idによる接続をdisconnectメソッドで切断します。
def mqtt_disconnoect(mqtt_connection):
print("Disconnecting...")
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
topic発行(publish)
仮想のホスト名:MQTTServer1のラズパイGW/brokerから、device_0のシグナル:msgを以下のメソッドでAWS IoTにpubします。
def mqtt_publish(mqtt_connection, topic, msg):
data = {}
print("topic:", topic)
data['Timestamp'] = int(time.time())
data['hostname'] = hostname
data['device_no'] = device_no
data['msg'] = msg
message_json = json.dumps(data)
mqtt_connection.publish(
topic=topic,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE,
)
print(f"published message payload dict: {data}")
time.sleep(1)
topic購読(subcrribe)
以下のメソッドで自分が接続しているMQTTネットワーク内でtopicにpubされたメッセージをsubします。
def mqtt_subscribe(mqtt_connection, topic):
print(f"Subscribing to topic '{topic}'...")
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message)
subscribe_result = subscribe_future.result()
print(f"Subscribed with {subscribe_result}")
time.sleep(1)
ラズパイGW <-> AWS IoT でpubsub
AWS IoTとport 8883で接続するための認証情報とラズパイの識別子(client id=hostname)を準備します。
topicはpahoの説明と同様のものとします。
endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'
device_no = 0
topic = f'MQTTServer/MQTTServer1'
device_0のシグナルもこれで行きますヨシッ👉
msg = "AWS IoT Device SDK v2接続ヨシッ👉"
ここまでで準備した認証情報とメソッドを組み合わせるとラズパイとAWS IoTが接続されたNWでpubsubが可能になります。
以下を実行すると接続後にsubscriberが待ち受けした状態となり、pubされた後に自分自身でもmessageをsubして回線を切断します。
上手くいかないときは1~3秒くらいのtime.sleepを挟むとpubsubが出来ると思います。
clientid = f"{hostname}"
topic = TOPIC
#クライアント作成
mqtt_connection = mqtt_init()
#client idで接続
mqtt_connect(mqtt_connection)
#sub開始
mqtt_subscribe(mqtt_connection, topic)
try:
#message発行
mqtt_publish(mqtt_connection, topic, msg)
except KeyboardInterrupt:
print("Waiting for shadow updates...")
mqtt_disconnoect(mqtt_connection)
Demo code 全体
コードを全てつなげると以下のようになります。
#!/usr/bin/python
# -*- coding: utf-8 -*-
from awscrt import mqtt
from awsiot import mqtt_connection_builder, iotshadow
import os
import time
import json
endpoint = 'xxxxxxxxxxxxx-ats.iot.ap-northeast-1.amazonaws.com'
hostname = os.uname()[1]
port = 8883
cert = f'./cert/MQTTServer1-certificate.pem.crt'
key = f'./cert/MQTTServer1-private.pem.key'
ca = f'./cert/AmazonRootCA1.pem'
device_no = 0
topic = f'MQTTServer/MQTTServer1'
msg = "AWS IoT Device SDK v2接続ヨシッ👉"
def on_connection_interrupted(connection, error, **kwargs):
print(f"Connection interrupted. error: {error}")
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print(f"Connection resumed. return_code: {return_code} session_present: {return_code}")
def on_message(topic, payload, dup, qos, retain, **kwargs):
payload_str = payload.decode('utf-8')
payload_dict = json.loads(payload_str)
print(f"on_message payload dict: {payload_dict}")
return
def on_connection_success(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
print(f"Connection Successful with return code: {callback_data.return_code} session present: {callback_data.session_present}")
def on_connection_failure(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionFailureData)
print(f"Connection failed with error code: {callback_data.error}")
print(f"Error details: {callback_data}")
def on_connection_closed(connection, callback_data):
print("Connection closed")
def mqtt_init():
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=endpoint,
port=port,
cert_filepath=cert,
pri_key_filepath=key,
ca_filepath=ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=clientid,
clean_session=False,
keep_alive_secs=30,
on_connection_success=on_connection_success,
on_connection_failure=on_connection_failure,
on_connection_closed=on_connection_closed
)
print("Connecting to endpoint with client ID")
return mqtt_connection
def mqtt_connect(mqtt_connection):
connect_future = mqtt_connection.connect()
connect_future.result()
def mqtt_disconnoect(mqtt_connection):
print("Disconnecting...")
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
def mqtt_publish(mqtt_connection, topic, msg):
data = {}
print("topic:", topic)
data['Timestamp'] = int(time.time())
data['hostname'] = hostname
data['device_no'] = device_no
data['msg'] = msg
message_json = json.dumps(data)
mqtt_connection.publish(
topic=topic,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE,
)
print(f"published message payload dict: {data}")
time.sleep(1)
def mqtt_subscribe(mqtt_connection, topic):
print(f"Subscribing to topic '{topic}'...")
subscribe_future, packet_id = mqtt_connection.subscribe(
topic=topic,
qos=mqtt.QoS.AT_LEAST_ONCE,
callback=on_message)
subscribe_result = subscribe_future.result()
print(f"Subscribed with {subscribe_result}")
time.sleep(1)
if __name__ == '__main__':
clientid = f"{hostname}"
topic = TOPIC
mqtt_connection = mqtt_init()
mqtt_connect(mqtt_connection)
mqtt_subscribe(mqtt_connection, topic)
try:
mqtt_publish(mqtt_connection, topic, msg)
except KeyboardInterrupt:
print("Waiting for shadow updates...")
mqtt_disconnoect(mqtt_connection)
実行結果
~ $ python pubsub_sdk.py
Connecting to endpoint with client ID
Connection Successful with return code: 0 session present: True
Subscribing to topic 'MQTTServer/MQTTServer1/0'...
Subscribed with {'packet_id': 1, 'topic': 'MQTTServer/MQTTServer1/0', 'qos': <QoS.AT_LEAST_ONCE: 1>}
topic: MQTTServer/MQTTServer1/0
published message payload dict: {'Timestamp': 1719715304, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
on_message payload dict: {'Timestamp': 1719715304, 'hostname': 'MQTTServer1', 'device_no': 0, 'msg': 'AWS IoT Device SDK v2接続ヨシッ👉'}
Disconnecting...
Connection closed
~ $
AWS IoTのテストクライアント上でもsubscribeを確認できました。
次回
shadowとwillとretainを活用してAWS IoT CoreとPubSubする⑦awsiotsdkv2でshadow update