はじめに
前回の記事(Raspberry Pi + DS18B20で水温測定)でRaspberry PiとDS18B20温度センサーを使用して水温の取得ができたので、今回は取得したデータをAWS IoT Core経由でDynamoDBに保存するところまでやってみる。
開発環境
- Python 3.11.2
- Raspberry Pi OS (64-bit)
- AWS IoT Core
- Amazon DynamoDB
システム構成
水温計DS18B20から温度を取得し、Raspberry PiとIoT Coreを接続、Raspberry Piから送信されたデバイスデータをIoT CoreのルールでDynamoDBに保存する。
※ DS18B20→Raspberry Piへの温度取得に必要な設定と配線図は、前回の記事に記載しています。
AWS IoT Coreの設定
マネージドコンソールからIoTCoreに入る
[デバイスを接続] からデバイスの設定画面に移動します。
デバイスがIoTCoreに接続できるか確認する
Raspberry PiにSSH接続後、以下のコマンドを実行しIoTCoreに接続できるか確認する
ping hogefuga.iot.ap-northeast-1.amazonaws.com
接続できない場合は、RaspberryPiのWifi設定などインターネットに接続できる環境になっているか確認する
上記コマンドを実行し、結果が返ってこればRaspberryPi側の設定はできている。
Reply from xx.xx.xxx.90: bytes=32 time=21ms TTL=248
Reply from xx.xx.xxx.90: bytes=32 time=16ms TTL=248
Reply from xx.xx.xxx.90: bytes=32 time=21ms TTL=248
接続するデバイスの設定
1. デバイス名を登録
今回は raspberry_pi_4 で設定
2. プラットフォームとSDKを選択
今回は、RaspberryPi OSを使用しているので、Linux を選択
SDKは Python を使用する
3. デバイスにソフトウェアをインストール
接続キットをダウンロード
4. SCPコマンドでダウンロードしたZipをRaspberry Pi に転送
scp .\connect_device_package.zip [ユーザー名]@[IPアドレス]:/home/[ユーザー名]
Raspberry PiにSSH接続で入り、転送したZipフォルダを解凍する
unzip connect_device_package.zip
5. 接続キットを実行
解凍したファイルからstart.shに実行権限を追加
chmod +x start.sh
Pythonの仮想環境を作成する
python -m venv venv
Pythonの仮想環境へアクティベートする
source ./venv/bin/activate
Pythonの仮想環境へアクティベートした状態でないと、start.sh中にエラーになります
インストールスクリプトを実行する
./start.sh
→ インストールが完了すると、アプリケーションが動作し始めます
Publishing message to topic 'sdk/test/python': Hello World! [1]
Received message from topic 'sdk/test/python': b'"Hello World! [1]"'
Publishing message to topic 'sdk/test/python': Hello World! [2]
Received message from topic 'sdk/test/python': b'"Hello World! [2]"'
Publishing message to topic 'sdk/test/python': Hello World! [3]
Received message from topic 'sdk/test/python': b'"Hello World! [3]"'
Publishing message to topic 'sdk/test/python': Hello World! [4]
Received message from topic 'sdk/test/python': b'"Hello World! [4]"'
Publishing message to topic 'sdk/test/python': Hello World! [5]
Received message from topic 'sdk/test/python': b'"Hello World! [5]"'
AWSのコンソールパネルで送信されたメッセージがサブスクリプションされていることが分かる。
これでRaspberryPiをAWS IoT Coreの連携は一旦完了
MQTTパブリッシュプログラム作成
ここからRaspberry Piに実装するスクリプトの作成に入る
ファイル構成
今回作成するプログラムのファイル構成は以下。
├── mqtt_publish.py
├── utils
│ └── ds18B20.py
├── venv
└── .env
mqtt_publish.py
今回のメインになるプログラム。
RaspberryPiからIoT CoreへMQTTプロトコルを使用してパブリッシュする。
import os
from os.path import join, dirname
from dotenv import load_dotenv
from awscrt import mqtt, http
from awsiot import mqtt_connection_builder
import sys
import threading
import time
import json
from datetime import datetime, date
from utils import ds18b20
# 環境変数を取得
dotenv_path = join(dirname(__file__), '.env')
load_dotenv(dotenv_path)
ENDPOINT = os.environ.get("ENDPOINT")
PORT = int(os.environ.get("PORT"))
CERT_PATH = os.environ.get("CERT_FILEPATH")
PRIVATE_KEY_PATH = os.environ.get("PRI_KEY_FILEPATH")
CA_PATH = os.environ.get("CA_FILEPATH")
CLIENT_ID = os.environ.get("CLIENT_ID")
MESSAGE_COUNT = int(os.environ.get("MESSAGE_COUNT"))
MESSAGE_TOPIC = os.environ.get("MESSAGE_TOPIC")
received_count = 0
received_all_event = threading.Event()
# 接続切断時コールバック
def on_connection_interrupted(connection, error, **kwargs):
print("Connection interrupted. error: {}".format(error))
# コネクション接続が再確立コールバック
def on_connection_resumed(connection, return_code, session_present, **kwargs):
print("Connection resumed. return_code: {} session_present: {}".format(return_code, session_present))
if return_code == mqtt.ConnectReturnCode.ACCEPTED and not session_present:
print("Session did not persist. Resubscribing to existing topics...")
resubscribe_future, _ = connection.resubscribe_existing_topics()
resubscribe_future.add_done_callback(on_resubscribe_complete)
def on_resubscribe_complete(resubscribe_future):
resubscribe_results = resubscribe_future.result()
print("Resubscribe results: {}".format(resubscribe_results))
for topic, qos in resubscribe_results['topics']:
if qos is None:
sys.exit("Server rejected resubscribe to topic: {}".format(topic))
# サブスクライブされたトピックがメッセージを受信したときのコールバック
def on_message_received(topic, payload, dup, qos, retain, **kwargs):
print("Received message from topic '{}': {}".format(topic, payload))
global received_count
received_count += 1
if received_count == MESSAGE_COUNT:
received_all_event.set()
# 接続が成功したときのコールバック
def on_connection_success(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionSuccessData)
print("Connection Successful with return code: {} session present: {}".format(callback_data.return_code, callback_data.session_present))
# 接続試行が失敗した場合のコールバック
def on_connection_failure(connection, callback_data):
assert isinstance(callback_data, mqtt.OnConnectionFailureData)
print("Connection failed with error code: {}".format(callback_data.error))
# 接続がクローズされたときのコールバック
def on_connection_closed(connection, callback_data):
print("Connection closed")
def make_mqtt_connection():
# MQTT接続を作成
mqtt_connection = mqtt_connection_builder.mtls_from_path(
endpoint=ENDPOINT,
port=PORT,
cert_filepath=CERT_PATH,
pri_key_filepath=PRIVATE_KEY_PATH,
ca_filepath=CA_PATH,
on_connection_interrupted=on_connection_interrupted,
on_connection_resumed=on_connection_resumed,
client_id=CLIENT_ID,
clean_session=False,
keep_alive_secs=30,
http_proxy_options=None,
on_connection_success=on_connection_success,
on_connection_failure=on_connection_failure,
on_connection_closed=on_connection_closed
)
return mqtt_connection
def main():
mqtt_connection = make_mqtt_connection()
print(f"Connecting to {ENDPOINT} with client ID '{CLIENT_ID}'...")
connect_future = mqtt_connection.connect()
# コネクションが利用可能になるまで待機
connect_future.result()
print("Connected!")
# 水温センサーから温度を取得
temperature = ds18b20.measure()
# JSONデータを作成
datetime_now = datetime.now().strftime("%Y%m%d%H%M")
message_json = json.dumps(
{
'device_id' : 'ds18b20',
"temperature": temperature,
"datetime" : datetime_now,
}
)
# MESSAGE_COUNTの数だけメッセージをパブリッシュする(0の場合は無限ループ)
if MESSAGE_COUNT == 0:
print("Sending messages until program killed")
else:
print("Sending {} message".format(MESSAGE_COUNT))
publish_count = 1
while (publish_count <= MESSAGE_COUNT) or (MESSAGE_COUNT == 0):
print("Publishing message to topic '{}': {}".format(MESSAGE_TOPIC, message_json))
mqtt_connection.publish(
topic=MESSAGE_TOPIC,
payload=message_json,
qos=mqtt.QoS.AT_LEAST_ONCE)
time.sleep(1)
publish_count = publish_count + 1
# コネクション切断
print("Disconnecting...")
disconnect_future = mqtt_connection.disconnect()
disconnect_future.result()
print("Disconnected!")
if __name__ == '__main__':
main()
utils/ds18B20.py
前回の記事 で作成したプログラムを参考に作成。
温度センサー(DS18B20)から温度を取得するプログラムをmqtt_publish.pyでインポートするライブラリとして使用する
from w1thermsensor import W1ThermSensor
def measure():
try:
ds18b20_sensor = W1ThermSensor()
temperature = round(ds18b20_sensor.get_temperature(),1)
except Exception as e:
temperature = -9999
return temperature
venv
Pythonの仮想環境
.env
環境変数の設定ファイル
ENDPOINT=hogehoge.iot.ap-northeast-1.amazonaws.com
PORT=8883
CERT_FILEPATH=raspberry_pi_4.cert.pem
PRI_KEY_FILEPATH=raspberry_pi_4.private.key
CA_FILEPATH=root-CA.crt
CLIENT_ID=raspberry_pi_4
MESSAGE_COUNT=1
MESSAGE_TOPIC=raspberry_pi_4/ds18b20/python
トピック名とクライアントID変更
トピック名とクライアントIDがデフォルトのままになっているので、環境変数ファイル(.env)に記載した内容になるようコンソール画面で変更する
IoT Coreのコンソール画面から、[セキュリティ] > [ポリシー] > [raspberry_pi_4-Policy]に移動し、JSON記載画面に移動し変更を行う
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"iot:Publish",
"iot:Receive",
"iot:PublishRetain"
],
"Resource": [
"arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topic/raspberry_pi_4/ds18b20/java",
"arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topic/raspberry_pi_4/ds18b20/python",
"arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topic/raspberry_pi_4/ds18b20/js"
]
},
{
"Effect": "Allow",
"Action": "iot:Subscribe",
"Resource": [
"arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topicfilter/raspberry_pi_4/ds18b20/java",
"arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topicfilter/raspberry_pi_4/ds18b20/python",
"arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:topicfilter/raspberry_pi_4/ds18b20/js"
]
},
{
"Effect": "Allow",
"Action": "iot:Connect",
"Resource": [
"arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:client/sdk-java",
"arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:client/raspberry_pi_4",
"arn:aws:iot:ap-northeast-1:xxxxxxxxxxxx:client/sdk-nodejs-*"
]
}
]
}
Amazon DynamoDBを作成する
ここまででRaspberry → IoT Coreの設定は完了したので、次はIoT Core → DynamoDB保存の設定を行う。
トピックのJSON構造
DynamoDBの設定の前に一旦トピックの内容を確認
RaspberryPi → IoT CoreへのMQTTメッセージは、以下のJSON形式でパブリッシュされる
{
"device_id" : 'ds18b20',
"temperature": temperature,
"datetime" : datetime_now,
}
DynamoDBのテーブル構造
項目 | 値 | データ型 |
---|---|---|
テーブル名 | raspberry_pi_4_data | |
パーティションキー | device_id | String |
ソートキー | datetime | String |
パーティションキーとソートキーで複合プライマリーキーとなるように設定する
(参考) Amazon DynamoDB のコアコンポーネント
DynamoDBテーブルの作成
DynamoDBのコンソール画面から [テーブル] > [テーブル作成] を開き、テーブル構造で決めた設定内容を入力する
IoT Core ルールを作成する
最後に、IoT Coreルールを作成し、IoT Coreにデバイスデータが飛んで来た時にDynamoDBへ保存する設定を行う
ルールの設定
IoT Coreのコンソールから、[メッセージのルーティング] > [ルール] に移動し、[ルールを作成] する
ルール名: raspberry_pi_4_data
説明:IoT Coreで受信したraspberry pi からのJSONデータを DynamoDBに保存する
SQLステートメントの設定
受信したMQTTメッセージを、DynamoDBに保存する為のSQLを設定する
SQLのバージョン:2016-03-23
SQLステートメント
SELECT
device_id AS device_id,
temperature AS temperature,
datetime AS datetime,
FROM
'raspberry_pi_4/ds18b20/python'
ルールアクションをアタッチ
先ほど作成したDynamoDBテーブル設定をします。
-
アクション:DynamoDBv2
-
テーブル:raspberry_pi_4_data
-
IAMロール:raspberry_pi_4_query_role([新しいロールを作成] から新規作成します)
動作確認
Raspberry Pi上でPythonスクリプトを実行する
python ./mqtt_publish.py
IoT Coreへパブリッシュが正常に完了すれば、以下が出力される
Connecting to hogehoge.iot.ap-northeast-1.amazonaws.com with client ID 'raspberry_pi_4'...
Connection Successful with return code: 0 session present: False
Connected!
Sending 1 message
Publishing message to topic 'raspberry_pi_4/ds18b20/python': {"device_id": "ds18b20", "temperature": 27.6, "datetime": "202408150030"}
Disconnecting...
Connection closed
Disconnected!
IoT Coreを確認する
IoT Coreのコンソール画面から。[MQTTクライアント]を開く。
トピックを指定し、サブスライブした状態でスクリプトを実行すると IoT Coreにパブリッシュされていることが分かる。
DynamoDBを確認する
DynamoDBのコンソール画面から、[項目を探索] 画面を開く。
[raspberry_pi_4_data] テーブルの画面を開くと、IoT Coreでサブスクリプションしたデータが保存されていることが分かる
最後に
これで水温測定 → Raspberry Piから、IoT Core → DynamoDBまでデータと送れるようになったので、cron設定や定期実行用スクリプトを作成すれば定期的にDynamoDBへデータを送信することができる。
そこまでできればWebアプリケーションとの連携準備が整うはずなので、今後はWebアプリケーションからDynamoDBのデータを取得して、データ可視化を目標にやっていきたいと思う。