2024.8.27追記
2024年8月現在、MQTTclientの認証方法が更新されいる。最新のMQTTClientの動きに合わせてコードや記事の一部を更新した。
はじめに
2023年5月現在、日本でも購入できるようになり、在庫も潤沢になり比較的手軽に入手しやすくなったRaspberry Pi Pico W。Wi-Fi機能付きであるため、Pico Wから得られたセンサデータをAWSへ送信して、なんちゃってホームIoT構築することができる!と夢が広がるが、Pico W + AWS Iot Core環境をセットアップを構築するにあたって、まとまった日本語記事が見つからなかった。データを送って保存するまでの手順を簡単に紹介する。
この記事で紹介すること
- Raspberry Pi Pico W本体でアナログ電圧値を取得する方法
- MicroPythonを使ってMQTTでIoTCoreへのデータ送信する方法
- IoT Coreに送信されたデータをDynamoDBへデータを保存する方法
この記事で紹介しないこと
- MicroPythonの詳細な説明
- AWS IoT Coreの詳細な説明
- MQTTの詳細な説明
モチベーション
- IoT Coreを一度使ってみたい!・・・でも、Raspberry Piが入手できず出来なかった。
Pico Wは良いチャンス。 - 洗面所など、ビルトインのため自動消灯式の電球へ変えられない照明がある。
照度センサーを付けて、付け忘れ時にLINEなどへ通知させたい。
Step1 Raspberry Pi Pico W本体でアナログ電圧値を取得する
準備するもの
- Raspberry Pi Pico W …今回は3台用意
- フォトレジスタ
https://www.amazon.co.jp/gp/product/B082X6XGMQ - 金属皮膜抵抗 5.6KΩ
- はんだ工具セット一式
1.1 回路セットアップ
フォトレジスタ(周辺の照度変化に応じて抵抗値が変わる)と金属皮膜抵抗で、VCC電圧を分圧して、フォトレジストの抵抗値変化をアナログ電圧値変化として読み取る。GL5528は、暗所で抵抗値が大、明るいと抵抗値が小になる。この回路のAD値としては、暗所で電圧小、明るいと電圧大が観測される。抵抗の5.6KΩという値は、実際の部屋で抵抗値を変えながら値をモニターし、我が家に合う最適なものを選んだ。
他の2台も同じ回路とした。
1.2 MicroPython開発環境の準備
先人がすでにLチカまでの環境構築をされている。
https://qiita.com/relu/items/945bcb45120c30837745
手順としては、.uf2
形式のファームウェアをPico Wへ焼き、MicroPython環境であるThonnyとCOM接続する。Thonnyでは、Pico Wのファイルシステムにアクセスできるので、あとは任意のソフトを書いて焼けば、Pico WでPythonが走る・・・という流れ。MicroPythonは、ESP32でもよく利用されている様子。筆者は今回初めて利用した。
システムルートには2つのpythonファイルが置ける。
-
boot.py
最初にこちらが実行される。 -
main.py
boot.py
の後に実行される。
参考:https://micropython-docs-ja.readthedocs.io/ja/latest/pyboard/tutorial/script.html
boot.py
ナシ、main.py
だけでも動作することは確認した。
ADC2ポートに接続されているため、achine.ADC(2)
のようにsensor
を定義する。うまくいけばThonnyの標準出力にセンサー電圧値が出力される。
import machine
import time
def read_sensor():
sensor = machine.ADC(2)
factor = 3.3 / 65535
sensor_volt = sensor.read_u16() * factor
return sensor_volt
while True:
print(read_sensor())
time.sleep_ms(1000)
Step2 MicroPythonからMQTTでIoT Coreへのデータ送信する
受信するAWS側の準備と、データを送信するPico W側の準備に分けて解説する。
2.1 AWS側の準備
IoT Coreとエッジ端末でできることは色々あるが、やりたいことが増えれば増えるほど、SDKを必要とする。現在、IoTCoreのSDKは、Pythonの実行環境があれば動作するようで、Raspbery Pi4*/Zeroには対応しているようだが、残念ながらPico Wには非対応である。今回は単に、エッジデータをAWSに対して送信する環境を構築する。
参考:IoT Coreを使ってエッジ端末でできること
- MQTTでエッジ端末とデータを送受信する
- AWSのIoT Coreエンドポイントに対してセンサーデータを送信する
- MQTTメッセージをエッジ側で受信して、あらかじめ決まったアクション(あらかじめ予約されたアプリケーション)を実行する - IoT GreengrassでLambdaライクにエッジへアプリケーションデプロイ、実行する(OTA)
2.1.1 IoT Coreコンソール画面からモノを登録する
必要に応じてモノのタイプなども登録する。IoTのThingsのモノである。
2.1.2 証明書を生成させるオプションを選択する
2.1.3 ポリシーをアタッチする。
必要に応じたポリシーを事前に生成しておき、証明書にアタッチする。
IoT Coreの資格情報の管理は、IAMではなくて、IoT Core内の独自のポリシーによって資格管理がなされる。例えば、IoT Coreエンドポイントにエッジ端末から接続するiot:Connect
、MQTTトピックを発行するためのiot:Publish
と言った要領。
証明書に結び付く形でポリシーを選び、資格管理をすることができる。
モノ
├─証明書
│ ├─ポリシーA
│ │ ├─iot:Connect
│ │ └─iot:Publish
│ └─ポリシーB
│ └─iot:Publish
├─証明書
│ └─ポリシーB
│ └─iot:Publish
2.1.4 証明書のダウンロードと変換
デバイスの一意性を証明するデバイス証明書と、通信用のprivate keyをダウンロードしておく。デバイス証明書、プライベートキー、ルートCA証明書の3点が必要である。
後述するMQTTライブラリumqtt.simple
では、.DER
形式の証明書が必要なため、.PEM
形式のファイルをOpenSSLにて変換する。今回はWindows用のOpenSLL 1.1.1を利用した。
# デバイス証明書
>openssl x509 -in 87b0f66d**"省略"**84a3543-certificate.pem.crt -inform PEM -out certificate.der -outform DER
# private key
>openssl rsa -in 87b0f66d**"省略"**84a3543-private.pem.key -inform PEM -out private.der -outform DER
# routeCA key
>openssl x509 -in AmazonRootCA1.pem -out routeca.der -inform PEM -outform DER
2.2 Pico Wの準備
MQTTの設計と、Iot Coreへデータを送信する
2.2.1 MQTT準備
IoT Coreのエンドポイントに対してMQTTでデータを送信する。
MQTTでは、3点事前に準備が必要である。
- エンドポイントの準備
- MQTTトピックの設計
- メッセージの準備
1つ目は前の章ですでに実施済み。2つ目トピックは、ベストプライスがAWSに示されているので参考にする。3つ目は、今回は照度情報だけ送りたいのでそれに応じたメッセージとする。
参考:MQTTトピック設計のガイドライン
https://d1.awsstatic.com/whitepapers/ja_JP/Designing_MQTT_Topics_for_AWS_IoT_Core.pdf?did=wp_card&trk=wp_card
MQTTトピック
# AWS例
dt/<application>/<context>/<thing-name>/<dt-type>
♯ 今回
dt/homeCtrl/tokyo/pico_***
メッセージ(アナログ値をそのまま送る)
{"illuminance": 1.23456}
2.2.2 main.py準備
MicroPythonでは、普通のPythonと同様に書ける。
AWS Iot Coreをデータに送るPythonスクリプトにおいて気を付けたポイントは以下。
- MQTTのライブラリ
umqtt.simple
を活用する
https://mpython.readthedocs.io/en/master/library/mPython/umqtt.simple.html -
.DER
ファイルはbinary形式で読み込む - Pico Wが複数端末あるため、デバイス名は
picoW_{MACアドレス}
形式で管理する - MQTTでは、クライアントIDが重複すると接続が解除される。一意なもの(デイバス名)とする。
- MQTTメッセージは、dict型データを
json.dumps
し、binary形式とする。 - Wi-Fi接続にはリトライ処理を入れておく
フォルダ構成、コード
root
├─certs
│ ├─routeca.der
│ ├─private.der
│ └─certificate.der
├─lib/umqtt
│ └─simple.py
└─main.py
コードはこちらにあります。
import json
import ssl
import time
import machine
import network
import ntptime
import ubinascii
from umqtt.simple import MQTTClient
iot_core_endpoint = "{your-end-point}.amazonaws.com"
mqtt_topic = "dt/homeCtrl/tokyo/"
wifi_access_point = "{yourSSID}"
wifi_password = "{yourSSID-wifi_password}"
def getClientID():
mac = ubinascii.hexlify(network.WLAN().config("mac"), ":").decode()
return "picoW_" + mac
def get_ssl_context():
keyfile = "/certs/private.der"
certfile = "/certs/certificate.der"
cafile = "/certs/routeca.der"
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=cafile)
context.load_cert_chain(certfile, keyfile)
return context
def conncet_wifi():
wlan = network.WLAN(network.STA_IF)
wlan.active(False)
time.sleep_ms(1000)
wlan.active(True)
time.sleep_ms(3000)
retry = 10
while retry > 0:
wlan.connect(wifi_access_point, wifi_password)
timeout = 10
while not wlan.isconnected() and timeout > 0:
time.sleep_ms(1000)
print(".")
timeout -= 1
if not wlan.isconnected():
print("not connected")
if wlan.isconnected():
print("connected")
break
retry -= 1
def read_sensor():
sensor = machine.ADC(2)
factor = 3.3 / 65535
sensor_volt = sensor.read_u16() * factor
return sensor_volt
def getBinaryMessage(value1):
message = {}
message["illuminance"] = value1
data = json.dumps(message).encode()
return data
# initialize
led = machine.Pin("LED", machine.Pin.OUT)
led.off()
# wifi connect
conncet_wifi()
ntptime.settime()
ssl_context = get_ssl_context()
led.on()
# mqtt connect
mqtt_client_id = getClientID()
mqtt = MQTTClient(
mqtt_client_id, iot_core_endpoint, port=8883, keepalive=10000, ssl=ssl_context
)
mqtt.connect()
# send data
while True:
try:
led.on()
for i in range(100):
Msg = getBinaryMessage(read_sensor())
print(Msg)
print(mqtt_topic + mqtt_client_id)
mqtt.publish(topic=mqtt_topic + mqtt_client_id, msg=Msg, qos=0)
time.sleep_ms(60 * 1000)
except:
led.off()
conncet_wifi()
try:
mqtt.disconnect()
time.sleep_ms(1000)
except:
pass
mqtt.connect()
Thonnyで各種ファイルを書き込むとPico Wで動作し始める。print
標準出力でMsg
が出力され、正常動作している場合はLEDが点灯し、Pico W動作していることを確認できるようにしている。
Step3 MQTTデータをDynamoDBへ保存する
IoT Coreに発射されたデータをDynamoDBへ保存する。ルール内では、デバイスのトピックやメッセージをETLしてDynamoDBへ保存したり、Lambdaアクションへ繋げたりと、次のステップを定義できる。
3.1 ルールの作成
今回は、AWS IoTのルール機能からDynamoDBへ保存するルールを作る。DynamoDBについては複数選べるが、自動で入れ子型のデータを絡むに変換してくれるDynamoDBv2のアクションの方が、後のデータ利用に都合がよい。
-
メッセージ、トピック
- MQTTトピック:
dt/homeCtrl/tokyo/pico_***
- メッセージ:
{"illuminance": 1.23456}
- MQTTトピック:
-
保存先となるDynamoDB
DynamoDBのデータ構造としては、全3つの異なるデバイスから次々に送信されるデータが異なる項目として扱えるような構造で保存されるようにした。- プライマリキー:
timeStamp
- ソートキー:
DeviceID
- プライマリキー:
-
SQL文
- プライマリーの情報timeStampがないのでIoT Core標準関数
timestamp()
を使って生成する - ソートキーの情報deviceIDがないので、
topic()
関数でMQTTトピックからパースする -
#
をうまく使うことで、複数のトピックを一まとめに同じルールで処理することが可能になる。
(逆に言えば、最初のMQTTのトピック設計がかなり重要である。)
- プライマリーの情報timeStampがないのでIoT Core標準関数
SELECT *,cast(timestamp() As String) AS timeStamp, topic(4) AS deviceID, FROM "dt/homeCtrl/tokyo/#"
3.2 DynamoDB側でのデータの確認
DynamoDBコンソールからデータを確認する。今回Pico Wに焼いたソフトは、起動してから1分間隔にデータを送信してくるプログラムである。timeStamp
の降順でソートをかけると、次々に3種類の末尾5b,5c,5dのデバイスから送信されたデータが無事DynamoDBへ保存されたことが確認できる。