1. はじめに
- 背景:最近 AWS 全然触れてなかった+うちの部屋の二酸化炭素濃度高すぎて頭働かへんけど,定量的ではないなあ、、、
- 目的:室内CO₂を定期計測し、クラウドに保存、異常時に通知
- 使用機器・サービス
- Raspberry Pi + MH-Z19C
- AWS IoT Core, Lambda, S3, SNS
- Athena
- 弊社の偉大なる先輩のコード
2. Raspberry Pi 側
2-1. ビルド
git clone https://github.com/miutaku/co2-sensor
PWM のみで動作を確認しています.
2-2. AWS IoT Core を利用するための準備
# 1. Thing 作成
aws iot create-thing --thing-name RaspiCO2
# 2. 証明書作成
aws iot create-keys-and-certificate \
--set-as-active \
--certificate-pem-outfile cert.pem \
--public-key-outfile public.key \
--private-key-outfile private.key
# 出力される certificateArn は後でポリシーに紐付ける
# 3. ポリシー作成
aws iot create-policy \
--policy-name RaspiCO2Policy \
--policy-document '{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": ["iot:Publish","iot:Subscribe","iot:Connect","iot:Receive"],
"Resource": "*"
}
]
}'
# 4. 証明書にポリシーをアタッチ
aws iot attach-policy --policy-name RaspiCO2Policy --target <certificateArn>
# 5. Thing に証明書をアタッチ
aws iot attach-thing-principal --thing-name RaspiCO2 --principal <certificateArn>
# 6. MQTT エンドポイント取得
aws iot describe-endpoint --output json
# JSON 内の "endpointAddress" を Python スクリプトの client.configureEndpoint() に使用
2-3. AWS IoT Core に送信するためのコード
# co2_publish.py
import requests, time
from AWSIoTPythonSDK.MQTTLib import AWSIoTMQTTClient
client = AWSIoTMQTTClient("raspiCO2")
client.configureEndpoint("<your-iot-endpoint>", 8883) # 先ほど出力されたエンドポイント
client.configureCredentials("root-CA.crt", "private.key", "cert.pem") # 先ほどの鍵たち
client.connect()
while True:
co2 = requests.get("http://127.0.0.1:8080/co2").json()["co2"]
print(co2)
# うちのWifiはよく途切れるので
try:
client.publish("raspi/co2", str(co2), 0)
except Exception as e:
print(e)
time.sleep(10)
3. AWS 側
3-1. IoT Core
トピック raspi/co2 をサブスクライブするルール作成
SELECT * FROM 'raspi/co2'
3-2. Lambda(S3保存 + アラート)
import json, boto3, time
from datetime import datetime, timedelta
s3 = boto3.client('s3')
BUCKET = "my-co2-bucket"
TOPIC_ARN = "<SNS Topic ARN>"
THRESHOLD = 1000 # お役所が出している基準値
def lambda_handler(event, context):
co2 = event
ts = int(time.time())
dt = datetime.utcfromtimestamp(ts) + timedelta(hours=9) # JST
key_prefix = f"co2/{dt.year}/{dt.month:02}/{dt.day:02}/{dt.hour:02}/{dt.minute:02}/"
key = key_prefix + f"{ts}.json"
payload = {"co2": co2, "timestamp": ts}
s3.put_object(Bucket=BUCKET, Key=key, Body=json.dumps(payload))
# 夜は通知したくないので
if 9 <= dt.hour < 22 and co2 > THRESHOLD:
sns = boto3.client("sns")
sns.publish(TopicArn=TOPIC_ARN,
Message=f"CO₂が閾値を超えました: {co2} ppm",
Subject="CO₂アラート")
return {"status": "ok"}
IAM ロールの権限忘れずに!
3-3. SNS 設定
コンソールから SMS サブスクリプション作成
Lambda から TopicArn 経由で通知
4. S3 → Athena → QuickSight 可視化
Athena 外部テーブル例
CREATE EXTERNAL TABLE co2_data (
co2 double,
timestamp bigint
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
LOCATION 's3://my-co2-bucket/co2/';
※やってないけど,,,QuickSight で Athena をデータソースに接続 → 時系列グラフ作成も簡単にできるはず
5. 注意点 / Tips
- Raspberry Pi は常時電源接続が前提
- SMS は送信ごとに料金がかかる
6. まとめ
- IoT からクラウド、可視化までの フルパイプライン構築例
- Lambda で JST 時間を扱うことで夜間アラート回避
小言
うちの部屋ずっと基準値超えててアラートなりまくりぴえん