search
LoginSignup
1

More than 1 year has passed since last update.

posted at

updated at

Organization

在宅の環境を少しでもよくしたい...そうだ、部屋の空気に注目してみよう!

はじめに

エイチームフィナジーアドベントカレンダー12日目を担当します @yothio です。

某ウイルスの影響により、エイチームグループでは2020年の3月頃から在宅でも業務ができるようになりました。
ただ、今までのオフィス勤務と同じぐらいパフォーマンスをだすのはなかなか難しいと思ってます、大きく変化がある「職場環境」「コミュニケーション」の2つによってパフォーマンスが発揮しづらいかと思います。

自宅でもパフォーマンスを落とさず、むしろ上げるために意識して行ってきたことをシステム化して話をします。

内容としては「部屋の二酸化炭素濃度」を「計測」することで
換気を促し」「在宅の環境を良くすること」です

二酸化炭素とパフォーマンスの関係

そもそも「二酸化炭素濃度とパフォーマンスにどんな関係があるんだ?」と思う方が多いと思います
下記に掲載しているリンクのどれもが「ローレンス・バークレー国立研究所」の結果を使用した記事となっていますが二酸化炭素(CO2)濃度が増えることによってパフォーマンスに悪影響を与えていることがわかったようです。

CO2濃度が脳の機能(意思決定)に与える影響についてニューヨーク州立大学と共同研究を行いました。22人の被験者に対し9項目の意思決定テストを行い比較したところ、CO2濃度が600ppmの環境に比べて、1000ppmで6項目が劣り、2500ppmの環境では7項目が大幅に劣ったそうです。

上記引用元
参考リンクA
参考リンクB

また、換気扇を用いた話ですが下記のサイトでも「なぜ換気を行うのか」「換気を行うことでの良い効果」について書かれています
なんで換気をするのかしら?|換気の情報サイト「いいね!換気扇」

そのことを踏まえると定期的な換気を行うのはとても大事だということがわかります。

システム概要

二酸化炭素の濃度を測るモジュールとしてMH-Z14Aを使用し
モジュールと接続し、通知までのつなぎとしてRaspberry Pi 3 Model B(以下: RaspberryPi)
そして通知先にSlackを使用します

流れとしては下記のとおりです
1. RaspberryPiのセットアップ
2. MH-Z14Aと繋ぐ
3. 二酸化炭素濃度を取得するプログラムの記述
4. 通知を飛ばすためのしきい値設定
5. Slackへの通知設定

RaspberryPiのセットアップ

初期セットアップについては他の方が記事にしているのでザックリと実施したことを載せていきます
今回必ず実施する必要があるのは一番下のUARTの有効化です

MH-Z14Aと繋ぐ

コチラがMHZ-14Aのマニュアルになります。

とても丁寧にPIN番号と説明が書いてあるのでRaspberryPiのGPIOとにらめっこしながら繋げていきます。

RaspberryPi MH-Z14A
電源 4 17
GND 5 16
入力ピン 8 18
出力ピン 10 19

対応したPINを設定した写真がこちらになります


GPIO - Raspberry Pi Documentation

二酸化炭素濃度を取得する

次は実際にコードを書いて二酸化炭素濃度を取得してみます
pythonでシリアル通信の結果を取得するため、あらかじめpyserialをインストールしておきます。

$ pip install pyserial

MH-Z14Aに関すること調べていると動きそうなコードがあったため、少し変更して使用いたします
GitHub - keepworking/MH-Z14A-PI: co2 sensor python code

mh_z14a.py
import serial
import time
import datetime
import json
import argparse
import sys
import csv
import requests

class MHZ14A():
    PACKET = [0xff, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79]
    ZERO = [0xff, 0x01, 0x87, 0x00, 0x00, 0x00, 0x00, 0x00, 0x78]

    def __init__(self, ser):
        self.serial = serial.Serial(ser, 9600, timeout=3)
        time.sleep(2)

    def zero(self):
        self.serial.write(bytearray(MHZ14A.ZERO))

    def get(self):
        self.serial.write(bytearray(MHZ14A.PACKET))
        res = self.serial.read(size=9)
        res = bytearray(res)
        checksum = 0xff & (~(res[1] + res[2] + res[3] + res[4] + res[5] + res[6] + res[7]) + 1)
        if res[8] == checksum:
            return {
                "date_label": datetime.datetime.today().strftime("%Y/%m/%d %H:%M"),
                "ppm": (res[2] << 8) | res[3],
            }
        else:
            raise Exception("checksum: " + hex(checksum))

    def close(self):
        self.serial.close()

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode")
    args = parser.parse_args()
    mode = args.mode or "co2"

    co2 = MHZ14A("/dev/ttyS0")

    if mode == "zero":
        co2.zero()
    else:
        try:
            print(co2.get())
        except Exception as e:
            print(e)
            pass
    co2.close()

if __name__ == '__main__':
    main()

しっかりと二酸化炭素濃度(ppm)が取得できていますね

通知を飛ばすためのしきい値設定

測った値をChart.jsを使用してグラフとして見ていきましょう。


グラフを見る感じだと急激に数値が下がっている箇所はしっかり換気を実施した箇所となるため400ppmあたりが下限値ですね

また気象庁が出している二酸化炭素濃度の経年変化を見る感じ下限値とほぼ同じなので正しく取得できていますね

今回測定してみて定期的に換気を実施していることもあり、1000ppmを超えることはなさそうなので800ppmをしきい値として設定しましょう。

Slackへの通知設定

Slackへの通知はWebhookを利用して送信します
Slack での Incoming Webhook の利用 | Slackの手順通りに実施すればWebhookのURLが手に入ります

上記リンクをクリックしたあとに下の画像にある「新しいSlackアプリを作成」をクリック

Slackへ通知を送るためのアプリを作成する
好きなApp Nameと通知を送信したいWorkspaceを選択


さきほど作成したものが確認できるのでそれをクリック(今回はco2_notify)


Webhook経由で通知を送るため下記の赤枠の箇所をクリック


そしてWebhookの有効化


あとは通知を飛ばしたいチャンネルを選択して、WebhookのURLをコピーしておく

あとはしきい値を超えたらWebhookのURLに対してリクエストを飛ばすように設定して完了

mh_z14a.py
import requests

def notify_slack():
    headers = { 'Content-type': 'application/json' }
    data = '{"text":"二酸化炭素の濃度が濃くなっています、換気しましょう。"}'.encode("utf-8")
    url = 'さっきコピーしたWebhookのURL'
    requests.post(url, headers=headers, data=data)

def main():
    # ~~~省略~~~
    if mode == "zero":
        co2.zero()
    else:
        try:
            result = co2.get()
            if result.get('ppm') > 800:
                notify_slack()

素敵。

最後に

いかがだったでしょうか、某ウイルスの影響により在宅で勤務している方が多いので
そういった人たちに換気の重要性と自分なりの改善手段を伝えることができたなら幸いです

プログラム全文


import serial
import time
import datetime
import json
import argparse
import sys
import csv
import requests


class MHZ14A():
    PACKET = [0xff, 0x01, 0x86, 0x00, 0x00, 0x00, 0x00, 0x00, 0x79]
    ZERO = [0xff, 0x01, 0x87, 0x00, 0x00, 0x00, 0x00, 0x00, 0x78]

    def __init__(self, ser):
        self.serial = serial.Serial(ser, 9600, timeout=3)
        time.sleep(2)

    def zero(self):
        self.serial.write(bytearray(MHZ14A.ZERO))

    def get(self):
        self.serial.write(bytearray(MHZ14A.PACKET))
        res = self.serial.read(size=9)
        res1 = self.serial.read()
        res = bytearray(res)
        checksum = 0xff & (~(res[1] + res[2] + res[3] + res[4] + res[5] + res[6] + res[7]) + 1)
        if res[8] == checksum:
            return {
                "date_label": datetime.datetime.today().strftime("%Y/%m/%d %H:%M"),
                "ppm": (res[2] << 8) | res[3],
                "dt": datetime.datetime.today().isoformat(),
                "ts": datetime.datetime.today().timestamp(),
            }
        else:
            raise Exception("checksum: " + hex(checksum))

    def close(self):
        self.serial.close()

def write_csv(params, output_path):
    with open(output_path + '/data.csv', 'a') as csv_file:

        fieldnames = ['ppm', 'date_label', 'dt', 'ts']
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        writer.writerow(params)

def notify_slack(ppm):
    headers = {
                'Content-type': 'application/json',
                }
    data = '{"text":"二酸化炭素の濃度が濃くなっています、換気しましょう。"}'.encode("utf-8")
    url = 'hogehoge'
    requests.post(url, headers=headers, data=data)

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--mode")
    args = parser.parse_args()
    mode = args.mode or "co2"

    co2 = MHZ14A("/dev/ttyS0")

    if mode == "zero":
        co2.zero()
    else:
        try:
            result = co2.get()
            write_csv(result, '/home/Ryoshiki/workspace')
            if result.get('ppm') > 800:
                notify_slack(result.get('ppm'))
        except Exception as e:
            print(e)
            pass
    co2.close()


if __name__ == '__main__':
    main()

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
What you can do with signing up
1