本記事は、NSSOL Advent Calendar 2024 16日目の記事です。
ぜひ、他の記事もご覧ください。
はじめに
背景
在宅で仕事をしていると、ふと「(物理的に)空気が悪いな」と思うことがよくあります。
耐えきれずに換気をすると、確かに何かが良くなったように感じます。
しかし、実際に空気のどこがどのくらい悪くて、どう変化したのか、よくわかりません。
さらに言えば、どのような換気法がよい(窓をどのくらい開けるか、エアコンをどのように設定するか)のか、自分の中でははっきりしていません。
その辺を知るために、センサを用いて空気に関する量を色々取得して眺めてみたい、というのが本記事の目的となります。
今回は学習目的も兼ねて環境を自作することにします。
(最近発売されたSwitchBotのCO2センサなど、市販のものでもよいと思います)
せっかくなのでいっぱい項目を測定してみることにします。
測定する量は以下の5種類です:
- CO2濃度
- 温度
- 相対湿度
- 気圧
- 照度
この量が一日を通じてどう変化したのかを観察するためのシステムを実装しました。
記事ではシステムの実装法を紹介します。
ついでにこのシステムで測定出来た内容を一部紹介します。
構成
今回作ったシステムの構成図です。
以下の3つに分けて、実装法を解説します。
- マイコンでのセンシング+データ送信
- AWSでのデータ受信+格納
- ローカルPCでのデータ可視化
実装①:マイコンでのセンシング+データ送信
機器の用意
必要なものは
- WiFi接続とI2C通信が可能なマイコン
- 電源+電源ケーブル(TypeA-TypeC)
- センサ類
- ブレッドボード、ジャンパ
- はんだ、はんだごて
です。
自分は以下のものを購入しました。
値段については、執筆当時(2024.12.15)のものになります。
機器 | リンク | 値段 |
---|---|---|
マイコン | Freenove ESP32-WROVER | ¥2480 |
電源ケーブル | (100均で購入) | ¥110 |
センサ(CO2、温度、相対湿度) | Dovhmoh SCD40ガスセンサーモジュール | ¥2819 |
センサ(温度、気圧) | GAOHOU GY-BMP280-3.3 | ¥599 |
センサ(照度) | KKHMF 3個 BH1750 GY302 | ¥899 |
ブレッドボード | サンハヤト SAD-101 ニューブレッドボード | ¥568 |
合計 | ¥7475 |
今回は別用で購入したカメラ付きのESP32を流用しているため、ESP32にしては高めな値段になっています。
もっと安いマイコン(例えば、カメラなしのESP32)でも実行可能だと思います(未検証)。
CO2の測定にはSCD40というセンサを使用しています。
このセンサは校正のために週に一度以上、400ppm程度の外気に触れさせる必要があります。
詳細は公式のデータシートをご確認ください。
ジャンパとはんだ / はんだごては、手元に余っていたものをそのまま使いました。
手元にない場合は、センサの接続で必要になるので別途購入が必要です。
他にも、ブレッドボードももう一枚、手元に余っていたものを使用しています。
部品の接続
センサについて、ブレッドボードに接続した時に測定部が板の表側に来るようにピンヘッダをはんだ付けします。
(筆者は向きを間違えたせいで照度センサを一つ台無しにしました)
今回使用するマイコンのピン配置は以下のようになっています。
(引用:GitHub)
各センサのVCC(VDD)、GND、SCL、SDAを対応するピンに並列で繋ぎます。
ブレッドボードの穴は、画像のように裏側で繋がっています。
これを元に配線を行います。
(引用:ブレッドボードの使い方【ブレッドボードでLEDを光らせてみよう)
AWS IoT Coreでの設定
マイコンをAWSに接続するにあたり、IoT Coreを使います。
AWSのアカウントを持っていない場合、ここで作成をしておきます。
IoT Coreにてモノの登録を行い、デバイスの証明書、公開鍵、秘密鍵、ルートCA証明書を入手しておきます。
詳細についてはこちらの方の記事をご参照ください。
Arduino IDEの準備
マイコンにコードを書き込むために、今回はArduino IDEを使用します。
以下のリンクからダウンロードをします。
以下の設定を行います。
Board Managerの追加
File > Preferences から、Additional boards manager URLsに、以下のURLを追加します。
https://raw.githubusercontent.com/espressif/arduino-esp32/gh-pages/package_esp32_index.json
OKを押して適用します。
ライブラリの追加
Tools > Manage Libraries を選択、あるいはGUI左列の本棚のマークをクリックすると、ライブラリの検索画面が立ち上がります。
以下の4つのライブラリをインストールします。
ボードの設定
接続しているポートに(デフォルトだとCOM3)、ESP32 Wrover Moduleを選択します。
OKを押して適用したら完了です。
コードのビルド
以下の2つのコードを同じディレクトリに配置し、左上にある、→ボタンをクリックして書き込みます。
数分程度かかると思います。(Leaving...となっていれば書き込み自体は完了しています)
コード(クリックして展開)
#include <WiFi.h>
#include <PubSubClient.h>
#include <WiFiClientSecure.h>
#include "secrets.h"
#include <Wire.h>
#include <BH1750.h>
#include <Adafruit_BMP280.h>
#include <SensirionI2CScd4x.h>
#include <time.h>
// ===== センサーの初期化 =====
BH1750 lightMeter;
Adafruit_BMP280 bmp;
SensirionI2CScd4x scd4x;
// ===== AWS IoT設定 =====
const int mqttPort = 8883;
const char* mqttUser = "";
const char* mqttPassword = "";
WiFiClientSecure espClient;
PubSubClient client(espClient);
// ===== グローバル変数 =====
time_t lastSentTime = 0; // 最後にデータを送信した時刻
// ===== Wi-Fi接続設定 =====
void setupWiFi() {
Serial.print("Connecting to WiFi...");
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(500);
Serial.print(".");
}
Serial.println("\nWiFi connected.");
Serial.println("IP address: " + WiFi.localIP().toString());
// NTPサーバーの設定
configTime(9 * 3600, 0, "pool.ntp.org", "time.nist.gov");
Serial.print("Waiting for time sync...");
while (time(nullptr) < 8 * 3600 * 2) { // NTP同期を待つ
Serial.print(".");
delay(1000);
}
Serial.println(" Time synchronized!");
}
// ===== AWS IoT Coreへの接続 =====
void connectToAWS() {
Serial.print("Connecting to AWS IoT...");
espClient.setCACert(caCert);
espClient.setCertificate(clientCert);
espClient.setPrivateKey(clientKey);
while (!client.connected()) {
if (client.connect(clientID)) {
Serial.println("Connected to AWS IoT Core!");
} else {
Serial.print("Failed to connect, rc=");
Serial.println(client.state());
delay(1000);
}
}
}
// ===== センサーの初期化 =====
void initializeSensors() {
Wire.begin();
if (!lightMeter.begin()) {
Serial.println("Could not find a valid BH1750 sensor, check wiring!");
while (1);
}
Serial.println("BH1750 sensor initialized.");
if (!bmp.begin(0x76)) {
Serial.println("Could not find a valid BMP280 sensor, check wiring!");
while (1);
}
Serial.println("BMP280 sensor initialized.");
scd4x.begin(Wire);
uint16_t error = scd4x.stopPeriodicMeasurement();
if (error) {
Serial.println("Failed to stop previous SCD40 measurement.");
}
error = scd4x.startPeriodicMeasurement();
if (!error) {
Serial.println("SCD40 sensor initialized and started periodic measurement.");
} else {
Serial.println("Failed to start SCD40 measurement, check wiring!");
while (1);
}
}
// ===== センサーデータ取得 =====
String collectSensorData() {
uint16_t co2 = 0;
float temperature_scd40 = 0.0f, humidity = 0.0f;
float temperature_bmp280 = bmp.readTemperature();
float pressure = bmp.readPressure() / 100.0F;
float lux = lightMeter.readLightLevel();
bool isDataReady = false;
uint16_t error = scd4x.getDataReadyFlag(isDataReady);
if (isDataReady) {
scd4x.readMeasurement(co2, temperature_scd40, humidity);
}
// 現在の日付と時刻を取得
time_t now = time(nullptr);
struct tm* timeInfo = localtime(&now);
char dateTimeBuffer[20]; // YYYY-MM-DD HH:MM:SS形式
strftime(dateTimeBuffer, sizeof(dateTimeBuffer), "%Y-%m-%d %H:%M:%S", timeInfo);
// JSON形式でデータを構築
String jsonData = "{";
jsonData += "\"Timestamp\":\"" + String(dateTimeBuffer) + "\",";
jsonData += "\"CO2\":" + String(co2) + ",";
jsonData += "\"Temperature_SCD40\":" + String(temperature_scd40) + ",";
jsonData += "\"Humidity\":" + String(humidity) + ",";
jsonData += "\"Temperature_BMP280\":" + String(temperature_bmp280) + ",";
jsonData += "\"Pressure\":" + String(pressure) + ",";
jsonData += "\"Lux\":" + String(lux);
jsonData += "}";
return jsonData;
}
// ===== データ送信 =====
void publishData(String data) {
if (client.publish(mqttTopic, data.c_str())) {
Serial.println("Message sent to AWS IoT Core: " + data);
} else {
Serial.println("Failed to send message.");
}
}
// ===== セットアップ =====
void setup() {
Serial.begin(115200);
setupWiFi();
initializeSensors();
client.setServer(mqttServer, mqttPort);
connectToAWS();
}
// ===== ループ =====
void loop() {
if (!client.connected()) {
connectToAWS();
}
client.loop();
// 現在の時刻を取得
time_t now = time(nullptr);
struct tm* timeInfo = localtime(&now);
// 1分ごとかつ秒が0のときにデータ送信
if (timeInfo->tm_sec == 0 && timeInfo->tm_min % 1 == 0 && now != lastSentTime) {
String sensorData = collectSensorData();
publishData(sensorData);
lastSentTime = now; // 送信時刻を更新
}
}
#include <pgmspace.h>
const char* clientID = "YOUR_THING_NAME"; // AWS IoT Coreで設定したモノの名前
const char* mqttTopic = "YOUR_TOPIC_NAME"; // 通信で使用するTopicの名前
// Wi-Fi設定
const char* ssid = "YOUR_WIFI_SSID"; // Wi-FiのSSID
const char* password = "YOUR_WIFI_PASSWORD"; // Wi-Fiのパスワード
const char* mqttServer = "xxxxxxxxxxxx-ats.iot.your_region_info.amazonaws.com"; // AWS IoT CoreのエンドポイントURL
// Amazon Root CA
const char* caCert = R"EOF(
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
)EOF";
// デバイスの証明書
const char* clientCert = R"KEY(
-----BEGIN CERTIFICATE-----
-----END CERTIFICATE-----
)KEY";
// デバイスの秘密鍵
const char* clientKey = R"KEY(
-----BEGIN RSA PRIVATE KEY-----
-----END RSA PRIVATE KEY-----
)KEY";
secret.h
にあるWiFiのSSID/パスワード、AWSの認証情報についてはご自身のものをご記入ください。
mqttTopic
には適当な名前(あとでIoT CoreとLambda等を接続する際に使います)を、mqttServer
はIoT Coreの設定画面から取得したドメイン名をご記入ください。
実装②:AWSでのデータ受信+格納
S3のバケット作成
センサ情報が保存されるストレージ(バケット)を作成します。
S3の管理画面にアクセスし、バケットを作成
からバケットを作成します。
名前を適当に設定し、それ以外はデフォルトの設定にします。
Lambdaの関数作成
マイコンから情報が来た場合に、S3にデータを格納するためのLambdaの関数を設定します。
関数の作成
から、適当に関数名を決め、ランタイムについてはPython 3.x
の関数を作成します。
(使用している関数は、ここではputDataToCSV
としています)
実装したLambdaの関数について、以下のコードをコード
のlambda_function.pyに記入し、Deployします。
コード(クリックして展開)
import boto3
import csv
import io
import json
s3 = boto3.client('s3')
# S3バケット名
BUCKET_NAME = "YOUR_BUCKET_NAME"
# カラム名(ヘッダー)
HEADERS = ["Timestamp", "CO2[ppm]", "Temperature_SCD40[°C]", "Humidity[%]", "Temperature_BMP280[°C]", "Pressure[hPa]", "Lux[lx]"]
def lambda_handler(event, context):
# ESP32から送信されたJSONデータをそのまま処理
try:
# ESP32からのデータをそのまま取得
data = event # IoT Coreからのイベントデータ
except json.JSONDecodeError as e:
return {
'statusCode': 400,
'body': f"Invalid JSON format: {e}"
}
# 日付ベースのファイル名を生成
current_date = data["Timestamp"].split(" ")[0] # 日付部分を抽出
file_name = f"data/{current_date}.csv"
# 新しいデータを準備
new_data = [
data["Timestamp"],
data["CO2"],
data["Temperature_SCD40"],
data["Humidity"],
data["Temperature_BMP280"],
data["Pressure"],
data["Lux"]
]
try:
# S3から既存ファイルを取得
response = s3.get_object(Bucket=BUCKET_NAME, Key=file_name)
csv_content = response['Body'].read().decode('utf-8')
existing_data = list(csv.reader(io.StringIO(csv_content)))
except s3.exceptions.NoSuchKey:
# ファイルが存在しない場合
existing_data = []
# データを追記
output = io.StringIO()
writer = csv.writer(output)
if not existing_data:
# 新しいファイルの場合、ヘッダーを追加
writer.writerow(HEADERS)
# 元のデータと新しいデータを統合
writer.writerows(existing_data)
writer.writerow(new_data)
# S3にアップロード
s3.put_object(
Bucket=BUCKET_NAME,
Key=file_name,
Body=output.getvalue(),
ContentType="text/csv"
)
return {
'statusCode': 200,
'body': f"Data successfully added to {file_name}"
}
この関数でS3にアクセスできるように、権限を変更します。
先ほど作成した関数のロールが作成されているので、IAMのロール
から、S3のGetObject
、PutObject
と、デバッグ用にCloudWatchのCreateLogStream
、CreateLogStream
、PutLogEvents
を許可しておきます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"s3:GetObject",
"s3:PutObject"
],
"Resource": [
"arn:aws:s3:::YOUR_BUCKET_NAME/*"
]
},
{
"Effect": "Allow",
"Action": [
"logs:CreateLogGroup",
"logs:CreateLogStream",
"logs:PutLogEvents"
],
"Resource": "arn:aws:logs:*:*:*"
}
]
}
IoT Coreのルール設定
マイコンから投げられてきたデータをLambdaに渡すためのルールを設定します。
メッセージのルーティング
> ルール
から、ルールの作成
をクリックし、以下のようなルールを作成します。
- SQLのバージョン:
2016-03-23
SELECT * FROM 'YOUR_TOPIC_NAME'
マイコンに書き込んだコードのmqttTopicと、同じtopic名を入力してください。
IoTについてもロールが作成されているので、Lambdaへのアクセスをする権限を追加します。
IAMのロール
から、LambdaへのInvokeFunction
を許可しておきます。
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction"
],
"Resource": "arn:aws:lambda:your-region:xxxxxxxxxxx:function:YOUR_FUNCTION_NAME"
}
]
}
実装③:ローカルPCでのデータ可視化
ここまでで、マイコンで測定した時系列データをS3に保存し続けるようなシステムを実装しました。
手元で見れさえすればよいので、ローカルのPCでデータを可視化することにします。
データを可視化する手段として、今回はStreamlitを採用します。
(PythonやDataFrameを触ったことがあれば手っ取り早く実装できるのと、個人的にデザインが気に入っているのが選定理由です)
AWS CLIの環境設定
S3にアクセスするために、AWS CLIを使います。
以下の記事を参考に設定を行います。
Streamlitでの可視化
Anacondaなどの環境に、以下のライブラリをインストールします。
pip install pandas streamlit boto3
記事の実行環境は以下の通りです:
- python :
3.11.7
- streamlit :
1.41.1
- pandas :
2.1.4
- boto3 :
1.35.81
以下のコードを適当な場所にapp.py
の名前で保存し、実行します。
streamlit run .\app.py
import streamlit as st
import boto3
import pandas as pd
from io import StringIO
from datetime import datetime
# S3 設定
BUCKET_NAME = 'YOUR_BUCKET_NAME'
DATA_PREFIX = 'data/'
DEFAULT_DATE = datetime.now().strftime('%Y-%m-%d.csv')
# 色のマッピング(落ち着いた配色)
COLOR_MAP = {
"CO2": "#2E8B57", # シーグリーン
"Temperature_SCD40": "#A52A2A", # ブラウン
"Temperature_BMP280": "#CD853F", # ペルーベージュ
"Humidity": "#4682B4", # スチールブルー
"Pressure": "#708090", # スレートグレー
"Lux": "#BDB76B" # ダークカーキ
}
# S3 クライアント作成
s3 = boto3.client('s3')
# データを S3 から読み込む
def load_data_from_s3(bucket_name, file_path):
"""
指定した S3 バケットとファイルから CSV データを読み込み、DataFrame に変換。
"""
try:
obj = s3.get_object(Bucket=bucket_name, Key=file_path)
data = obj['Body'].read().decode('utf-8')
df = pd.read_csv(StringIO(data))
df['Timestamp'] = pd.to_datetime(df['Timestamp']) # Timestamp列をdatetime型に変換
return df
except Exception as e:
st.error(f"Error loading data from S3: {e}")
return None
# S3 バケット内のファイル一覧を取得
def list_files_in_s3(bucket_name, prefix):
"""
指定した S3 バケットのプレフィックス以下のファイル一覧を取得。
"""
try:
objects = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)
if 'Contents' in objects:
return [obj['Key'].replace(prefix, '') for obj in objects['Contents']]
else:
return []
except Exception as e:
st.error(f"Error listing files in S3: {e}")
return []
# 列名を簡素化
def simplify_column_names(df):
"""
特殊文字を含む列名を簡素化する。
"""
rename_map = {col: col.split("[")[0].strip() for col in df.columns if col != "Timestamp"}
df.rename(columns=rename_map, inplace=True)
return rename_map
# グラフ描画
def plot_graph(df, columns_to_plot, rename_map):
"""
指定された列をプロットし、Streamlit に表示する。
"""
for col in columns_to_plot:
if col in df.columns:
original_col = {v: k for k, v in rename_map.items()}[col] # 元の列名を取得
df[col] = pd.to_numeric(df[col], errors="coerce") # 数値型に変換
cleaned_col = df[col].dropna() # 欠損値を削除
if not cleaned_col.empty:
st.subheader(f"{original_col}")
st.line_chart(cleaned_col, use_container_width=True, color=COLOR_MAP[col])
else:
st.warning(f"No valid data for column: {original_col}")
else:
st.warning(f"Column '{col}' not found in the data.")
# Streamlit アプリ本体
def main():
st.title("ESP32 Sensor Data Visualization")
# S3 バケット内のファイル一覧を取得
file_list = list_files_in_s3(BUCKET_NAME, DATA_PREFIX)
if not file_list:
st.error("No files found in the S3 bucket.")
return
# サイドバーでファイル選択
default_index = file_list.index(DEFAULT_DATE) if DEFAULT_DATE in file_list else 0
selected_file = st.sidebar.selectbox("Select Date", file_list, index=default_index)
file_path = f"{DATA_PREFIX}{selected_file}"
# データ読み込み
df = load_data_from_s3(BUCKET_NAME, file_path)
if df is None:
return
# 列名を簡素化
rename_map = simplify_column_names(df)
columns_to_plot = [rename_map[col] for col in [
"CO2[ppm]",
"Temperature_SCD40[°C]",
"Temperature_BMP280[°C]",
"Humidity[%]",
"Pressure[hPa]",
"Lux[lx]"
] if col in rename_map]
# Timestamp をインデックスに設定
df.set_index('Timestamp', inplace=True)
# グラフ描画
plot_graph(df, columns_to_plot, rename_map)
if __name__ == "__main__":
main()
DataFrameについて、カラム名に一部特殊文字([]
や.
)が入っていると正しくplotがされません。
今回の場合だと元データのカラムに[]を使用してしまっているので、可視化側でそれを除外するような実装をしています。
立ち上がったサーバ(http://localhost:8501
)にアクセスすると、以下のようなダッシュボードが表示されます。
このシステムでわかること
部屋の空気を観察したり振り返ったりするシステムの実装法を紹介しました。
このシステムを稼働させてわかったことがいくつかあるので、一部を紹介します。
一日中換気をせずに、部屋で作業をしていた日のCO2濃度です。
だんだん高くなり、なんと3000ppmあたりの数字が出ています。
(センサの校正自体は済ませているはずなので、正しい値だと信じることにします。)
気持ちよく作業を続けられるどころではなく、健康に害が出るような数値になっています。
(引用:CO2モニター(二酸化炭素計測機))
やはり、換気なしでは健康的な環境ではなくなってしまうようです。
とはいえ、今の季節(12月)だと換気をし続けると手先が凍えて、それはそれで作業どころではありません。
エアコンを入れるか、他の方もやられているような換気Botを作るのが次の目標でしょうか。
おわりに
色々取得して眺めるシステムを実装しました。
何となく肌感でわかっていたことでも、改めて可視化すると納得できて面白いですね。
今後、このあたりを観察できると面白そうかなと考えています。
- 窓の開け具合による換気性能の評価(CO2濃度)
- エアコンの性能評価(CO2濃度、温度、湿度)
- 天気と気圧変化の関係(気圧)
- 生活サイクルの評価(照度)
それでは。