はじめに
エイチームフィナジーアドベントカレンダー12日目を担当します @yothio です。
某ウイルスの影響により、エイチームグループでは2020年の3月頃から在宅でも業務ができるようになりました。
ただ、今までのオフィス勤務と同じぐらいパフォーマンスをだすのはなかなか難しいと思ってます、大きく変化がある「職場環境」「コミュニケーション」の2つによってパフォーマンスが発揮しづらいかと思います。
自宅でもパフォーマンスを落とさず、むしろ上げるために意識して行ってきたことをシステム化して話をします。
内容としては「部屋の二酸化炭素濃度」を「計測」することで
「換気を促し」「在宅の環境を良くすること」です
二酸化炭素とパフォーマンスの関係
そもそも「二酸化炭素濃度とパフォーマンスにどんな関係があるんだ?」と思う方が多いと思います
下記に掲載しているリンクのどれもが「ローレンス・バークレー国立研究所」の結果を使用した記事となっていますが二酸化炭素(CO2)濃度が増えることによってパフォーマンスに悪影響を与えていることがわかったようです。
CO2濃度が脳の機能(意思決定)に与える影響についてニューヨーク州立大学と共同研究を行いました。22人の被験者に対し9項目の意思決定テストを行い比較したところ、CO2濃度が600ppmの環境に比べて、1000ppmで6項目が劣り、2500ppmの環境では7項目が大幅に劣ったそうです。
また、換気扇を用いた話ですが下記のサイトでも「なぜ換気を行うのか」「換気を行うことでの良い効果」について書かれています
なんで換気をするのかしら?|換気の情報サイト「いいね!換気扇」
そのことを踏まえると定期的な換気を行うのはとても大事だということがわかります。
システム概要
二酸化炭素の濃度を測るモジュールとしてMH-Z14Aを使用し
モジュールと接続し、通知までのつなぎとしてRaspberry Pi 3 Model B(以下: RaspberryPi)
そして通知先にSlackを使用します
流れとしては下記のとおりです
- RaspberryPiのセットアップ
- MH-Z14Aと繋ぐ
- 二酸化炭素濃度を取得するプログラムの記述
- 通知を飛ばすためのしきい値設定
- Slackへの通知設定
RaspberryPiのセットアップ
初期セットアップについては他の方が記事にしているのでザックリと実施したことを載せていきます
今回必ず実施する必要があるのは一番下のUARTの有効化です
- [RaspberryPiのOSを書き込み]
(https://qiita.com/shippokun/items/9070fc58f69d8c063e44)- OSないと始まらないので、Raspbianを選択
-
買ったらまず実施!RaspberryPiのセキュリティ対策 - Qiita
- デフォルトユーザー使うのよくないって広辞苑に書いてあった。
- WiFi設定(コマンドライン) - Raspberry Pi公式ドキュメントを日本語訳
-
Raspberry Pi に固定IPアドレスを割り当てる方法(Raspbian Jessie) - Qiita
- SSHで接続するのにIPが変わると嬉しくないので実施
-
Raspberry PiでUARTの有効化+シリアル通信 - Qiita
- モジュールとのやりとりはUARTで実施するため
MH-Z14Aと繋ぐ
コチラがMHZ-14Aのマニュアルになります。
とても丁寧にPIN番号と説明が書いてあるのでRaspberryPiのGPIOとにらめっこしながら繋げていきます。
RaspberryPi | MH-Z14A | |
---|---|---|
電源 | 4 | 17 |
GND | 5 | 16 |
入力ピン | 8 | 18 |
出力ピン | 10 | 19 |
対応したPINを設定した写真がこちらになります
[GPIO \- Raspberry Pi Documentation](https://www.raspberrypi.org/documentation/usage/gpio/README.md)二酸化炭素濃度を取得する
次は実際にコードを書いて二酸化炭素濃度を取得してみます
pythonでシリアル通信の結果を取得するため、あらかじめpyserial
をインストールしておきます。
$ pip install pyserial
MH-Z14Aに関すること調べていると動きそうなコードがあったため、少し変更して使用いたします
GitHub - keepworking/MH-Z14A-PI: co2 sensor python code
import serial
import time
import datetime
import json
import argparse
import sys
import csv
import requests
class MHZ14A():
PACKET = [0xff, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79]
ZERO = [0xff, 0x01, 0x87, 0x00, 0x00, 0x00, 0x00, 0x00, 0x78]
def __init__(self, ser):
self.serial = serial.Serial(ser, 9600, timeout=3)
time.sleep(2)
def zero(self):
self.serial.write(bytearray(MHZ14A.ZERO))
def get(self):
self.serial.write(bytearray(MHZ14A.PACKET))
res = self.serial.read(size=9)
res = bytearray(res)
checksum = 0xff & (~(res[1] + res[2] + res[3] + res[4] + res[5] + res[6] + res[7]) + 1)
if res[8] == checksum:
return {
"date_label": datetime.datetime.today().strftime("%Y/%m/%d %H:%M"),
"ppm": (res[2] << 8) | res[3],
}
else:
raise Exception("checksum: " + hex(checksum))
def close(self):
self.serial.close()
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--mode")
args = parser.parse_args()
mode = args.mode or "co2"
co2 = MHZ14A("/dev/ttyS0")
if mode == "zero":
co2.zero()
else:
try:
print(co2.get())
except Exception as e:
print(e)
pass
co2.close()
if __name__ == '__main__':
main()
通知を飛ばすためのしきい値設定
測った値をChart.jsを使用してグラフとして見ていきましょう。
グラフを見る感じだと急激に数値が下がっている箇所はしっかり換気を実施した箇所となるため400ppmあたりが下限値ですねまた気象庁が出している二酸化炭素濃度の経年変化を見る感じ下限値とほぼ同じなので正しく取得できていますね
今回測定してみて定期的に換気を実施していることもあり、1000ppmを超えることはなさそうなので800ppmをしきい値として設定しましょう。
Slackへの通知設定
Slackへの通知はWebhookを利用して送信します
Slack での Incoming Webhook の利用 | Slackの手順通りに実施すればWebhookのURLが手に入ります
上記リンクをクリックしたあとに下の画像にある「新しいSlackアプリを作成」をクリック
Slackへ通知を送るためのアプリを作成する
好きなApp Nameと通知を送信したいWorkspaceを選択
さきほど作成したものが確認できるのでそれをクリック(今回はco2_notify)
Webhook経由で通知を送るため下記の赤枠の箇所をクリック
あとは通知を飛ばしたいチャンネルを選択して、WebhookのURLをコピーしておく
あとはしきい値を超えたらWebhookのURLに対してリクエストを飛ばすように設定して完了
import requests
def notify_slack():
headers = { 'Content-type': 'application/json' }
data = '{"text":"二酸化炭素の濃度が濃くなっています、換気しましょう。"}'.encode("utf-8")
url = 'さっきコピーしたWebhookのURL'
requests.post(url, headers=headers, data=data)
def main():
# ~~~省略~~~
if mode == "zero":
co2.zero()
else:
try:
result = co2.get()
if result.get('ppm') > 800:
notify_slack()
素敵。
最後に
いかがだったでしょうか、某ウイルスの影響により在宅で勤務している方が多いので
そういった人たちに換気の重要性と自分なりの改善手段を伝えることができたなら幸いです
プログラム全文
import serial
import time
import datetime
import json
import argparse
import sys
import csv
import requests
class MHZ14A():
PACKET = [0xff, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79]
ZERO = [0xff, 0x01, 0x87, 0x00, 0x00, 0x00, 0x00, 0x00, 0x78]
def __init__(self, ser):
self.serial = serial.Serial(ser, 9600, timeout=3)
time.sleep(2)
def zero(self):
self.serial.write(bytearray(MHZ14A.ZERO))
def get(self):
self.serial.write(bytearray(MHZ14A.PACKET))
res = self.serial.read(size=9)
res1 = self.serial.read()
res = bytearray(res)
checksum = 0xff & (~(res[1] + res[2] + res[3] + res[4] + res[5] + res[6] + res[7]) + 1)
if res[8] == checksum:
return {
"date_label": datetime.datetime.today().strftime("%Y/%m/%d %H:%M"),
"ppm": (res[2] << 8) | res[3],
"dt": datetime.datetime.today().isoformat(),
"ts": datetime.datetime.today().timestamp(),
}
else:
raise Exception("checksum: " + hex(checksum))
def close(self):
self.serial.close()
def write_csv(params, output_path):
with open(output_path + '/data.csv', 'a') as csv_file:
fieldnames = ['ppm', 'date_label', 'dt', 'ts']
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writerow(params)
def notify_slack(ppm):
headers = {
'Content-type': 'application/json',
}
data = '{"text":"二酸化炭素の濃度が濃くなっています、換気しましょう。"}'.encode("utf-8")
url = 'hogehoge'
requests.post(url, headers=headers, data=data)
def main():
parser = argparse.ArgumentParser()
parser.add_argument("--mode")
args = parser.parse_args()
mode = args.mode or "co2"
co2 = MHZ14A("/dev/ttyS0")
if mode == "zero":
co2.zero()
else:
try:
result = co2.get()
write_csv(result, '/home/Ryoshiki/workspace')
if result.get('ppm') > 800:
notify_slack(result.get('ppm'))
except Exception as e:
print(e)
pass
co2.close()
if __name__ == '__main__':
main()