想定読者
- センサー類の接続・制御などはやったことがない
- クラウドサービスを触ったことがある
- AWS のマネージドコンソールで設定などをしたことがあると良い
- GNU Linux の CLI などを使ったことがある
上記のような人(私)が、調べた情報をもとに試行錯誤したログ
センサー周りで参考になりそうな記事
- https://iot.firstfournotes.com/monitor/co2/
- http://takashiski.hatenablog.com/entry/2017/06/04/131737
- https://qiita.com/UedaTakeyuki/items/c5226960a7328155635f
- https://qiita.com/revsystem/items/5a362e749ef80358e801
- https://pc.watch.impress.co.jp/docs/column/nishikawa/1006048.html
- https://tool-lab.com/make/raspberrypi-startup-22/
やりたいこと
- センサーで居室の二酸化炭素濃度を測る
- 測った結果をもとに何かをする
- 濃度が閾値を超えていた時にアラームをあげる
- 濃度の時系列での推移を眺めて、仕事の能率との相関を見てみたい
全体像
- CO2 濃度センサー
- Raspberry Pi
- Python と多少のシェル
- センサーの制御
- ログの書き出し
- Fluentd
- 書き出されたログをバッファリングして、集約先へ出力
- Python と多少のシェル
- S3
- センサーデータを保持する
- AWS IoT
- リアルタイムなセンサーデータを処理するハブとなる
クラウド
S3 にデータを溜め込むだけではリアルタイムに処理を行うことが難しい。
そこで AWS IoT を用いて様々な AWS サービスと連携するようにすることを目標にする。
ただし、それぞれのサービスの具体的な設定を逐一書いていくと大変なので、AWS の設定などは他の記事を適宜参照しつつ、問題になりそうな部分のフォローを書くことにする。
使うもの
物理
- Raspberry Pi 3 Model B+
- ネットワークへの接続などは済ませてあること
- MH-Z14A(センサー)
- ピンヘッダ接続ケーブル
電子
- AWS アカウント
- お金が発生します(fluentd の転送先に使わないという選択をすれば不要)
キーワード
- GPIO
- UART
MH-Z14A のスペックシート
-
https://www.winsen-sensor.com/d/files/infrared-gas-sensor/mh-z14a_co2-manual-v1_01.pdf
- なぜかセンサーへ送信すべきコマンド(バイナリ列)が記載されていないので下記の前身モデル(?)を参考にする
- https://www.winsen-sensor.com/d/files/PDF/Infrared%20Gas%20Sensor/NDIR%20CO2%20SENSOR/MH-Z14%20CO2%20V2.4.pdf
抜粋
PIN | Description |
---|---|
Pad16 | GND |
Pad17 | Vin Voltage Input |
Pad18 | UART (RXD) TTL Level input |
Pad19 | UART (TXD) TTL Level output |
Raspberry Pi 3 で UART
- Raspberry Pi 2 まではデフォルトで固定クロックで使えた
- Raspberry Pi 3 からは Bluetooth モジュールが追加され、組み込みの UART はそっちに割り振られている
- 選択肢は 2 つ
- Bluethooth 無効化する設定をする
- miniUart を使う設定をする
- 選択肢は 2 つ
参考: Raspberrypi 3 でUART通信(コンソール&汎用) - Qiita
UART
- デバイス間の接続をする際は RX/TX が互い違いになるように接続する
- 接続する際はデバイスの電源を落としておく
- Raspberry Pi の GPIO はデフォルトでプルアップ・プルダウン抵抗が接続されていてソフトウェアで切り替えが可能
配線
GPIO ピンアサインの公式ドキュメントはわかりづらいので https://pinout.xyz/ を見ると良い。
ここでは半田付けなどはせずにピンヘッダ接続ケーブルを輪ゴムで仮止めして使っている。
(この画像通りにやることは推奨しない)
センサー値の取得
まずは、使用するデバイスの権限を調べて pi ユーザーからもアクセス可能にする。
$ whoami
pi
$ ls -la /dev/ttyS0
crw-rw---- 1 root dialout 4, 64 11月 29 10:42 /dev/ttyS0
$ sudo gpasswd -a pi dialout
次に、Python のスクリプトを使用してセンサーから値を取得・出力をする。
ここの言語はなんでも良いが初めてということもあり情報が多い言語を使うことにした。
ほとんどそのまま動きそうなリポジトリがあったのでそれを参考にしているが、どうもそれも何か別のものを元にしているのか不要なコードが含まれていたりした。
#!/usr/bin/env python3
import serial
import time
import datetime
import json
import argparse
import sys
class MHZ14A():
PACKET = [0xff, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79]
ZERO = [0xff, 0x01, 0x87, 0x00, 0x00, 0x00, 0x00, 0x00, 0x78]
def __init__(self, ser):
self.serial = serial.Serial(ser, 9600, timeout=1)
time.sleep(2)
def zero(self):
self.serial.write(bytearray(MHZ14A.ZERO))
def get(self):
self.serial.write(bytearray(MHZ14A.PACKET))
res = self.serial.read(size=9)
res = bytearray(res)
checksum = 0xff & (~(res[1] + res[2] + res[3] + res[4] + res[5] + res[6] + res[7]) + 1)
if res[8] == checksum:
return {
"ppm": (res[2] << 8) | res[3],
"dt": datetime.datetime.today().isoformat(),
"ts": datetime.datetime.today().timestamp(),
}
else:
raise Exception("checksum: " + hex(checksum))
def close(self):
self.serial.close()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--mode")
args = parser.parse_args()
mode = args.mode or "co2"
co2 = MHZ14A("/dev/ttyS0")
if mode == "zero":
co2.zero()
else:
try:
print(json.dumps(co2.get()))
except:
# 本当はエラーハンドリングをする
pass
co2.close()
if __name__ == '__main__':
main()
上記のファイル相当のものを gist に用意してあるので必要であればそこからダウンロードできる。
$ MHZ14A_SCRIPT="https://gist.githubusercontent.com/watiko/b0a82cb401f288f51b39fcc75fe8ce2c/raw/9fe284932055133169450029ad7d1db4826e25ff/MHZ14A.py"
$ wget $MHZ14A_SCRIPT -O /usr/local/bin/mhz14a
$ chmod +x /usr/local/bin/mhz14a
$ mhz14a
{"ppm": 1358, "dt": "2018-11-29T10:43:33.446096", "ts": 1543488213.446343}
これでセンサーの値を JSON 形式で標準出力するコマンドが作成できたことになる。
手順的にはそこまででもないが、データシートを眺めてコマンドを発行、戻ってきた結果を適切に取り扱うコードを自分で書くのは大変だと思う。
センサー値の蓄積
作成したコマンドを使って、定期的にログファイルにセンサー値を取得するスクリプトを作ってみる。
(引数のパースなどをしているので大仰に見えるが logger
と main
関数の中身さえ理解できれば良い)
#!/usr/bin/env bash
set -e -o pipefail
# assert
CMDNAME=`basename "$0"`
function usage() {
echo "Usage: $CMDNAME -l /path/to/log.json -c /usr/local/bin/mhz14a -w 30" 1>&2
exit 1
}
while getopts :l:c:w: OPT; do
case "$OPT" in
l) LOGFILE_PATH="$OPTARG"
;;
c) COMMAND="$OPTARG"
;;
w) WAIT="$OPTARG"
;;
*) echo "unknown option: $OPT: $OPTARG"
usage
;;
esac
done
if [[ "$LOGFILE_PATH" = "" ]]; then
echo "-l option required"
usage
fi
if [[ "$COMMAND" = "" ]]; then
echo "-c option required"
usage
fi
if [[ "$WAIT" = "" ]]; then
WAIT=30
fi
echo "logfile: $LOGFILE_PATH"
echo "command: $COMMAND"
echo "wait seconds: $WAIT"
# function
function logger() {
"$COMMAND" | tee --append "$LOGFILE_PATH"
}
function main() {
while :
do
logger && sleep "$WAIT"
done
}
main
無限ループの中で logger
と sleep
による待機を繰り返しており、 logger
の中では渡されたコマンドを実行し、その結果を tee
で標準出力とログファイル両方に書き出している。
このファイルを /usr/local/bin/logging
などに配置し以下のようにして使う。
$ chmod +x /usr/local/bin/logging
$ /usr/local/bin/logging -l /tmp/log.json -c /usr/local/bin/mhz14a -w 1
logfile: /tmp/log.json
command: /usr/local/bin/mhz14a
wait seconds: 30
{"ts": 1543490833.990155, "ppm": 1408, "dt": "2018-11-29T11:27:13.989904"}
{"ts": 1543490837.274029, "ppm": 1408, "dt": "2018-11-29T11:27:17.273774"}
Ctrl-C
$ cat /tmp/log.json
{"ts": 1543490833.990155, "ppm": 1408, "dt": "2018-11-29T11:27:13.989904"}
{"ts": 1543490837.274029, "ppm": 1408, "dt": "2018-11-29T11:27:17.273774"}
これでファイルにセンサーの値を追記していく仕組みができた。
残りは以下のような作業が必要になる。
- ログファイルの値をどこかのサーバーへ集約する仕組みを作る(fluentd)
- デバイスの再起動などで止まらないようにデーモンにする(systemd)
- ログファイルを定期的に退避する仕組み(logrotate)
各種ファイルの配置先を作成する
あらかじめログファイルをおく場所や設定ファイルをおく場所を作っておく。
$ sudo mkdir /var/log/co2 && sudo chown pi:pi -R $_
$ sudo mkdir /var/log/fluent && sudo chown pi:pi -R $_
$ sudo mkdir /var/log/fluent/s3 && sudo chown pi:pi -R $_
$ sudo mkdir /var/log/fluent/aws-iot && sudo chown pi:pi -R $_
$ sudo mkdir /etc/fluent && sudo chown pi:pi -R $_
fluentd でログを収集・集約する
fluentd とは
簡単に説明すると、ファイルなどに書き出したログを別の場所に転送してくれるツール。
ログのソースや、転送先はプラグイン(Ruby gem)を追加することで増やすことができる。
導入
溜め込んだログは全て S3 に転送し、リアルタイムにアラートを上げるために AWS IoT の MQTT ブローカにも転送する。
ところが、後者の MQTT プロトコルに対応するプラグインが Raspbian 標準の Ruby ではバージョンの問題で動かないので rbenv を導入して動かすことにする。
$ sudo apt-get install ruby-dev libssl-dev
$ ruby --version
rbenv
この記事を参考に rbenv をシステムワイドにインストールして対処した。
この方法以外でもシステムで使えるように Ruby 2.5.x がインストールできれば良い。
(バージョンも作業中に最新のバージョンでもおそらく問題がない)
fluentd そのものとプラグインの導入
$ sudo apt-get install libssl-dev
$ sudo gem install fluentd fluent-plugin-s3 fluent-plugin-mqtt-io
fluentd の設定
基本的に各サービスで初期設定(バケットの作成など)を行なってから fluentd の設定をすることになる。
もし、AWS IoT への設定などを後回しにするのであればコメントアウトするなりすれば良い。
# ログファイルの末尾に追加された行を拾う
<source>
@type tail
path /var/log/co2/co2.log
pos_file /var/log/fluent/co2.log.pos
tag raspberry.co2
format json
</source>
# fluentd は source にタグをつけて最初に match した設定を使うので、
# 複数の転送先を設定する際は copy プラグイン(組み込み)を使う必要がある。
<match raspberry.*>
@type copy
# AWS IoT
<store>
@type mqtt
host 'XXXXXXX.iot.ap-northeast-1.amazonaws.com'
port '8883'
# 変えてね
topic '${THING_TYPE}/${THING_NAME}'
qos 0
<security>
use_tls true
<tls>
# ルート証明書は少しわかりにくいので後述
ca_file /etc/fluentd/aws-iot/rootCA.pem
key_file /etc/fluentd/aws-iot/a92b653a8e-private.pem.key
cert_file /etc/fluentd/aws-iot/a92b653a8e-certificate.pem.crt
</tls>
</security>
<format>
@type json
</format>
</store>
# Amazon S3
<store>
@type s3
aws_key_id YOUR_AWS_KEY_ID
aws_sec_key YOUR_AWS_SECRET_KEY
s3_bucket YOUR_S3_BUCKET_NAME
s3_region ap-northeast-1
path logs/${tag}/%Y/%m/%d/
s3_object_key_format %{path}%{time_slice}_%{index}.%{file_extension}
<buffer tag,time>
@type file
path /var/log/fluent/s3
timekey 3600 # 1 hour partition
timekey_wait 10m
timekey_use_utc true # use utc
</buffer>
<format>
@type json
</format>
</store>
</match>
AWS IoT
色々と設定しなければならずこの記事で書くには複雑な気もする。
ひとまず設定しないといけない項目などに付いてリストを上げる。
項目リスト
- 管理
- ログの設定: CloudWatch logs に AWS IoT でのエラーなどを書き込む設定
- モノ(Thing)の作成: デバイスと一対一で紐付ける(今回は二酸化炭素センサーとモノを紐付ける)
- タイプ: モノの分類?よくわかっていないが作る
- 安全性
- 証明書: デバイスに設置して認証を行う。認可は後述のどのポリシーが紐づいているか次第
- ポリシー: AWS IoT のリソースなどへの権限を管理している。わかりにくい
実験段階ではなんでもできるポリシーを割り当てるのもアリかもしれない
- Act
- ルール: これを設定しないとどうにもならないが、なくても MQTT へのパブリッシュはできるので後述
ポリシー
ポリシーは以下のような感じのもの(YOUR_ACCOUNT_ID
は差し替え)を用意すると ${モノのタイプ}/${モノの名前}
という MQTT のトピックにメッセージを投げることができるので便利
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": "iot:Connect",
"Resource": [
"*"
]
},
{
"Effect": "Allow",
"Action": "iot:Publish",
"Resource": [
"arn:aws:iot:ap-northeast-1:YOUR_ACCOUNT_ID:topic/${iot:Connection.Thing.ThingTypeName}/${iot:Connection.Thing.ThingName}"
]
}
]
}
ルート証明書
これらの証明書をすべてあなたのデバイスに保存すると、貴重なメモリスペースを占有する可能性があります。デバイスが RSA ベースの検証を実装している場合は、Amazon ルート CA 3およびAmazon ルート CA 4 ECC 証明書を省略できます。デバイスが ECC ベースの証明書検証を実装している場合は、Amazon ルート CA 1およびAmazon ルート CA 2 RSA 証明書を省略できます。
とのことですが、マイコンなどでもない限りメモリ使用量を気にする必要はないという建前で実装がどちらになっているかを確認するのが億劫だったので全ての証明書を使うことにした。
$ wget https://www.amazontrust.com/repository/AmazonRootCA{1,2,3,4}.pem
$ cat AmazonRootCA* > rootCA.pem
Amazon S3
バケットを用意し、そのバケットに権限のある IAM ユーザを作成し、 API のアクセスキーを発行すれば良い。
fluentd を試しに動かしてみる
$ fluentd -c /etc/fluent/fluent.conf -v -o /var/log/fluent/fluent.log
これで /usr/local/bin/logging
と /usr/local/bin/mhz14a
それから fluentd
を組み合わせればセンサーの値が S3, AWS IoT に転送できるようになった。
しかし、再起動すると全て立ち上げ直しになったりなど不便な点が多い。次はこれらが自動で行われるようサービス化する。
systemd でサービス化
サービスの追加
サービスとは何かという話をできるほど詳しくないので早速設定を流し込んでいく。
$ sudo tee /etc/systemd/system/co2.service <<EOF
[Unit]
Description=co2 logger
[Service]
ExecStart=/usr/local/bin/logging -l /var/log/co2/co2.log -c /usr/local/bin/mhz14a -w 30
Restart=always
[Install]
WantedBy=multi-user.target
EOF
$ sudo tee /etc/systemd/system/fluent.service <<EOF
[Unit]
Description=co2 logger
[Service]
ExecStart=/usr/local/bin/fluentd -c /etc/fluent/fluent.conf -v -o /var/log/fluent/fluent.log
#User=pi
#Group=pi
Restart=always
[Install]
WantedBy=multi-user.target
EOF
$ sudo tee /etc/systemd/system/fluent-restart.service <<EOF
[Unit]
Description=restart fluent
[Service]
Type=oneshot
ExecStart=/bin/systemctl try-restart fluent.service
[Install]
WantedBy=multi-user.target
EOF
$ sudo tee /etc/systemd/system/fluent-restart.timer <<EOF
[Unit]
Description=restart fluent
[Timer]
OnCalendar=hourly
Persistent=true
[Install]
WantedBy=timers.target
EOF
これで以下のサービス・タイマーのファイルが作成される。
-
co2.service
: 二酸化炭素濃度を計測しログファイルに書き込む -
fluent.service
: 書き込まれた情報を転送する -
fluent-restart.service
:fluent.service
を再起動する -
fluent-restart.timer
: 一時間ごとにfluent-restart.service
を起動する
余談
なぜ restart 用のサービスを用意したかというと、実際に fluentd を走らせていると不可解なエラーが発生して、 mqtt へのパブリッシュがうまくいかなくなるから。(暫定的な対応)
2018-11-29 05:06:22 +0000 [debug]: #0 fluent/log.rb:302:debug: publish_event_stream: Fluent::MultiEventStream
2018-11-29 05:06:22 +0000 [debug]: #0 fluent/log.rb:302:debug: MqttOutput::block (2 levels) in publish_event_stream: raspberry/co2, 1543467982, {"dt"=>"2018-11-29T05:06:22.783533", "ppm"=>878}
2018-11-29 05:06:22 +0000 [error]: #0 fluent/log.rb:362:error: System call error occurs.,Errno::EPIPE,Broken pipe
2018-11-29 05:06:22 +0000 [error]: #0 fluent/log.rb:362:error: Retry in 1 sec
2018-11-29 05:06:22 +0000 [warn]: #0 fluent/log.rb:342:warn: emit transaction failed: error_class=Errno::EPIPE error="Broken pipe" location="/usr/local/.rbenv/versions/2.5.3/lib/ruby/2.5.0/openssl/buffering.rb:325:in syswrite tag="raspberry.co2"
2018-11-29 05:06:22 +0000 [warn]: #0 plugin/in_tail.rb:397:receive_lines: suppressed same stacktrace
2018-11-29 05:06:23 +0000 [debug]: #0 fluent/log.rb:302:debug: connected to mqtt broker XXX.iot.ap-northeast-1.amazonaws.com:8883 for out_mqtt
サービスの有効化・開始
ファイルを作成しただけだとまだダメで、 systemd の管理するファイルにシンボリックリンクを作成するコマンドを発行した後に、 service を有効化する必要がある。
$ sudo systemctl daemon-reload
$ sudo systemctl enable co2.service
$ sudo systemctl enable fluent.service
$ sudo systemctl enable fluent-restart.timer
$ sudo systemctl start co2.service
$ sudo systemctl start fluent.service
ここまでやると再起動してもログが書き出されるし、転送もされるようになった。
ところが、ログは書き出される一方で誰も削除などを行わないのでこれに対処する。
ログをローテションさせる
センサーの出力などを書き続けていくといつかはファイルシステムの空き容量を食いつぶすことになる。
そこで、 logrotate というツールを使用する。
logrotate とは
一定の条件に従ってログファイルの世代交代を行う。
また保持する最大の世代数を設定することで必要以上にファイルシステムの空き容量を使用することを避けることがでる。
logrotate の設定例
$ sudo tee /etc/logrotate.d/co2 <<EOF
/var/log/co2/co2.log {
missingok
rotate 10
dateext
compress
daily
minsize 10M
}
EOF
$ sudo tee /etc/logrotate.d/fluent <<EOF
/var/log/fluent/fluent.log {
missingok
rotate 5
dateext
compress
daily
minsize 10M
}
EOF
$ logrotate -dv /etc/logrotate.d/co2
$ logrotate -dv /etc/logrotate.d/fluent
これで設定ができた。もし動いていないと感じるようであれば設定を見直すなり sudo logrotate -fv /etc/logrotate.d/co2
などを試してみる。
データの活用
ここまででデータは S3 に蓄積されているし、 AWS IoT の MQTT ブローカーまでメッセージが届いている状態になっている。
ここからはそのデータを活用する方法について触れていく。
S3 に貯めたデータを Python を用いてプロットする
まずは 2018/11/28
日のデータを一つの JSON にするところからやっていく。
awscli の profile は適宜読み替えてほしい。
$ YOUR_S3_BUCKET_NAME=XXXXX
$ YOUR_FLUENTD_SOURCE_TAG_NAME=YYYYY
$ aws --profile iot s3 cp --recursive s3://${YOUR_S3_BUCKET_NAME}/logs/${YOUR_FLUENTD_SOURCE_TAG_NAME}/2018/11/28 logs/11/28
$ find logs/11/29 -name '*.gz' -print0 | xargs -0 -I{} gunzip {} -c > co2.log.json
上記のコマンドで作成したデータ(co2.log.json
)を Python のスクリプトでプロットする。(Python には詳しくありません)
import pandas as pd
from matplotlib import pyplot
from datetime import datetime, timezone, timedelta
with open('./co2.log.json') as co2_log:
df = pd.read_json(co2_log, lines=True)
df['dt'] = pd.to_datetime(df['dt'])
df.set_index('dt', inplace=True)
df.index = df.index.tz_localize('UTC').tz_convert('Asia/Tokyo')
# 絞り込みはこんな感じ
# JST = timezone(timedelta(hours=+9), 'JST')
# df = df[df.index > datetime(2018, 11, 27, 19, 30, 0, 0, tzinfo=JST)]
series = pd.Series(df['ppm'], name='co2 ppm', index=df.index)
series.plot()
pyplot.show()
元データが違うがプロット結果はこんな感じに表示される。(値は気にしないでいただけると幸いです)
AWS IoT に送ったデータを処理する
AWS IoT に送ったデータはルールとそれを処理するアクションを設定することで使用可能になる。
- ルール: MQTT トピックの絞り込みと、データの簡単な加工
- アクション: ルールに対して複数紐付けが可能で、ルールで取得したデータを処理する方法を設定する
今回は次のサービスの組み合わせで Slack にアラームを投げることを目標とする。
- AWS IoT Rules: CloudWatch Metrics でカスタムメトリクスとして二酸化炭素濃度を追加する
- CloudWatch Alerm: 二酸化炭素濃度が基準値を超えているかを判断し、 Amazon SNS へ通知
- Amazon SNS: トピックを作成
- AWS Lambda: CloudWatch Alerm のアラームが上がると Slack へ投稿する funtion の作成、上記トピックの購読
これはとても設定範囲が大きいので重要なところを掻い摘んで記載する。
AWS IoT Rules
クエリを以下のように設定すると { "ppm": 999 }
といった形の JSON になる。
SELECT ppm FROM '${THING_TYPE}/${THING_NAME}'
アクションは CloudWatch Metrics のものを追加すればよく、単位と値にさえ気をつければ問題はない。
- 単位: ドキュメントに列挙されているものを選ぶ必要がある。(今回は None を選ぶべきか)
- 値:
$ppm
と指定すれば良い。
AWS Lambda
良い記事があるのでそちらを参照すること。
https://qiita.com/hf7777hi/items/e0f43f0fb7e2effa0af8
https://qiita.com/TanakanoAnchan/items/dae4edecfd0d85ea2542
終わりに
駆け足で様々な要素を組み合わせて二酸化炭素濃度を測る仕組みを眺めてきた。
この記事で紹介した方法以外にも同じようなことを実現する方法はあるはずで、各々がやりやすい方法を模索してほしい。