はじめに
ものすご~~く今さら感はありますが、自宅の環境モニタリングシステムをラズパイで作ってみました。
influxdbとgrafanaを利用してダッシュボードも作ってみました。
使用したパーツたち
BME280
ボッシュのBME280を搭載した温湿度・気圧センサーモジュールです。
頭痛持ちなので、気圧もモニタリングしたくてこれを選びました。
5Vモデルで、I²Cに対応しています。
MH-Z19E
NDIR方式のCO2センサーです。
シリアル/PWMに対応しています。
今回はシリアル通信です。
動作電圧が5V±0.1Vらしいので、ちょっと使いづらいかもしれません。
配線図
コード
Githubにあります。
センサー値取得
from datetime import datetime
from modules.altitude_handler import calculate_altitude_hypsometric
from modules.influx_handler import write_to_influxdb
from modules.disconfort_handler import disconfort_calculate
import adafruit_bme280
from modules.mhz19e_handler import MHZ19E
def read_sensor_data(bme280_sensor, co2_sensor, api):
"""センサーからデータを読み取り、表示し、InfluxDBに送信する。"""
try:
# CO2濃度の読み取り
co2_ppm = 500.00 # デフォルト値
if co2_sensor:
co2_reading = co2_sensor.read_co2()
if co2_reading is not None:
co2_ppm = co2_reading
else:
print("警告: MH-Z19からデータを読み取れませんでした。デフォルト値を使用します")
# BME280からすべてのデータを読み取り
if bme280_sensor:
try:
temperature = bme280_sensor.temperature
humidity = bme280_sensor.relative_humidity
pressure = bme280_sensor.pressure
altitude = bme280_sensor.altitude
except Exception as e:
print(f"エラー: BME280からデータを読み取れませんでした: {e}")
raise Exception("BME280が利用できません")
else:
raise Exception("BME280が利用できません")
# 不快指数の計算
disc_value = disconfort_calculate(temperature, humidity)
# 気圧高度補正の計算
corrected_altitude = calculate_altitude_hypsometric(bme280_sensor.sea_level_pressure, pressure, temperature, humidity)
# ここはなくてもいい
print(
f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
f"温度: {temperature:.2f} ℃, "
f"湿度: {humidity:.2f} %, "
f"CO2濃度: {co2_ppm:.2f} ppm, "
f"気圧: {pressure:.2f} hPa, "
f"センサー高度: {altitude:.2f} m, "
f"補正高度: {corrected_altitude:.2f} m, "
f"(海面気圧補正値: {bme280_sensor.sea_level_pressure:.2f} hPa)"
)
write_to_influxdb(api, temperature, pressure, humidity, disc_value, co2_ppm)
except (OSError, RuntimeError) as e:
print(f"センサーからの読み取りエラー: {e}")
BME280
もMH-Z19
もメソッドかインスタンスで扱えるので、とても楽です。
不快指数
不快指数も求めています。
def disconfort_calculate(temp: float, humidity: float)-> float:
disc_value = 0.81 * temp + 0.01 * humidity * (0.99 * temp - 14.3) + 46.3
return disc_value
高度測定について
せっかく気圧センサーがあるので、高度を計ってみようということで高度計算コードも入っています。
それっぽい精度が出ればよかったので、地点を変えての比較計測などはしていません。
仕組みとしては、こんな感じです。
気圧高度
アメダスの海面規正気圧を0mとし、対数気圧高度式を利用してそこそこ精度が出るようにしています。
h = \frac{R_d \times T_v}{g_0} \ln\left(\frac{P_0}{P}\right)
パラメータ
-
h
: 高度 (m) -
R_d
: 乾燥空気の気体定数 (287.058 J/(kg·K)) -
T_v
: 仮想温度 (K) -
g₀
: 重力加速度 (9.80665 m/s²) -
P₀
: 海面気圧 (hPa) -
P
: 測定地点の気圧 (hPa)
仮想温度の計算
湿度の影響を考慮した仮想温度を計算:
def calculate_virtual_temperature(temperature_c, relative_humidity_pct, pressure_hpa):
# 飽和水蒸気圧の計算(テテンスの式)
es = 6.1078 * (10 ** ((7.5 * temperature_c) / (237.3 + temperature_c)))
# 実際の水蒸気圧
e = es * (relative_humidity_pct / 100.0)
# 混合比の計算
r = (EPSILON * e) / (pressure_hpa - e)
# 仮想温度
tv_k = temperature_k * (1 + r / EPSILON) / (1 + r)
自動気圧校正
気象庁AMeDASデータ連携
30分間隔で気象庁のアメダスデータから海面気圧を取得して、センサーを自動校正します。
気象庁にjson形式で観測データがあるので、それを利用しています。
なお、jsonファイル自体は3時間おきに新しくなりますが、中のデータは10分ごとに更新されています。
def get_latest_normal_pressure(point_id: str) -> float | None:
"""
気象庁のアメダスJSONデータから最新の海面更正気圧(normalPressure)を取得する。
"""
try:
jst = timezone(timedelta(hours=9))
now_jst = datetime.now(jst)
json_hour = (now_jst.hour // 3) * 3
date_str = now_jst.strftime('%Y%m%d')
url = f"https://www.jma.go.jp/bosai/amedas/data/point/{point_id}/{date_str}_{json_hour:02d}.json"
print(f"\n校正データ取得中: {url}")
response = requests.get(url, timeout=15)
response.raise_for_status()
data = response.json()
if not data:
print("警告: JSONデータが空です。")
return None
latest_timestamp_key = sorted(data.keys())[-1]
latest_data = data[latest_timestamp_key]
normal_pressure_list = latest_data.get("normalPressure")
if normal_pressure_list and isinstance(normal_pressure_list, list) and len(normal_pressure_list) > 0:
normal_pressure = normal_pressure_list[0]
if normal_pressure is not None:
dt_str = datetime.strptime(latest_timestamp_key, "%Y%m%d%H%M%S").strftime('%Y-%m-%d %H:%M:%S')
print(f"気圧データ取得成功: {dt_str}")
return float(normal_pressure)
print("警告: 最新データに 'normalPressure' が見つかりません。")
return None
except requests.exceptions.RequestException as e:
print(f"警告: ネットワークエラー: {e}")
return None
except (ValueError, KeyError, IndexError) as e:
print(f"警告: JSON解析またはキー取得失敗: {e}")
return None
except Exception as e:
print(f"予期せぬエラーが発生しました: {e}")
return None
校正プロセス
- 3時間毎に更新されるアメダスデータを取得
- 最新の
normalPressure
(海面更正気圧)を抽出 - BME280センサーの基準海面気圧を更新
- 次回の測定から新しい基準値を使用
サンプル
温度: 25.11 ℃, 湿度: 52.33 %, 気圧: 1013.74 hPa, 補正高度: 32.52 m, (海面気圧補正値: 1017.50 hPa)
国交省の高度データで自宅位置が23m、マンションの4階分の高さを考慮して10mプラスすると大体32~33mなので、そこそこ精度が出ていると思います。
ただ時間経過でズレていくので、校正時間を短くすれば精度を維持できるはずです。
ダッシュボード
grafanaで作りました。
しきい値はまだ試行錯誤中です。
湿度・CO2濃度はまだダミー値なので気にしないでください。
ダッシュボードのJSON