作るもの
部屋の環境を監視し,環境に応じて(二酸化炭素濃度が一定以上になったときなど)に通知をするもの.
使うもの
- Raspberry Pi 3b (など)
- MHZ19 二酸化炭素センサー
- BME680 センサー
- Nerves
- IFTTT (webhook)
前提
- Nerves プロジェクトがある (ここでは
WarblerFirmware
という名前のプロジェクト) - センサーが配線されている
MHZ19 を使う
Mhz19
モジュールを準備しました.
この記事を参照: Elixir × Nerves で MH-Z19 を動かす
BME680 を使う
幸いライブラリがあるので,使います.(https://github.com/lucaong/elixir_bme680)
def deps = [
# ....
{:elixir_bme680, "~> 0.2.2"},
]
値の読み取り
elixir_bme680
の README.md
にも書いてますが,こんな感じで使います.
i2c_address
は 0x76
か 0x77
の二種類のセンサーが売ってるみたいなので,指定してあげます.
{:ok, bme680_pid} = Bme680.start_link(i2c_address: 0x77)
Bme680.measure(bme680_pid)
# %Bme680.Measurement{
# temperature: 21.74,
# pressure: 1090.52,
# humidity: 45.32,
# gas_resistance: 10235
# }
IAQ(Indoor Air Quality) を算出
BME680 は,BSEC というソフトウェアと一緒に使うと IAQ インデックスという空気の質の指標を算出できるらしいです.BSEC を組み込むことは難しかったので,https://github.com/G6EJD/BME680-Example を参考に 簡易的に IAQ の値を出す処理を実装してみました.理論的なところや,これが正しいかは全然わかりません.ただ,CO2濃度が高いとスコアが悪くなるなど,それっぽい値が出るので今回は良しとしました・・・
-
get_aqi_score
で IAQ インデックスと,コメントを返します.(例:{91.12, "Good"}
)
defmodule WarblerFirmware.Sensors.Bme680Tools do
alias Bme680
@hum_weighting 0.25
@gas_weighting 0.75
@hum_reference 40
def get_aqi_score(bme680_pid) do
measurement = Bme680.measure(bme680_pid)
hum_score = get_hum_score(measurement.humidity)
gas_score = get_gas_score(measurement.gas_resistance)
score = hum_score + gas_score
{score |> Float.round(2), get_aqi_string(score)}
end
defp get_aqi_string(score) do
score = (100 - score) * 5
cond do
score >= 0 && score < 50 -> "Good"
score >= 50 && score < 150 -> "Moderate"
score >= 150 && score < 175 -> "Unhealthy for Sensitive Groups"
score >= 175 && score < 200 -> "Unhealthy"
score >= 200 && score < 300 -> "Very Unhealthy"
score >= 300 -> "Hazardous"
true -> "Measurement Error"
end
end
defp get_hum_score(humidity) do
cond do
humidity >= 38 && humidity <= 42 ->
# Humidity +/-5% around optimum
@hum_weighting * 100
humidity < 38 ->
@hum_weighting / @hum_reference * humidity * 100
true ->
(-@hum_weighting / (100 - @hum_reference) * humidity + 0.416666) * 100
end
end
defp get_gas_score(gas_reference) do
gas_lower_limit = 5000
gas_upper_limit = 50000
gas_reference =
cond do
gas_reference > gas_upper_limit ->
gas_upper_limit
gas_reference < gas_lower_limit ->
gas_lower_limit
true ->
gas_reference
end
(@gas_weighting / (gas_upper_limit - gas_lower_limit) * gas_reference -
gas_lower_limit * (@gas_weighting / (gas_upper_limit - gas_lower_limit))) * 100
end
end
センサーモジュールの定義
次は,2つのセンサーを扱う GenServer を定義します.
方針
- MHZ19モジュールの pid と,bme680ライブラリ の pid を状態として持つ.
-
:read
メッセージを受け取りセンサーの値を返す. - 使いやすいように
read_sensor_data
で読めるようにする. -
measurement_to_string
でいい感じの文字列にする.
コード
defmodule WarblerFirmware.Sensors.Sensor do
use GenServer
alias WarblerFirmware.Sensors
alias WarblerFirmware.Sensors.Bme680Tools
def start_link(init \\ "") do
GenServer.start_link(__MODULE__, init, name: __MODULE__)
end
end
def init(_opts) do
{:ok, mhz19_pid} = GenServer.start_link(Sensors.Mhz19, {})
{:ok, bme680_pid} = Bme680.start_link(i2c_address: 0x77)
{:ok, %{mhz19_pid: mhz19_pid, bme680_pid: bme680_pid}}
end
def handle_call(:read, _from, state) do
{:ok, co2} = Sensors.Mhz19.measure(state.mhz19_pid)
bme680_reading = Bme680.measure(state.bme680_pid)
{aqi_score, aqi_string} = Bme680Tools.get_aqi_score(state.bme680_pid)
data = %{
co2: co2,
gas_resistance: bme680_reading.gas_resistance,
humidity: bme680_reading.humidity,
pressure: bme680_reading.pressure,
temperature: bme680_reading.temperature,
aqi_score: aqi_score,
aqi_string: aqi_string
}
{:reply, data, state}
end
def measurement_to_string(measurement) do
try do
"""
二酸化炭素濃度: #{measurement.co2}(ppm)
気温: #{measurement.temperature}(℃)
湿度: #{measurement.humidity}(%)
気圧: #{measurement.pressure}(hPa)
気体抵抗: #{measurement.gas_resistance}(ohms)
AQI: #{measurement.aqi_score}(#{measurement.aqi_string})
"""
rescue
_ -> "計測に失敗しています"
end
end
def read_sensor_data() do
case GenServer.call(__MODULE__, :read) do
%{} = data -> data
_ -> %{}
end
end
end
定期的に通知する
準備
ifttt で webhook をトリガーにするアクションを作る
また,IFTTT で適当に webhook を作っておき,config
に api key と webhook の名前を入れておく.(本当は コミット対象にならないファイルなどに入れておいたほうがいいかもしれない)
config :air_monitor,
ifttt_api_key: "abcdefgh-HIJKLMNO_P",
ifttt_webhook_name: "notify_line"
ライブラリを入れる
defp deps do [
# ...
{:httpoison, "~> 1.8"},
]
コード
-
@alert_interval_min
分ごとにセンサーを読んで,二酸化炭素濃度に応じて(@co2_threshold
より高ければ) webhook をトリガーする. -
:work
の プロセスコールを受けてセンサーを読み,処理をする. - プロセスコールを受けるたびに
Process.send_after/2
を使って次回の呼び出しを予約しておく.
defmodule WarblerFirmware.AirMonitor do
use GenServer
alias WarblerFirmware.Sensors.Sensor
@alert_interval_min 20
@co2_threshold 800
@ifttt_webhook_name Application.fetch_env!(:air_monitor, :ifttt_webhook_name)
@ifttt_api_key Application.fetch_env!(:air_monitor, :ifttt_api_key)
@line_notify_url "https://maker.ifttt.com/trigger/#{@ifttt_webhook_name}/with/key/#{
@ifttt_api_key
}"
def start_link(state \\ %{}) do
GenServer.start_link(__MODULE__, state, name: __MODULE__)
end
def init(state) do
Kernel.send(__MODULE__, :work)
{:ok, state}
end
def handle_info(:work, state) do
schedule_work()
work(state)
end
defp work(state) do
measurement = Sensor.read_sensor_data()
if(measurement.co2 > @co2_threshold) do
("CO2濃度が高くなっています\n" <> Sensor.measurement_to_string(measurement))
|> push_notify_line()
end
{:noreply, state}
end
defp push_notify_line(message) do
headers = [{"Content-type", "application/json"}]
HTTPoison.get(
@line_notify_url,
headers,
params: %{value1: message}
)
end
defp schedule_work() do
Process.send_after(self(), :work, @alert_interval_min * 60 * 1000)
end
end
Nerves 起動時に立ち上がるようにする
AirMonitor
が起動時に立ち上がるように,application.ex
を編集しておく.
def children(_target) do
[
{WarblerFirmware.AirMonitor, %{}}
]
end
動作
あとは,mix upload
すれば,こういうふうに通知が来るようになる.
なぜか初回の通知まで書き込んでからタイムラグが有るが,2分ほど待つと,(部屋のCO2濃度が高ければ) 初回の通知が来る.
終わりに
- BME680 から得られた計測値は表示するだけで,特に使われていないですが,トリガー条件に加えれば,色々できそうです.(気温があがったらエアコンを付けるなど)
- 前回の記事が初めてでしたが,LGTMなど反応をいただけたので喜んでいます.