7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Nerves で環境センサーを動かしてアラートする

Posted at

作るもの

部屋の環境を監視し,環境に応じて(二酸化炭素濃度が一定以上になったときなど)に通知をするもの.

使うもの

  • Raspberry Pi 3b (など)
  • MHZ19 二酸化炭素センサー
  • BME680 センサー
  • Nerves
  • IFTTT (webhook)

前提

  • Nerves プロジェクトがある (ここでは WarblerFirmware という名前のプロジェクト)
  • センサーが配線されている

MHZ19 を使う

Mhz19モジュールを準備しました.
この記事を参照: Elixir × Nerves で MH-Z19 を動かす

BME680 を使う

幸いライブラリがあるので,使います.(https://github.com/lucaong/elixir_bme680)

def deps = [
	# ....
	{:elixir_bme680, "~> 0.2.2"},
]

値の読み取り

elixir_bme680README.md にも書いてますが,こんな感じで使います.
i2c_address0x760x77 の二種類のセンサーが売ってるみたいなので,指定してあげます.

{:ok, bme680_pid} = Bme680.start_link(i2c_address: 0x77)
Bme680.measure(bme680_pid)
# %Bme680.Measurement{
#     temperature: 21.74,
#     pressure: 1090.52,
#     humidity: 45.32,
#     gas_resistance: 10235
#   }

IAQ(Indoor Air Quality) を算出

BME680 は,BSEC というソフトウェアと一緒に使うと IAQ インデックスという空気の質の指標を算出できるらしいです.BSEC を組み込むことは難しかったので,https://github.com/G6EJD/BME680-Example を参考に 簡易的に IAQ の値を出す処理を実装してみました.理論的なところや,これが正しいかは全然わかりません.ただ,CO2濃度が高いとスコアが悪くなるなど,それっぽい値が出るので今回は良しとしました・・・

  • get_aqi_score で IAQ インデックスと,コメントを返します.(例: {91.12, "Good"})
defmodule WarblerFirmware.Sensors.Bme680Tools do
  alias Bme680

  @hum_weighting 0.25
  @gas_weighting 0.75
  @hum_reference 40

  def get_aqi_score(bme680_pid) do
    measurement = Bme680.measure(bme680_pid)
    hum_score = get_hum_score(measurement.humidity)
    gas_score = get_gas_score(measurement.gas_resistance)
    score = hum_score + gas_score
    {score |> Float.round(2), get_aqi_string(score)}
  end

  defp get_aqi_string(score) do
    score = (100 - score) * 5

    cond do
      score >= 0 && score < 50 -> "Good"
      score >= 50 && score < 150 -> "Moderate"
      score >= 150 && score < 175 -> "Unhealthy for Sensitive Groups"
      score >= 175 && score < 200 -> "Unhealthy"
      score >= 200 && score < 300 -> "Very Unhealthy"
      score >= 300 -> "Hazardous"
      true -> "Measurement Error"
    end
  end

  defp get_hum_score(humidity) do
    cond do
      humidity >= 38 && humidity <= 42 ->
        # Humidity +/-5% around optimum
        @hum_weighting * 100

      humidity < 38 ->
        @hum_weighting / @hum_reference * humidity * 100

      true ->
        (-@hum_weighting / (100 - @hum_reference) * humidity + 0.416666) * 100
    end
  end

  defp get_gas_score(gas_reference) do
    gas_lower_limit = 5000
    gas_upper_limit = 50000

    gas_reference =
      cond do
        gas_reference > gas_upper_limit ->
          gas_upper_limit

        gas_reference < gas_lower_limit ->
          gas_lower_limit

        true ->
          gas_reference
      end

    (@gas_weighting / (gas_upper_limit - gas_lower_limit) * gas_reference -
       gas_lower_limit * (@gas_weighting / (gas_upper_limit - gas_lower_limit))) * 100
  end
end

センサーモジュールの定義

次は,2つのセンサーを扱う GenServer を定義します.

方針

  • MHZ19モジュールの pid と,bme680ライブラリ の pid を状態として持つ.
  • :read メッセージを受け取りセンサーの値を返す.
  • 使いやすいように read_sensor_data で読めるようにする.
  • measurement_to_string でいい感じの文字列にする.

コード

defmodule WarblerFirmware.Sensors.Sensor do
  use GenServer

  alias WarblerFirmware.Sensors
  alias WarblerFirmware.Sensors.Bme680Tools


  def start_link(init \\ "") do
      GenServer.start_link(__MODULE__, init, name: __MODULE__)
    end
  end

  def init(_opts) do
    {:ok, mhz19_pid} = GenServer.start_link(Sensors.Mhz19, {})
    {:ok, bme680_pid} = Bme680.start_link(i2c_address: 0x77)
    {:ok, %{mhz19_pid: mhz19_pid, bme680_pid: bme680_pid}}
  end

  def handle_call(:read, _from, state) do
    {:ok, co2} = Sensors.Mhz19.measure(state.mhz19_pid)
    bme680_reading = Bme680.measure(state.bme680_pid)
    {aqi_score, aqi_string} = Bme680Tools.get_aqi_score(state.bme680_pid)

    data = %{
      co2: co2,
      gas_resistance: bme680_reading.gas_resistance,
      humidity: bme680_reading.humidity,
      pressure: bme680_reading.pressure,
      temperature: bme680_reading.temperature,
      aqi_score: aqi_score,
      aqi_string: aqi_string
    }

    {:reply, data, state}
  end

  def measurement_to_string(measurement) do
    try do
      """
      二酸化炭素濃度: #{measurement.co2}(ppm)
      気温: #{measurement.temperature}(℃)
      湿度: #{measurement.humidity}(%)
      気圧: #{measurement.pressure}(hPa)
      気体抵抗: #{measurement.gas_resistance}(ohms)
      AQI: #{measurement.aqi_score}(#{measurement.aqi_string})
      """
    rescue
      _ -> "計測に失敗しています"
    end
  end

  def read_sensor_data() do
    case GenServer.call(__MODULE__, :read) do
      %{} = data -> data
      _ -> %{}
    end
  end
end

定期的に通知する

準備

ifttt で webhook をトリガーにするアクションを作る

また,IFTTT で適当に webhook を作っておき,config に api key と webhook の名前を入れておく.(本当は コミット対象にならないファイルなどに入れておいたほうがいいかもしれない)

config.exs
config :air_monitor,
  ifttt_api_key: "abcdefgh-HIJKLMNO_P",
  ifttt_webhook_name: "notify_line"

ライブラリを入れる

mix.exs
defp deps do [
	# ...
	{:httpoison, "~> 1.8"},
] 

コード

  • @alert_interval_min分ごとにセンサーを読んで,二酸化炭素濃度に応じて(@co2_threshold より高ければ) webhook をトリガーする.
  • :work の プロセスコールを受けてセンサーを読み,処理をする.
  • プロセスコールを受けるたびに Process.send_after/2 を使って次回の呼び出しを予約しておく.
air_monitor.ex
defmodule WarblerFirmware.AirMonitor do
  use GenServer

  alias WarblerFirmware.Sensors.Sensor

  @alert_interval_min 20
  @co2_threshold 800
  @ifttt_webhook_name Application.fetch_env!(:air_monitor, :ifttt_webhook_name)
  @ifttt_api_key Application.fetch_env!(:air_monitor, :ifttt_api_key)
  @line_notify_url "https://maker.ifttt.com/trigger/#{@ifttt_webhook_name}/with/key/#{
                     @ifttt_api_key
                   }"

  def start_link(state \\ %{}) do
    GenServer.start_link(__MODULE__, state, name: __MODULE__)
  end

  def init(state) do
    Kernel.send(__MODULE__, :work)
    {:ok, state}
  end

  def handle_info(:work, state) do
    schedule_work()
    work(state)
  end

  defp work(state) do
    measurement = Sensor.read_sensor_data()

    if(measurement.co2 > @co2_threshold) do
      ("CO2濃度が高くなっています\n" <> Sensor.measurement_to_string(measurement))
      |> push_notify_line()
    end

    {:noreply, state}
  end

  defp push_notify_line(message) do
    headers = [{"Content-type", "application/json"}]

    HTTPoison.get(
      @line_notify_url,
      headers,
      params: %{value1: message}
    )
  end

  defp schedule_work() do
    Process.send_after(self(), :work, @alert_interval_min * 60 * 1000)
  end
end

Nerves 起動時に立ち上がるようにする

AirMonitor が起動時に立ち上がるように,application.ex を編集しておく.

application.ex
def children(_target) do
    [
      {WarblerFirmware.AirMonitor, %{}}
    ]
  end

動作

あとは,mix upload すれば,こういうふうに通知が来るようになる.
なぜか初回の通知まで書き込んでからタイムラグが有るが,2分ほど待つと,(部屋のCO2濃度が高ければ) 初回の通知が来る.

(ちなみにこの画像は typo のせいで 気体抵抗の値が CO2 濃度になっている)

終わりに

  • BME680 から得られた計測値は表示するだけで,特に使われていないですが,トリガー条件に加えれば,色々できそうです.(気温があがったらエアコンを付けるなど)
  • 前回の記事が初めてでしたが,LGTMなど反応をいただけたので喜んでいます.
7
3
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?