2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

お題は不問!Qiita Engineer Festa 2023で記事投稿!

aws-iot-device-sdk-python-v2 を用いてPublish/Subscribeを行う(Publish編)

Last updated at Posted at 2023-07-13

時間が空いてしまいましたが以前の記事の続きにあたります。
想定している全体像は以前の記事にある通りですが、この記事単独でAWS IoTでのPublish部分の流れがわかるようにしています。

デバイスを接続(環境準備)

サービス -> IoT -> IoT Core でAWS IoTの画面を開きます
右上の開始方法のボタン(下記画像)あるいは画面左のメニューから接続 - 1個のデバイスを接続を選択します
image.png

Step 1 デバイスを準備する

デバイスを準備する とありますが、今回はローカルの環境をIoT機器に見立てて実行しますので、そのまま次へで大丈夫です。
(実行環境からの通信が可能か、手順にあるpingは実行しておいた方が良いかもしれません)

Step 2 デバイスを登録して保護する

image.png
名前だけ入力しています。追加設定は特になしで次へ

Step 3プラットフォームと SDK を選択します

image.png
後ほどラズパイでの使用を想定しているのでLinux/macOS向けにしています。
IoT機器で使用する言語、または好みの言語を選択すれば良いと思います。今回はPythonを使用します。

Step 4 接続キットをダウンロード

image.png
Step3で選んだ環境に合わせた接続キットがダウンロードできますので、ダウンロードして展開します。
今回はWindowsのWSL環境から次のように展開しました。

$ unzip connect_device_package.zip
Archive:  connect_device_package.zip
 extracting: TestThing001.cert.pem
 extracting: TestThing001.public.key
 extracting: TestThing001.private.key
 extracting: TestThing001-Policy
 extracting: start.sh

Step 5 接続キットを実行

実行のための丁寧な説明が表示されますので、それに従って実行します。
image.png

途中で1分近く待たされましたが、無事実行開始。
放っておくとメッセージを送り続けるようなので、Ctrl_Cで止めました。

$ ./start.sh

Downloading AWS IoT Root CA certificate from AWS...
  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  1188  100  1188    0     0  11034      0 --:--:-- --:--:-- --:--:-- 11102

Cloning the AWS SDK...
Cloning into 'aws-iot-device-sdk-python-v2'...
remote: Enumerating objects: 2139, done.
remote: Counting objects: 100% (878/878), done.
remote: Compressing objects: 100% (341/341), done.
remote: Total 2139 (delta 665), reused 664 (delta 525), pack-reused 1261
Receiving objects: 100% (2139/2139), 2.06 MiB | 17.76 MiB/s, done.
Resolving deltas: 100% (1334/1334), done.

Installing AWS SDK...
Defaulting to user installation because normal site-packages is not writeable
Processing ./aws-iot-device-sdk-python-v2
  Preparing metadata (setup.py) ... done
Collecting awscrt==0.16.17
  Downloading awscrt-0.16.17-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (8.2 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 8.2/8.2 MB 43.3 MB/s eta 0:00:00
Building wheels for collected packages: awsiotsdk
  Building wheel for awsiotsdk (setup.py) ... done
  Created wheel for awsiotsdk: filename=awsiotsdk-1.0.0.dev0-py3-none-any.whl size=72345 sha256=f17b9413eeefd609a38958c3c8a15970be8aeff24e3b54f3822616e5124a375b
  Stored in directory: /home/keroway/.cache/pip/wheels/2b/82/d7/ba3014685ba01e4596bcd48381727f25c86466984a7020f9a4
Successfully built awsiotsdk
Installing collected packages: awscrt, awsiotsdk
Successfully installed awscrt-0.16.17 awsiotsdk-1.0.0.dev0

Running pub/sub sample application...
Connecting to a1w9cortf3p7g3-ats.iot.ap-northeast-1.amazonaws.com with client ID 'basicPubSub'...
Connected!
Subscribing to topic 'sdk/test/python'...
Subscribed with QoS.AT_LEAST_ONCE
Sending messages until program killed
Publishing message to topic 'sdk/test/python': Hello World!  [1]
Received message from topic 'sdk/test/python': b'"Hello World!  [1]"'
Publishing message to topic 'sdk/test/python': Hello World!  [2]
Received message from topic 'sdk/test/python': b'"Hello World!  [2]"'
Publishing message to topic 'sdk/test/python': Hello World!  [3]
Received message from topic 'sdk/test/python': b'"Hello World!  [3]"'

送信(Publish) されたデータがサブスクリプションの欄にどんどん表示されていきます。
image.png

続行を押すと接続の手順は完了です。
image.png

参考

start.sh内で、下記リポジトリからSDKを取得していました。
Readme.mdにインストール手順や使い方のガイドがあるので、参考になると思います。

Publish側コード実装

Publishだけに絞ったサンプルコードを準備します。
AWSのサンプル実装を削りつつ、start.shからパラメータで渡されていた部分を内部で持たせて簡略化しています。

サンプルコード編集

pubgeosample.py

from awscrt import mqtt
from awsiot import mqtt_connection_builder
import time
import json

input_endpoint = 'xxxxxx.iot.ap-northeast-1.amazonaws.com'
input_port = 8883
input_cert = 'TestThing001.cert.pem'
input_key = 'TestThing001.private.key'
input_ca = 'root-CA.crt'
input_clientId = 'basicPubSub'
input_count = 5
input_topic = 'sdk/test/python'
input_message = 'hello my aws-iot.'


# Callback when connection is accidentally lost.
def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))


# Callback when an interrupted connection is re-established.
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))


if __name__ == '__main__':

    # Create a MQTT connection from the command line data
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=input_endpoint,
        port=input_port,
        cert_filepath=input_cert,
        pri_key_filepath=input_key,
        ca_filepath=input_ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=input_clientId,
        clean_session=False,
        # keep_alive_secs=30,
        keep_alive_secs=30
        # http_proxy_options=proxy_options
        )

    print("Connecting to endpoint with client ID")
    connect_future = mqtt_connection.connect()

    # Future.result() waits until a result is available
    connect_future.result()
    print("Connected!")

    message_count = input_count
    message_topic = input_topic
    message_string = input_message

    if message_string:
        if message_count == 0:
            print("Sending messages until program killed")
        else:
            print("Sending {} message(s)".format(message_count))

        publish_count = 1
        while (publish_count <= message_count) or (message_count == 0):
            message = "{} [{}]".format(message_string, publish_count)
            print("Publishing message to topic '{}': {}".format(message_topic, message))
            message_json = json.dumps(message)
            mqtt_connection.publish(
                topic=message_topic,
                payload=message_json,
                qos=mqtt.QoS.AT_LEAST_ONCE)
            time.sleep(1)
            publish_count += 1

    # Disconnect
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()
    print("Disconnected!")

送信テスト

まずはクライアント名やトピック名を変更せず、接続~送信を行い、テストクライアントでサブスクライブできることを確認します。
start.shは使わず、直接pyファイルを実行します。

python pubgeosample.py 

image.png

ポリシー編集

なお、ポリシーで設定したclient名・トピックでないとエラーになります。
デフォルトで各言語向け接続キット用のclient名・トピックが登録されていますので、自分の使いたい名称に合わせて変更します。
image.png
アクティブなバージョンを編集 を押下して編集画面にします
image.png
ビルダーよりJSONの方が書き換えやすそうだったので、このように書き換えました。

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Publish"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxx:topic/wkt/test/"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": [
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxx:client/test-thing001"
      ]
    }
  ]
}

すぐ反映してよいので、「編集したバージョンをこのポリシーのアクティブバージョンとして設定します。」にチェックを入れて、新しいバージョンとして保存を押下します。

トピック名とクライアント名を編集

プログラム側の接続時クライアント名、Publish先トピック名を書き換えます。
(差分のみ)

input_clientId = 'test-thing001'
input_topic = 'wkt/test/'

問題なく受信できることを確認しました。
image.png

座標(緯度経度)データを送信

送信メッセージを座標データ(ダミー)にします。

message_count = input_count
message_topic = input_topic
publishdata = {"lat": 35.676764621841286, "lng": 139.7336903131048} # 座標データ

if publishdata:
    if message_count == 0:
        print("Sending messages until program killed")
    else:
        print("Sending {} message(s)".format(message_count))

    publish_count = 1
    while (publish_count <= message_count) or (message_count == 0):
        # print("Publishing message to topic '{}': {}".format(message_topic, message))
        publishdata.update(count=publish_count) # dictにカウント数を追加
        message_json = json.dumps(publishdata)
        mqtt_connection.publish(
            topic=message_topic, payload=message_json, qos=mqtt.QoS.AT_LEAST_ONCE
        )
        time.sleep(1)
        publish_count += 1

座標データが送信されていることを確認しました。
image.png

終わりに

本当はSubscribe側も盛り込む予定でしたが、長くなっってきましたので今回はここまでとします。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?