LoginSignup
0
0

More than 1 year has passed since last update.

AWS IoT SDK for Python v2 で publish

Posted at

こちらのページと同じことを行ないました。
Python の使用時に、デバイスから AWS IoT Core に MQTT メッセージを発行する方法を教えてください。

プログラム

iot_publish.py
#! /usr/bin/python
#
#
# ------------------------------------------------------------------
from awscrt import io, mqtt, auth, http
from awsiot import mqtt_connection_builder
import time
import json
import sys
#
# ------------------------------------------------------------------
ENDPOINT = "*****-ats.iot.ap-northeast-1.amazonaws.com"
CLIENT_ID = "testDevice"
PATH_TO_CERTIFICATE = "****-certificate.pem.crt"
PATH_TO_PRIVATE_KEY = ":*****-private.pem.key"
PATH_TO_AMAZON_ROOT_CA_1 = "AmazonRootCA1.pem"
MESSAGE = 'Good Morning'
TOPIC = "test/testing"

# ------------------------------------------------------------------
# Spin up resources
sys.stderr.write("*** 開始 ***\n")
event_loop_group = io.EventLoopGroup(1)
host_resolver = io.DefaultHostResolver(event_loop_group)
client_bootstrap = io.ClientBootstrap(event_loop_group, host_resolver)
mqtt_connection = mqtt_connection_builder.mtls_from_path(
			endpoint=ENDPOINT,
			cert_filepath=PATH_TO_CERTIFICATE,
			pri_key_filepath=PATH_TO_PRIVATE_KEY,
			client_bootstrap=client_bootstrap,
			ca_filepath=PATH_TO_AMAZON_ROOT_CA_1,
			client_id=CLIENT_ID,
			clean_session=False,
			keep_alive_secs=6
			)
#
print("Connecting to {} with client ID '{}'...".format(
		ENDPOINT, CLIENT_ID))
# Make the connect() call
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
sys.stderr.write("*** Connected! ***\n")
#
sys.stderr.write('*** Begin Publish ***\n')
for i in range (5):
	data = "{} [{}]".format(MESSAGE, i+1)
	message = {"message" : data}
	mqtt_connection.publish(topic=TOPIC, payload=json.dumps(message), qos=mqtt.QoS.AT_LEAST_ONCE)
	print("Published: '" + json.dumps(message) + "' to the topic: " + "'test/testing'")
	time.sleep(1.0)
sys.stderr.write('*** Publish End ***\n')
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
sys.stderr.write("*** 終了 ***\n")
# ------------------------------------------------------------------

実行結果

$ ./iot_publish.py 
*** 開始 ***
Connecting to *****-ats.iot.ap-northeast-1.amazonaws.com with client ID 'testDevice'...
*** Connected! ***
*** Begin Publish ***
Published: '{"message": "Good Morning [1]"}' to the topic: 'test/testing'
Published: '{"message": "Good Morning [2]"}' to the topic: 'test/testing'
Published: '{"message": "Good Morning [3]"}' to the topic: 'test/testing'
Published: '{"message": "Good Morning [4]"}' to the topic: 'test/testing'
Published: '{"message": "Good Morning [5]"}' to the topic: 'test/testing'
*** Publish End ***
*** 終了 ***

確認するスクリプト

go_subscribe.sh
mosquitto_sub --cafile ./AmazonRootCA1.pem \
        --cert ./*****-certificate.pem.crt \
    --key ./****-private.pem.key \
    -h "*****-ats.iot.ap-northeast-1.amazonaws.com" \
    -p 8883 -q 1 -d -t test/testing

実行結果

Client (null) received PUBLISH (d0, q1, r0, m1, 'test/testing', ... (31 bytes))
Client (null) sending PUBACK (m1, rc0)
{"message": "Good Morning [1]"}
Client (null) received PUBLISH (d0, q1, r0, m1, 'test/testing', ... (31 bytes))
Client (null) sending PUBACK (m1, rc0)
{"message": "Good Morning [2]"}
Client (null) received PUBLISH (d0, q1, r0, m1, 'test/testing', ... (31 bytes))
Client (null) sending PUBACK (m1, rc0)
{"message": "Good Morning [3]"}
Client (null) received PUBLISH (d0, q1, r0, m1, 'test/testing', ... (31 bytes))
Client (null) sending PUBACK (m1, rc0)
{"message": "Good Morning [4]"}
Client (null) received PUBLISH (d0, q1, r0, m1, 'test/testing', ... (31 bytes))
Client (null) sending PUBACK (m1, rc0)
{"message": "Good Morning [5]"}

確認したバージョン

$ python --version
Python 3.10.7
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0