Help us understand the problem. What is going on with this article?

Raspberry Pi 3 Model B+ で二酸化炭素濃度を測る(MH-Z14A) -AWS IoT もあるよ-

More than 1 year has passed since last update.

想定読者

  • センサー類の接続・制御などはやったことがない
  • クラウドサービスを触ったことがある
    • AWS のマネージドコンソールで設定などをしたことがあると良い
  • GNU Linux の CLI などを使ったことがある

上記のような人(私)が、調べた情報をもとに試行錯誤したログ

センサー周りで参考になりそうな記事

やりたいこと

  • センサーで居室の二酸化炭素濃度を測る
  • 測った結果をもとに何かをする
    • 濃度が閾値を超えていた時にアラームをあげる
    • 濃度の時系列での推移を眺めて、仕事の能率との相関を見てみたい

全体像

  • CO2 濃度センサー
  • Raspberry Pi
    • Python と多少のシェル
    • センサーの制御
    • ログの書き出し
    • Fluentd
    • 書き出されたログをバッファリングして、集約先へ出力
  • S3
    • センサーデータを保持する
  • AWS IoT
    • リアルタイムなセンサーデータを処理するハブとなる

クラウド

S3 にデータを溜め込むだけではリアルタイムに処理を行うことが難しい。
そこで AWS IoT を用いて様々な AWS サービスと連携するようにすることを目標にする。
ただし、それぞれのサービスの具体的な設定を逐一書いていくと大変なので、AWS の設定などは他の記事を適宜参照しつつ、問題になりそうな部分のフォローを書くことにする。

使うもの

物理

  • Raspberry Pi 3 Model B+
    • ネットワークへの接続などは済ませてあること
  • MH-Z14A(センサー)
  • ピンヘッダ接続ケーブル

電子

  • AWS アカウント
    • お金が発生します(fluentd の転送先に使わないという選択をすれば不要)

キーワード

  • GPIO
  • UART

MH-Z14A のスペックシート

抜粋

PIN 番号の図

PIN Description
Pad16 GND
Pad17 Vin Voltage Input
Pad18 UART (RXD) TTL Level input
Pad19 UART (TXD) TTL Level output

Raspberry Pi 3 で UART

  • Raspberry Pi 2 まではデフォルトで固定クロックで使えた
  • Raspberry Pi 3 からは Bluetooth モジュールが追加され、組み込みの UART はそっちに割り振られている
    • 選択肢は 2 つ
    • Bluethooth 無効化する設定をする
    • miniUart を使う設定をする

参考: Raspberrypi 3 でUART通信(コンソール&汎用) - Qiita

UART

  • デバイス間の接続をする際は RX/TX が互い違いになるように接続する
  • 接続する際はデバイスの電源を落としておく
  • Raspberry Pi の GPIO はデフォルトでプルアップ・プルダウン抵抗が接続されていてソフトウェアで切り替えが可能

配線

GPIO ピンアサインの公式ドキュメントはわかりづらいので https://pinout.xyz/ を見ると良い。

ここでは半田付けなどはせずにピンヘッダ接続ケーブルを輪ゴムで仮止めして使っている。

(この画像通りにやることは推奨しない)

配線

センサー値の取得

まずは、使用するデバイスの権限を調べて pi ユーザーからもアクセス可能にする。

$ whoami
pi
$ ls -la /dev/ttyS0
crw-rw---- 1 root dialout 4, 64 11月 29 10:42 /dev/ttyS0
$ sudo gpasswd -a pi dialout

次に、Python のスクリプトを使用してセンサーから値を取得・出力をする。

ここの言語はなんでも良いが初めてということもあり情報が多い言語を使うことにした。

ほとんどそのまま動きそうなリポジトリがあったのでそれを参考にしているが、どうもそれも何か別のものを元にしているのか不要なコードが含まれていたりした。

#!/usr/bin/env python3

import serial
import time
import datetime
import json
import argparse
import sys


class MHZ14A():
    PACKET = [0xff, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79]
    ZERO = [0xff, 0x01, 0x87, 0x00, 0x00, 0x00, 0x00, 0x00, 0x78]

    def __init__(self, ser):
        self.serial = serial.Serial(ser, 9600, timeout=1)
        time.sleep(2)

    def zero(self):
        self.serial.write(bytearray(MHZ14A.ZERO))

    def get(self):
        self.serial.write(bytearray(MHZ14A.PACKET))
        res = self.serial.read(size=9)
        res = bytearray(res)
        checksum = 0xff & (~(res[1] + res[2] + res[3] + res[4] + res[5] + res[6] + res[7]) + 1)
        if res[8] == checksum:
            return {
                "ppm": (res[2] << 8) | res[3],
                "dt": datetime.datetime.today().isoformat(),
                "ts": datetime.datetime.today().timestamp(),
            }
        else:
            raise Exception("checksum: " + hex(checksum))

    def close(self):
        self.serial.close()


def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode")
    args = parser.parse_args()
    mode = args.mode or "co2"

    co2 = MHZ14A("/dev/ttyS0")

    if mode == "zero":
        co2.zero()
    else:
        try:
            print(json.dumps(co2.get()))
        except:
            # 本当はエラーハンドリングをする
            pass
    co2.close()


if __name__ == '__main__':
    main()

上記のファイル相当のものを gist に用意してあるので必要であればそこからダウンロードできる。

$ MHZ14A_SCRIPT="https://gist.githubusercontent.com/watiko/b0a82cb401f288f51b39fcc75fe8ce2c/raw/9fe284932055133169450029ad7d1db4826e25ff/MHZ14A.py"
$ wget $MHZ14A_SCRIPT -O /usr/local/bin/mhz14a
$ chmod +x /usr/local/bin/mhz14a
$ mhz14a
{"ppm": 1358, "dt": "2018-11-29T10:43:33.446096", "ts": 1543488213.446343}

これでセンサーの値を JSON 形式で標準出力するコマンドが作成できたことになる。

手順的にはそこまででもないが、データシートを眺めてコマンドを発行、戻ってきた結果を適切に取り扱うコードを自分で書くのは大変だと思う。

センサー値の蓄積

作成したコマンドを使って、定期的にログファイルにセンサー値を取得するスクリプトを作ってみる。

(引数のパースなどをしているので大仰に見えるが loggermain 関数の中身さえ理解できれば良い)

#!/usr/bin/env bash

set -e -o pipefail

# assert
CMDNAME=`basename "$0"`

function usage() {
  echo "Usage: $CMDNAME -l /path/to/log.json -c /usr/local/bin/mhz14a -w 30" 1>&2
  exit 1
}

while getopts :l:c:w: OPT; do
  case "$OPT" in
    l) LOGFILE_PATH="$OPTARG"
      ;;
    c) COMMAND="$OPTARG"
      ;;
    w) WAIT="$OPTARG"
      ;;
    *) echo "unknown option: $OPT: $OPTARG"
       usage
      ;;
  esac
done

if [[ "$LOGFILE_PATH" = "" ]]; then
  echo "-l option required"
  usage
fi

if [[ "$COMMAND" = "" ]]; then
  echo "-c option required"
  usage
fi

if [[ "$WAIT" = "" ]]; then
  WAIT=30
fi

echo "logfile: $LOGFILE_PATH"
echo "command: $COMMAND"
echo "wait seconds: $WAIT"

# function
function logger() {
  "$COMMAND" | tee --append "$LOGFILE_PATH"
}

function main() {
  while :
  do
    logger && sleep "$WAIT"
  done
}

main

無限ループの中で loggersleep による待機を繰り返しており、 logger の中では渡されたコマンドを実行し、その結果を tee で標準出力とログファイル両方に書き出している。

このファイルを /usr/local/bin/logging などに配置し以下のようにして使う。

$ chmod +x /usr/local/bin/logging
$ /usr/local/bin/logging -l /tmp/log.json -c /usr/local/bin/mhz14a -w 1
logfile: /tmp/log.json
command: /usr/local/bin/mhz14a
wait seconds: 30
{"ts": 1543490833.990155, "ppm": 1408, "dt": "2018-11-29T11:27:13.989904"}
{"ts": 1543490837.274029, "ppm": 1408, "dt": "2018-11-29T11:27:17.273774"}
Ctrl-C
$ cat /tmp/log.json
{"ts": 1543490833.990155, "ppm": 1408, "dt": "2018-11-29T11:27:13.989904"}
{"ts": 1543490837.274029, "ppm": 1408, "dt": "2018-11-29T11:27:17.273774"}

これでファイルにセンサーの値を追記していく仕組みができた。

残りは以下のような作業が必要になる。

  • ログファイルの値をどこかのサーバーへ集約する仕組みを作る(fluentd)
  • デバイスの再起動などで止まらないようにデーモンにする(systemd)
  • ログファイルを定期的に退避する仕組み(logrotate)

各種ファイルの配置先を作成する

あらかじめログファイルをおく場所や設定ファイルをおく場所を作っておく。

$ sudo mkdir /var/log/co2            && sudo chown pi:pi -R $_
$ sudo mkdir /var/log/fluent         && sudo chown pi:pi -R $_
$ sudo mkdir /var/log/fluent/s3      && sudo chown pi:pi -R $_
$ sudo mkdir /var/log/fluent/aws-iot && sudo chown pi:pi -R $_
$ sudo mkdir /etc/fluent             && sudo chown pi:pi -R $_

fluentd でログを収集・集約する

https://www.fluentd.org/

fluentd とは

簡単に説明すると、ファイルなどに書き出したログを別の場所に転送してくれるツール。
ログのソースや、転送先はプラグイン(Ruby gem)を追加することで増やすことができる。

導入

溜め込んだログは全て S3 に転送し、リアルタイムにアラートを上げるために AWS IoT の MQTT ブローカにも転送する。

ところが、後者の MQTT プロトコルに対応するプラグインが Raspbian 標準の Ruby ではバージョンの問題で動かないので rbenv を導入して動かすことにする。

$ sudo apt-get install ruby-dev libssl-dev
$ ruby --version

rbenv

この記事を参考に rbenv をシステムワイドにインストールして対処した。

この方法以外でもシステムで使えるように Ruby 2.5.x がインストールできれば良い。
(バージョンも作業中に最新のバージョンでもおそらく問題がない)

http://d.hatena.ne.jp/komiyak/20150603/1433364444

fluentd そのものとプラグインの導入

$ sudo apt-get install libssl-dev
$ sudo gem install fluentd fluent-plugin-s3 fluent-plugin-mqtt-io 

fluentd の設定

基本的に各サービスで初期設定(バケットの作成など)を行なってから fluentd の設定をすることになる。
もし、AWS IoT への設定などを後回しにするのであればコメントアウトするなりすれば良い。

# ログファイルの末尾に追加された行を拾う
<source>
  @type tail
  path     /var/log/co2/co2.log
  pos_file /var/log/fluent/co2.log.pos
  tag raspberry.co2
  format json
</source>

# fluentd は source にタグをつけて最初に match した設定を使うので、
# 複数の転送先を設定する際は copy プラグイン(組み込み)を使う必要がある。
<match raspberry.*>
  @type copy

  # AWS IoT
  <store>
    @type mqtt
    host 'XXXXXXX.iot.ap-northeast-1.amazonaws.com'
    port '8883'
    # 変えてね
    topic '${THING_TYPE}/${THING_NAME}'
    qos 0
    <security>
      use_tls true
      <tls>
        # ルート証明書は少しわかりにくいので後述
        ca_file   /etc/fluentd/aws-iot/rootCA.pem
        key_file  /etc/fluentd/aws-iot/a92b653a8e-private.pem.key
        cert_file /etc/fluentd/aws-iot/a92b653a8e-certificate.pem.crt
      </tls>
    </security>
    <format>
      @type json
    </format>
  </store>

  # Amazon S3
  <store>
    @type s3

    aws_key_id           YOUR_AWS_KEY_ID
    aws_sec_key          YOUR_AWS_SECRET_KEY
    s3_bucket            YOUR_S3_BUCKET_NAME
    s3_region            ap-northeast-1
    path                 logs/${tag}/%Y/%m/%d/
    s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}

    <buffer tag,time>
      @type file
      path /var/log/fluent/s3
      timekey 3600 # 1 hour partition
      timekey_wait 10m
      timekey_use_utc true # use utc
    </buffer>
    <format>
      @type json
    </format>
  </store>
</match>
AWS IoT

色々と設定しなければならずこの記事で書くには複雑な気もする。
ひとまず設定しないといけない項目などに付いてリストを上げる。

項目リスト
  • 管理
    • ログの設定: CloudWatch logs に AWS IoT でのエラーなどを書き込む設定
    • モノ(Thing)の作成: デバイスと一対一で紐付ける(今回は二酸化炭素センサーとモノを紐付ける)
    • タイプ: モノの分類?よくわかっていないが作る
  • 安全性
    • 証明書: デバイスに設置して認証を行う。認可は後述のどのポリシーが紐づいているか次第
    • ポリシー: AWS IoT のリソースなどへの権限を管理している。わかりにくい 実験段階ではなんでもできるポリシーを割り当てるのもアリかもしれない
  • Act
    • ルール: これを設定しないとどうにもならないが、なくても MQTT へのパブリッシュはできるので後述
ポリシー

ポリシーは以下のような感じのもの(YOUR_ACCOUNT_ID は差し替え)を用意すると ${モノのタイプ}/${モノの名前} という MQTT のトピックにメッセージを投げることができるので便利

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "iot:Connect",
      "Resource": [
        "*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "iot:Publish",
      "Resource": [
        "arn:aws:iot:ap-northeast-1:YOUR_ACCOUNT_ID:topic/${iot:Connection.Thing.ThingTypeName}/${iot:Connection.Thing.ThingName}"
      ]
    }
  ]
}
ルート証明書

ドキュメント

これらの証明書をすべてあなたのデバイスに保存すると、貴重なメモリスペースを占有する可能性があります。デバイスが RSA ベースの検証を実装している場合は、Amazon ルート CA 3およびAmazon ルート CA 4 ECC 証明書を省略できます。デバイスが ECC ベースの証明書検証を実装している場合は、Amazon ルート CA 1およびAmazon ルート CA 2 RSA 証明書を省略できます。

とのことですが、マイコンなどでもない限りメモリ使用量を気にする必要はないという建前で実装がどちらになっているかを確認するのが億劫だったので全ての証明書を使うことにした。

$ wget https://www.amazontrust.com/repository/AmazonRootCA{1,2,3,4}.pem
$ cat AmazonRootCA* > rootCA.pem
Amazon S3

バケットを用意し、そのバケットに権限のある IAM ユーザを作成し、 API のアクセスキーを発行すれば良い。

fluentd を試しに動かしてみる

$ fluentd -c /etc/fluent/fluent.conf -v -o /var/log/fluent/fluent.log

これで /usr/local/bin/logging/usr/local/bin/mhz14a それから fluentd を組み合わせればセンサーの値が S3, AWS IoT に転送できるようになった。

しかし、再起動すると全て立ち上げ直しになったりなど不便な点が多い。次はこれらが自動で行われるようサービス化する。

systemd でサービス化

https://wiki.archlinux.jp/index.php/Systemd

サービスの追加

サービスとは何かという話をできるほど詳しくないので早速設定を流し込んでいく。

$ sudo tee /etc/systemd/system/co2.service <<EOF
[Unit]
Description=co2 logger

[Service]
ExecStart=/usr/local/bin/logging -l /var/log/co2/co2.log -c /usr/local/bin/mhz14a -w 30
Restart=always

[Install]
WantedBy=multi-user.target
EOF

$ sudo tee /etc/systemd/system/fluent.service <<EOF
[Unit]
Description=co2 logger

[Service]
ExecStart=/usr/local/bin/fluentd -c /etc/fluent/fluent.conf -v -o /var/log/fluent/fluent.log
#User=pi
#Group=pi
Restart=always

[Install]
WantedBy=multi-user.target
EOF

$ sudo tee /etc/systemd/system/fluent-restart.service <<EOF
[Unit]
Description=restart fluent

[Service]
Type=oneshot
ExecStart=/bin/systemctl try-restart fluent.service

[Install]
WantedBy=multi-user.target
EOF

$ sudo tee /etc/systemd/system/fluent-restart.timer <<EOF
[Unit]
Description=restart fluent

[Timer]
OnCalendar=hourly
Persistent=true

[Install]
WantedBy=timers.target
EOF

これで以下のサービス・タイマーのファイルが作成される。

  • co2.service: 二酸化炭素濃度を計測しログファイルに書き込む
  • fluent.service: 書き込まれた情報を転送する
  • fluent-restart.service: fluent.service を再起動する
  • fluent-restart.timer: 一時間ごとに fluent-restart.service を起動する

余談

なぜ restart 用のサービスを用意したかというと、実際に fluentd を走らせていると不可解なエラーが発生して、 mqtt へのパブリッシュがうまくいかなくなるから。(暫定的な対応)

2018-11-29 05:06:22 +0000 [debug]: #0 fluent/log.rb:302:debug: publish_event_stream: Fluent::MultiEventStream
2018-11-29 05:06:22 +0000 [debug]: #0 fluent/log.rb:302:debug: MqttOutput::block (2 levels) in publish_event_stream: raspberry/co2, 1543467982, {"dt"=>"2018-11-29T05:06:22.783533", "ppm"=>878}
2018-11-29 05:06:22 +0000 [error]: #0 fluent/log.rb:362:error: System call error occurs.,Errno::EPIPE,Broken pipe
2018-11-29 05:06:22 +0000 [error]: #0 fluent/log.rb:362:error: Retry in 1 sec
2018-11-29 05:06:22 +0000 [warn]: #0 fluent/log.rb:342:warn: emit transaction failed: error_class=Errno::EPIPE error="Broken pipe" location="/usr/local/.rbenv/versions/2.5.3/lib/ruby/2.5.0/openssl/buffering.rb:325:in syswrite tag="raspberry.co2"
2018-11-29 05:06:22 +0000 [warn]: #0 plugin/in_tail.rb:397:receive_lines: suppressed same stacktrace
2018-11-29 05:06:23 +0000 [debug]: #0 fluent/log.rb:302:debug: connected to mqtt broker XXX.iot.ap-northeast-1.amazonaws.com:8883 for out_mqtt

サービスの有効化・開始

ファイルを作成しただけだとまだダメで、 systemd の管理するファイルにシンボリックリンクを作成するコマンドを発行した後に、 service を有効化する必要がある。

$ sudo systemctl daemon-reload
$ sudo systemctl enable co2.service
$ sudo systemctl enable fluent.service
$ sudo systemctl enable fluent-restart.timer
$ sudo systemctl start  co2.service
$ sudo systemctl start  fluent.service

ここまでやると再起動してもログが書き出されるし、転送もされるようになった。

ところが、ログは書き出される一方で誰も削除などを行わないのでこれに対処する。

ログをローテションさせる

センサーの出力などを書き続けていくといつかはファイルシステムの空き容量を食いつぶすことになる。
そこで、 logrotate というツールを使用する。

https://wiki.archlinux.jp/index.php/Logrotate

logrotate とは

一定の条件に従ってログファイルの世代交代を行う。
また保持する最大の世代数を設定することで必要以上にファイルシステムの空き容量を使用することを避けることがでる。

logrotate の設定例

$ sudo tee /etc/logrotate.d/co2 <<EOF
/var/log/co2/co2.log {
  missingok
  rotate 10
  dateext
  compress
  daily
  minsize 10M
}
EOF

$ sudo tee /etc/logrotate.d/fluent <<EOF
/var/log/fluent/fluent.log {
  missingok
  rotate 5
  dateext
  compress
  daily
  minsize 10M
}
EOF
$ logrotate -dv /etc/logrotate.d/co2
$ logrotate -dv /etc/logrotate.d/fluent

これで設定ができた。もし動いていないと感じるようであれば設定を見直すなり sudo logrotate -fv /etc/logrotate.d/co2 などを試してみる。

データの活用

ここまででデータは S3 に蓄積されているし、 AWS IoT の MQTT ブローカーまでメッセージが届いている状態になっている。

ここからはそのデータを活用する方法について触れていく。

S3 に貯めたデータを Python を用いてプロットする

まずは 2018/11/28 日のデータを一つの JSON にするところからやっていく。

awscli の profile は適宜読み替えてほしい。

$ YOUR_S3_BUCKET_NAME=XXXXX
$ YOUR_FLUENTD_SOURCE_TAG_NAME=YYYYY
$ aws --profile iot s3 cp --recursive s3://${YOUR_S3_BUCKET_NAME}/logs/${YOUR_FLUENTD_SOURCE_TAG_NAME}/2018/11/28 logs/11/28
$ find logs/11/29 -name '*.gz' -print0 | xargs -0 -I{} gunzip {} -c > co2.log.json

上記のコマンドで作成したデータ(co2.log.json)を Python のスクリプトでプロットする。(Python には詳しくありません)

import pandas as pd
from matplotlib import pyplot

from datetime import datetime, timezone, timedelta

with open('./co2.log.json') as co2_log:
    df = pd.read_json(co2_log, lines=True)
    df['dt'] = pd.to_datetime(df['dt'])
    df.set_index('dt', inplace=True)
    df.index = df.index.tz_localize('UTC').tz_convert('Asia/Tokyo')


    # 絞り込みはこんな感じ
    # JST = timezone(timedelta(hours=+9), 'JST')
    # df = df[df.index > datetime(2018, 11, 27, 19, 30, 0, 0, tzinfo=JST)]

    series = pd.Series(df['ppm'], name='co2 ppm', index=df.index)

    series.plot()

    pyplot.show()

元データが違うがプロット結果はこんな感じに表示される。(値は気にしないでいただけると幸いです)

プロット結果

AWS IoT に送ったデータを処理する

AWS IoT に送ったデータはルールとそれを処理するアクションを設定することで使用可能になる。

  • ルール: MQTT トピックの絞り込みと、データの簡単な加工
  • アクション: ルールに対して複数紐付けが可能で、ルールで取得したデータを処理する方法を設定する

今回は次のサービスの組み合わせで Slack にアラームを投げることを目標とする。

  • AWS IoT Rules: CloudWatch Metrics でカスタムメトリクスとして二酸化炭素濃度を追加する
  • CloudWatch Alerm: 二酸化炭素濃度が基準値を超えているかを判断し、 Amazon SNS へ通知
  • Amazon SNS: トピックを作成
  • AWS Lambda: CloudWatch Alerm のアラームが上がると Slack へ投稿する funtion の作成、上記トピックの購読

これはとても設定範囲が大きいので重要なところを掻い摘んで記載する。

AWS IoT Rules

クエリを以下のように設定すると { "ppm": 999 } といった形の JSON になる。

SELECT ppm FROM '${THING_TYPE}/${THING_NAME}'

アクションは CloudWatch Metrics のものを追加すればよく、単位と値にさえ気をつければ問題はない。

- 単位: ドキュメントに列挙されているものを選ぶ必要がある。(今回は None を選ぶべきか)

- 値: $ppm と指定すれば良い。

AWS Lambda

良い記事があるのでそちらを参照すること。

https://qiita.com/hf7777hi/items/e0f43f0fb7e2effa0af8
https://qiita.com/TanakanoAnchan/items/dae4edecfd0d85ea2542

終わりに

駆け足で様々な要素を組み合わせて二酸化炭素濃度を測る仕組みを眺めてきた。
この記事で紹介した方法以外にも同じようなことを実現する方法はあるはずで、各々がやりやすい方法を模索してほしい。

Why do not you register as a user and use Qiita more conveniently?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away