0
1

Raspberry Pi → AWS IoT Core 経由でDynamoDBへ保存する

Last updated at Posted at 2024-08-15

はじめに

前回の記事(Raspberry Pi + DS18B20で水温測定)でRaspberry PiとDS18B20温度センサーを使用して水温の取得ができたので、今回は取得したデータをAWS IoT Core経由でDynamoDBに保存するところまでやってみる。

開発環境

  • Python 3.11.2
  • Raspberry Pi OS (64-bit)
  • AWS IoT Core
  • Amazon DynamoDB

システム構成

水温計DS18B20から温度を取得し、Raspberry PiとIoT Coreを接続、Raspberry Piから送信されたデバイスデータをIoT CoreのルールでDynamoDBに保存する。

IoTCore-1.jpg

※ DS18B20→Raspberry Piへの温度取得に必要な設定と配線図は、前回の記事に記載しています。

AWS IoT Coreの設定

マネージドコンソールからIoTCoreに入る

[デバイスを接続] からデバイスの設定画面に移動します。

image-9.jpg

デバイスがIoTCoreに接続できるか確認する

Raspberry PiにSSH接続後、以下のコマンドを実行しIoTCoreに接続できるか確認する

ping hogefuga.iot.ap-northeast-1.amazonaws.com

接続できない場合は、RaspberryPiのWifi設定などインターネットに接続できる環境になっているか確認する

image-1.jpg

上記コマンドを実行し、結果が返ってこればRaspberryPi側の設定はできている。

Reply from xx.xx.xxx.90: bytes=32 time=21ms TTL=248
Reply from xx.xx.xxx.90: bytes=32 time=16ms TTL=248
Reply from xx.xx.xxx.90: bytes=32 time=21ms TTL=248

接続するデバイスの設定

1. デバイス名を登録

今回は raspberry_pi_4 で設定

image_1.jpg

2. プラットフォームとSDKを選択

今回は、RaspberryPi OSを使用しているので、Linux を選択
SDKは Python を使用する

image-2.jpg

3. デバイスにソフトウェアをインストール

接続キットをダウンロード

image-3.jpg

4. SCPコマンドでダウンロードしたZipをRaspberry Pi に転送

scp .\connect_device_package.zip [ユーザー名]@[IPアドレス]:/home/[ユーザー名]

Raspberry PiにSSH接続で入り、転送したZipフォルダを解凍する

unzip connect_device_package.zip

5. 接続キットを実行

解凍したファイルからstart.shに実行権限を追加

chmod +x start.sh

Pythonの仮想環境を作成する

python -m venv venv

Pythonの仮想環境へアクティベートする

source ./venv/bin/activate

Pythonの仮想環境へアクティベートした状態でないと、start.sh中にエラーになります

インストールスクリプトを実行する

./start.sh

→ インストールが完了すると、アプリケーションが動作し始めます

Publishing message to topic 'sdk/test/python': Hello World!  [1]
Received message from topic 'sdk/test/python': b'"Hello World!  [1]"'
Publishing message to topic 'sdk/test/python': Hello World!  [2]
Received message from topic 'sdk/test/python': b'"Hello World!  [2]"'
Publishing message to topic 'sdk/test/python': Hello World!  [3]
Received message from topic 'sdk/test/python': b'"Hello World!  [3]"'
Publishing message to topic 'sdk/test/python': Hello World!  [4]
Received message from topic 'sdk/test/python': b'"Hello World!  [4]"'
Publishing message to topic 'sdk/test/python': Hello World!  [5]
Received message from topic 'sdk/test/python': b'"Hello World!  [5]"'

AWSのコンソールパネルで送信されたメッセージがサブスクリプションされていることが分かる。

image-4.png

これでRaspberryPiをAWS IoT Coreの連携は一旦完了

MQTTパブリッシュプログラム作成

ここからRaspberry Piに実装するスクリプトの作成に入る

ファイル構成

今回作成するプログラムのファイル構成は以下。

├── mqtt_publish.py
├── utils
│   └── ds18B20.py
├── venv
└── .env

mqtt_publish.py

今回のメインになるプログラム。

RaspberryPiからIoT CoreへMQTTプロトコルを使用してパブリッシュする。

import os
from os.path import join, dirname
from dotenv import load_dotenv
from awscrt import mqtt, http
from awsiot import mqtt_connection_builder
import sys
import threading
import time
import json
from datetime import datetime, date
from utils import ds18b20

# 環境変数を取得
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
ENDPOINT = os.environ.get("ENDPOINT")
PORT = int(os.environ.get("PORT"))
CERT_PATH = os.environ.get("CERT_FILEPATH")
PRIVATE_KEY_PATH = os.environ.get("PRI_KEY_FILEPATH")
CA_PATH = os.environ.get("CA_FILEPATH")
CLIENT_ID = os.environ.get("CLIENT_ID")
MESSAGE_COUNT = int(os.environ.get("MESSAGE_COUNT"))
MESSAGE_TOPIC = os.environ.get("MESSAGE_TOPIC")

received_count = 0
received_all_event = threading.Event()

# 接続切断時コールバック
def on_connection_interrupted(connection, error, **kwargs):
    print("Connection interrupted. error: {}".format(error))


# コネクション接続が再確立コールバック
def on_connection_resumed(connection, return_code, session_present, **kwargs):
    print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))

    if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
        print("Session did not persist. Resubscribing to existing topics...")
        resubscribe_future, _ = connection.resubscribe_existing_topics()
        resubscribe_future.add_done_callback(on_resubscribe_complete)


def on_resubscribe_complete(resubscribe_future):
    resubscribe_results = resubscribe_future.result()
    print("Resubscribe results: {}".format(resubscribe_results))

    for topic, qos in resubscribe_results['topics']:
        if qos is None:
            sys.exit("Server rejected resubscribe to topic: {}".format(topic))


# サブスクライブされたトピックがメッセージを受信したときのコールバック
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))
    global received_count
    received_count += 1
    if received_count == MESSAGE_COUNT:
        received_all_event.set()

# 接続が成功したときのコールバック
def on_connection_success(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
    print("Connection Successful with return code: {} session present: {}".format(callback_data.return_code, callback_data.session_present))

# 接続試行が失敗した場合のコールバック
def on_connection_failure(connection, callback_data):
    assert isinstance(callback_data, mqtt.OnConnectionFailureData)
    print("Connection failed with error code: {}".format(callback_data.error))

# 接続がクローズされたときのコールバック
def on_connection_closed(connection, callback_data):
    print("Connection closed")

def make_mqtt_connection():
    # MQTT接続を作成
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=ENDPOINT,
        port=PORT,
        cert_filepath=CERT_PATH,
        pri_key_filepath=PRIVATE_KEY_PATH,
        ca_filepath=CA_PATH,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=CLIENT_ID,
        clean_session=False,
        keep_alive_secs=30,
        http_proxy_options=None,
        on_connection_success=on_connection_success,
        on_connection_failure=on_connection_failure,
        on_connection_closed=on_connection_closed
        )
    
    return mqtt_connection

def main():
    mqtt_connection = make_mqtt_connection()
    
    print(f"Connecting to {ENDPOINT} with client ID '{CLIENT_ID}'...")
    connect_future = mqtt_connection.connect()

    # コネクションが利用可能になるまで待機
    connect_future.result()
    print("Connected!")
    
    # 水温センサーから温度を取得
    temperature = ds18b20.measure()
    
    # JSONデータを作成
    datetime_now = datetime.now().strftime("%Y%m%d%H%M")
    message_json = json.dumps(
            {
                'device_id' : 'ds18b20',
                "temperature": temperature,
                "datetime"  : datetime_now,
            }
        )
    
    # MESSAGE_COUNTの数だけメッセージをパブリッシュする(0の場合は無限ループ)
    if MESSAGE_COUNT == 0:
        print("Sending messages until program killed")
    else:
        print("Sending {} message".format(MESSAGE_COUNT))

    publish_count = 1
    while (publish_count <= MESSAGE_COUNT) or (MESSAGE_COUNT == 0):
        print("Publishing message to topic '{}': {}".format(MESSAGE_TOPIC, message_json))
        
        mqtt_connection.publish(
            topic=MESSAGE_TOPIC,
            payload=message_json,
            qos=mqtt.QoS.AT_LEAST_ONCE)
        time.sleep(1)
        
        publish_count = publish_count + 1

    # コネクション切断
    print("Disconnecting...")
    disconnect_future = mqtt_connection.disconnect()
    disconnect_future.result()
    print("Disconnected!")

if __name__ == '__main__':
    main()

utils/ds18B20.py

前回の記事 で作成したプログラムを参考に作成。

温度センサー(DS18B20)から温度を取得するプログラムをmqtt_publish.pyでインポートするライブラリとして使用する

from w1thermsensor import W1ThermSensor

def measure():
    try:
        ds18b20_sensor = W1ThermSensor()
        temperature = round(ds18b20_sensor.get_temperature(),1)
    except Exception as e:
        temperature = -9999
        
    return temperature

venv

Pythonの仮想環境

.env

環境変数の設定ファイル

.env
ENDPOINT=hogehoge.iot.ap-northeast-1.amazonaws.com
PORT=8883
CERT_FILEPATH=raspberry_pi_4.cert.pem
PRI_KEY_FILEPATH=raspberry_pi_4.private.key
CA_FILEPATH=root-CA.crt
CLIENT_ID=raspberry_pi_4
MESSAGE_COUNT=1
MESSAGE_TOPIC=raspberry_pi_4/ds18b20/python

トピック名とクライアントID変更

トピック名とクライアントIDがデフォルトのままになっているので、環境変数ファイル(.env)に記載した内容になるようコンソール画面で変更する

IoT Coreのコンソール画面から、[セキュリティ] > [ポリシー] > [raspberry_pi_4-Policy]に移動し、JSON記載画面に移動し変更を行う

image-10.jpg

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Publish",
        "iot:Receive",
        "iot:PublishRetain"
      ],
      "Resource": [
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topic/raspberry_pi_4/ds18b20/java",
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topic/raspberry_pi_4/ds18b20/python",
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topic/raspberry_pi_4/ds18b20/js"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iot:Subscribe",
      "Resource": [
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topicfilter/raspberry_pi_4/ds18b20/java",
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topicfilter/raspberry_pi_4/ds18b20/python",
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topicfilter/raspberry_pi_4/ds18b20/js"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": [
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:client/sdk-java",
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:client/raspberry_pi_4",
        "arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:client/sdk-nodejs-*"
      ]
    }
  ]
}

Amazon DynamoDBを作成する

ここまででRaspberry → IoT Coreの設定は完了したので、次はIoT Core → DynamoDB保存の設定を行う。

トピックのJSON構造

DynamoDBの設定の前に一旦トピックの内容を確認

RaspberryPi → IoT CoreへのMQTTメッセージは、以下のJSON形式でパブリッシュされる

{
    "device_id" : 'ds18b20',
    "temperature": temperature,
    "datetime"  : datetime_now,
}

DynamoDBのテーブル構造

項目 データ型
テーブル名 raspberry_pi_4_data
パーティションキー device_id String
ソートキー datetime String

パーティションキーとソートキーで複合プライマリーキーとなるように設定する

(参考) Amazon DynamoDB のコアコンポーネント

DynamoDBテーブルの作成

DynamoDBのコンソール画面から [テーブル] > [テーブル作成] を開き、テーブル構造で決めた設定内容を入力する

image-11.jpg

IoT Core ルールを作成する

最後に、IoT Coreルールを作成し、IoT Coreにデバイスデータが飛んで来た時にDynamoDBへ保存する設定を行う

ルールの設定

IoT Coreのコンソールから、[メッセージのルーティング] > [ルール] に移動し、[ルールを作成] する

image-6.jpg

ルール名: raspberry_pi_4_data

説明:IoT Coreで受信したraspberry pi からのJSONデータを DynamoDBに保存する

image-7.jpg

SQLステートメントの設定

受信したMQTTメッセージを、DynamoDBに保存する為のSQLを設定する

SQLのバージョン:2016-03-23

image-12.jpg

SQLステートメント

SELECT
  device_id AS device_id,
  temperature AS temperature,
  datetime AS datetime,
FROM
  'raspberry_pi_4/ds18b20/python'

ルールアクションをアタッチ

先ほど作成したDynamoDBテーブル設定をします。

  • アクション:DynamoDBv2

  • テーブル:raspberry_pi_4_data

  • IAMロール:raspberry_pi_4_query_role([新しいロールを作成] から新規作成します)

image-8.jpg

動作確認

Raspberry Pi上でPythonスクリプトを実行する

python  ./mqtt_publish.py

IoT Coreへパブリッシュが正常に完了すれば、以下が出力される

Connecting to hogehoge.iot.ap-northeast-1.amazonaws.com with client ID 'raspberry_pi_4'...
Connection Successful with return code: 0 session present: False
Connected!
Sending 1 message
Publishing message to topic 'raspberry_pi_4/ds18b20/python': {"device_id": "ds18b20", "temperature": 27.6, "datetime": "202408150030"}
Disconnecting...
Connection closed
Disconnected!

IoT Coreを確認する

IoT Coreのコンソール画面から。[MQTTクライアント]を開く。

トピックを指定し、サブスライブした状態でスクリプトを実行すると IoT Coreにパブリッシュされていることが分かる。

image-13.jpg

DynamoDBを確認する

DynamoDBのコンソール画面から、[項目を探索] 画面を開く。

[raspberry_pi_4_data] テーブルの画面を開くと、IoT Coreでサブスクリプションしたデータが保存されていることが分かる

image-14.jpg

最後に

これで水温測定 → Raspberry Piから、IoT Core → DynamoDBまでデータと送れるようになったので、cron設定や定期実行用スクリプトを作成すれば定期的にDynamoDBへデータを送信することができる。
そこまでできればWebアプリケーションとの連携準備が整うはずなので、今後はWebアプリケーションからDynamoDBのデータを取得して、データ可視化を目標にやっていきたいと思う。

参照記事

Raspberry Pi + DS18B20で水温測定

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1