時間が空いてしまいましたが以前の記事の続きにあたります。
想定している全体像は以前の記事にある通りですが、この記事単独でAWS IoTでのPublish部分の流れがわかるようにしています。
デバイスを接続(環境準備)
サービス -> IoT -> IoT Core でAWS IoTの画面を開きます
右上の開始方法のボタン(下記画像)あるいは画面左のメニューから接続 - 1個のデバイスを接続を選択します
Step 1 デバイスを準備する
デバイスを準備する とありますが、今回はローカルの環境をIoT機器に見立てて実行しますので、そのまま次へで大丈夫です。
(実行環境からの通信が可能か、手順にあるpingは実行しておいた方が良いかもしれません)
Step 2 デバイスを登録して保護する
Step 3プラットフォームと SDK を選択します
後ほどラズパイでの使用を想定しているのでLinux/macOS向けにしています。
IoT機器で使用する言語、または好みの言語を選択すれば良いと思います。今回はPythonを使用します。
Step 4 接続キットをダウンロード
Step3で選んだ環境に合わせた接続キットがダウンロードできますので、ダウンロードして展開します。
今回はWindowsのWSL環境から次のように展開しました。
$ unzip connect_device_package.zip
Archive: connect_device_package.zip
extracting: TestThing001.cert.pem
extracting: TestThing001.public.key
extracting: TestThing001.private.key
extracting: TestThing001-Policy
extracting: start.sh
Step 5 接続キットを実行
実行のための丁寧な説明が表示されますので、それに従って実行します。
途中で1分近く待たされましたが、無事実行開始。
放っておくとメッセージを送り続けるようなので、Ctrl_Cで止めました。
$ ./start.sh
Downloading AWS IoT Root CA certificate from AWS...
% Total % Received % Xferd Average Speed Time Time Time Current
Dload Upload Total Spent Left Speed
100 1188 100 1188 0 0 11034 0 --:--:-- --:--:-- --:--:-- 11102
Cloning the AWS SDK...
Cloning into 'aws-iot-device-sdk-python-v2'...
remote: Enumerating objects: 2139, done.
remote: Counting objects: 100% (878/878), done.
remote: Compressing objects: 100% (341/341), done.
remote: Total 2139 (delta 665), reused 664 (delta 525), pack-reused 1261
Receiving objects: 100% (2139/2139), 2.06 MiB | 17.76 MiB/s, done.
Resolving deltas: 100% (1334/1334), done.
Installing AWS SDK...
Defaulting to user installation because normal site-packages is not writeable
Processing ./aws-iot-device-sdk-python-v2
Preparing metadata (setup.py) ... done
Collecting awscrt==0.16.17
Downloading awscrt-0.16.17-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (8.2 MB)
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 8.2/8.2 MB 43.3 MB/s eta 0:00:00
Building wheels for collected packages: awsiotsdk
Building wheel for awsiotsdk (setup.py) ... done
Created wheel for awsiotsdk: filename=awsiotsdk-1.0.0.dev0-py3-none-any.whl size=72345 sha256=f17b9413eeefd609a38958c3c8a15970be8aeff24e3b54f3822616e5124a375b
Stored in directory: /home/keroway/.cache/pip/wheels/2b/82/d7/ba3014685ba01e4596bcd48381727f25c86466984a7020f9a4
Successfully built awsiotsdk
Installing collected packages: awscrt, awsiotsdk
Successfully installed awscrt-0.16.17 awsiotsdk-1.0.0.dev0
Running pub/sub sample application...
Connecting to a1w9cortf3p7g3-ats.iot.ap-northeast-1.amazonaws.com with client ID 'basicPubSub'...
Connected!
Subscribing to topic 'sdk/test/python'...
Subscribed with QoS.AT_LEAST_ONCE
Sending messages until program killed
Publishing message to topic 'sdk/test/python': Hello World! [1]
Received message from topic 'sdk/test/python': b'"Hello World! [1]"'
Publishing message to topic 'sdk/test/python': Hello World! [2]
Received message from topic 'sdk/test/python': b'"Hello World! [2]"'
Publishing message to topic 'sdk/test/python': Hello World! [3]
Received message from topic 'sdk/test/python': b'"Hello World! [3]"'
送信(Publish) されたデータがサブスクリプションの欄にどんどん表示されていきます。
参考
start.sh内で、下記リポジトリからSDKを取得していました。
Readme.mdにインストール手順や使い方のガイドがあるので、参考になると思います。
Publish側コード実装
Publishだけに絞ったサンプルコードを準備します。
AWSのサンプル実装を削りつつ、start.shからパラメータで渡されていた部分を内部で持たせて簡略化しています。
サンプルコード編集
pubgeosample.py
from awscrt import mqtt
from awsiot import mqtt_connection_builder
import time
import json
input_endpoint = 'xxxxxx.iot.ap-northeast-1.amazonaws.com'
input_port = 8883
input_cert = 'TestThing001.cert.pem'
input_key = 'TestThing001.private.key'
input_ca = 'root-CA.crt'
input_clientId = 'basicPubSub'
input_count = 5
input_topic = 'sdk/test/python'
input_message = 'hello my aws-iot.'
# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
print("Connection interrupted. error: {}".format(error))
# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))
if __name__ == '__main__':
# Create a MQTT connection from the command line data
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=input_endpoint,
port=input_port,
cert_filepath=input_cert,
pri_key_filepath=input_key,
ca_filepath=input_ca,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=input_clientId,
clean_session=False,
# keep_alive_secs=30,
keep_alive_secs=30
# http_proxy_options=proxy_options
)
print("Connecting to endpoint with client ID")
connect_future = mqtt_connection.connect()
# Future.result() waits until a result is available
connect_future.result()
print("Connected!")
message_count = input_count
message_topic = input_topic
message_string = input_message
if message_string:
if message_count == 0:
print("Sending messages until program killed")
else:
print("Sending {} message(s)".format(message_count))
publish_count = 1
while (publish_count <= message_count) or (message_count == 0):
message = "{} [{}]".format(message_string, publish_count)
print("Publishing message to topic '{}': {}".format(message_topic, message))
message_json = json.dumps(message)
mqtt_connection.publish(
topic=message_topic,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE)
time.sleep(1)
publish_count += 1
# Disconnect
print("Disconnecting...")
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
print("Disconnected!")
送信テスト
まずはクライアント名やトピック名を変更せず、接続~送信を行い、テストクライアントでサブスクライブできることを確認します。
start.shは使わず、直接pyファイルを実行します。
python pubgeosample.py
ポリシー編集
なお、ポリシーで設定したclient名・トピックでないとエラーになります。
デフォルトで各言語向け接続キット用のclient名・トピックが登録されていますので、自分の使いたい名称に合わせて変更します。
アクティブなバージョンを編集 を押下して編集画面にします
ビルダーよりJSONの方が書き換えやすそうだったので、このように書き換えました。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:Publish"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:xxxxxxxxxx:topic/wkt/test/"
]
},
{
"Effect": "Allow",
"Action": "iot:Connect",
"Resource": [
"arn:aws:iot:ap-northeast-1:xxxxxxxxxx:client/test-thing001"
]
}
]
}
すぐ反映してよいので、「編集したバージョンをこのポリシーのアクティブバージョンとして設定します。」にチェックを入れて、新しいバージョンとして保存を押下します。
トピック名とクライアント名を編集
プログラム側の接続時クライアント名、Publish先トピック名を書き換えます。
(差分のみ)
input_clientId = 'test-thing001'
input_topic = 'wkt/test/'
座標(緯度経度)データを送信
送信メッセージを座標データ(ダミー)にします。
message_count = input_count
message_topic = input_topic
publishdata = {"lat": 35.676764621841286, "lng": 139.7336903131048} # 座標データ
if publishdata:
if message_count == 0:
print("Sending messages until program killed")
else:
print("Sending {} message(s)".format(message_count))
publish_count = 1
while (publish_count <= message_count) or (message_count == 0):
# print("Publishing message to topic '{}': {}".format(message_topic, message))
publishdata.update(count=publish_count) # dictにカウント数を追加
message_json = json.dumps(publishdata)
mqtt_connection.publish(
topic=message_topic, payload=message_json, qos=mqtt.QoS.AT_LEAST_ONCE
)
time.sleep(1)
publish_count += 1
終わりに
本当はSubscribe側も盛り込む予定でしたが、長くなっってきましたので今回はここまでとします。