LoginSignup
17
12

More than 1 year has passed since last update.

SwitchBot Meter を InfluxDBに貯めてGrafanaでグラフ化した

Last updated at Posted at 2020-03-14

☕️概要

以下のデータをRaspberry Piで収集して、SynologyNAS 上の Docker で動いている InfluxDB に書き込み、Grafanaでグラフ化した。

  • SwitchBot 温湿度計
  • BME280 気圧 温湿度計
  • スマートメータ 瞬時消費電力

といっても既存の記事のお世話になりまくっただけの単なるまとめである。
本当はNetatmoが欲しかったのだが日本の代理店が無くなってしまったので、SwitchBot 温湿度計で我慢しようと思ったのだが、、、

  • Hubを使っていると標準のグラフの記録インターバルが30分間隔だったり
  • しばらく表示させたままにしていると表示の更新が止まってしまう

何とかしたいと思っていたら、世の中は既に出来るようになっているではないか。

💻機材

言語はなんとなくPython3を使用したが、完全に素人である

🏺InfluxDB

私はたまたまSynology NASがDockerに対応していたのでそこでコンテナを動かしている。
適当に公式イメージを起動しただけなのだが、デフォルトでは認証が特にないので何かした方がよいと思われる。
私は素人なので何がベストプラクティスなのか判断が付かないため、ここでは言及を避ける。

書き込み

事前にDBを作っておきさえすれば、以下のコードだけで書ける。神か:pray:

from influxdb import InfluxDBClient

client = InfluxDBClient("サーバーのアドレス", 8086, "ID", "パスワード", "DB名")
record = [
    { # Switchbot 温湿度計データの場合
        "fields" : {
            "rssi"    : data["rssi"],
            "battery" : data["battery"],
            "temp"    : data["temp"],
            "humid"   : float(data["humid"]),
        },
        "tags" : {
            "device" : data["device"], # BLEのMACアドレスを入れている
            "name"   : data["name"]    # MACアドレスからフレンドリ名を引いて書いている
        },
        "measurement" : "enviroment" # 適当だよ!^^
    },
]
client.write_points(record)

🌡SwitchBot 温湿度計

素晴らしいことに2秒おきに測定値がBroadcastされている。単4電池2本。
神メーカーなのでこのBLE仕様が公開されている :hugging:
Meter BLE open API

実装はbluepyを使った。
素晴らしい神マニュアルが存在するので、サンプルコードをベースに適当にした。

最近、FWバージョンアップをしたら動かなくなってしまったので見てみると、ScanResponseにしか128bit Service UUIDが含まれなくなってしまったようなので少し変えた。

#!/usr/bin/env python3
import time
from bluepy.btle import Scanner, DefaultDelegate

DEVICE_NAMES = {
    "00:11:22:33:44:55" : "リビング",
    "66:77:88:99:aa:bb" : "寝室",
    "cc:dd:ee:ff:00:11" : "屋外",
}

class ScanDelegate(DefaultDelegate):
    def __init__(self):
        DefaultDelegate.__init__(self)

    def handleDiscovery(self, dev, isNewDev, isNewData):
        if dev.addr in DEVICE_NAMES.keys():
            advertise = dev.getScanData()
            if self.__isMeterAdvertise(advertise, dev):
                record = self.__readMesuredValue(advertise, dev)
                self.__writeRecord(record)
    
    def __isMeterAdvertise(self, advertise):
        for (adtype, desc, value) in advertise:
            if ((adtype == 22) & (value[4:6] == "54")):
                return True
        return False
    
    def __readMesuredValue(self, advertise, dev):
        result = {"rssi" : dev.rssi, "Addr" : dev.addr}
        result["name"] = DEVICE_NAMES.get(dev.addr, dev.addr)
        # advertiseリスト内の要素のうち、adTypeが22のレコードを取り出し、
        # そのタプルのvalueを取り出して、先頭のUUIDを捨てる
        value = next(filter(lambda d:True if d[0] == 22 else False, advertise))[2][4:]
        result["battery"] = (int(value[ 4: 6], 16) & 0x7F)
        result["temp"]    = (int(value[ 6: 8], 16) & 0x0F) / 10.0
        result["temp"]   += (int(value[ 8:10], 16) & 0x7F)
        result["temp"]   *= -1 if (int(value[8:10], 16) & 0x80 == 0) else 1
        result["humid"]   = (int(value[10:12], 16) & 0x7F)
        return result

Scanner().withDelegate(ScanDelegate()).scan(0.0)

🏞BME280 気圧 温湿度計

Raspberry Piへの接続など、基本的なところは以下の記事を参考にさせていただいた。
ラズパイ+BME280でIoT環境センサー構築(その1)
wiring.jpg
のちに線を逆に出しておけばよかったと後悔する図

BME280のデータシートを見ると、I2Cで使うならばCSB端子をHiにしなさいと書かれているので、そのようにしておいたが、内臓Pull-Upで吊ってあるようなので元記事のように未接続でも問題ないのかもしれない。

ソフト的には素晴らしいことにSwitch Scienceが神サンプルコードを公開してくれているのでこれをそのまま使う。
私はせっかくなので最速でサンプリングさせるようにレジスタ値を設定したのだが、Normal Modeで連続でサンプリングさせているときにread_byte_data()で1byteずつ読むと読んでる途中でデータが書き換わるのではないかと思いBME280のデータシートを確認すると、バーストリードしなはれとの事なので、readData()でレジスタを読むところはread_i2c_block_data()に置き換えた。(波形は未確認、、、)

def readData():
    #data = []
    #for i in range (0xF7, 0xF7+8):
    #    data.append(bus.read_byte_data(i2c_address, i))
    data = bus.read_i2c_block_data(i2c_address, 0xF7, 8)

⚡️スマートメータ 瞬時消費電力

以下の記事を見て、私は神じゃんと思った。
スマートメーターの情報を最安ハードウェアで引っこ抜く

更に、以下の記事で神実装をしてくださっている方がいたので、これをベースにさせて頂く。
Worker Threadデザインパターンでスマートメーター計測ロガー

keiconf_broute.py をベースにUpload先がInfluxDBとなるように適当に実装した。
すごく適当なのでスキーマ的な定義をconf.py側から流し込めない残念実装である。

#!/usr/bin/env python3
import threading
import sys
import queue
from keilib.worker import Worker
from influxdb import InfluxDBClient
from logging import getLogger, StreamHandler, DEBUG
logger = getLogger(__name__)

class InfluxDbWriter ( Worker ):
    def __init__( self , record_que, db_addr, db_port, db_user, db_pw, db_name):
        super().__init__()
        self.record_que = record_que
        self.client = InfluxDBClient(db_addr, db_port, db_user, db_pw, db_name)

    def run ( self ):
        logger.info('[START]')
        while not self.stopEvent.is_set():
            try:
                filename, key, value, nazo = self.record_que.get(timeout=3)
            except:
                continue

            record = [
                {
                    "fields" : {
                        "power" : value,
                    },
                    "tags" : {
                        "device" : "Rpi B-SmartMeter",
                        "name" : "Rpi B-SmartMeter",
                    },
                    "measurement" : "enviroment"
                },
            ]
            self.client.write_points(record)
            logger.debug(record)
        
        logger.info('[STOP]')

🤖daemon化する

以下の神記事を参考に、3つのプログラムをdaemonにしておく。
pythonスクリプトをdaemonにする[systemd編]

これで再起動しても自動的にいい感じになるし、例外が起きて落ちても勝手に再起動してもらえるだろう。

📈Grafanaでグラフを書く

これも公式のイメージを適当に起動した。なんもしなくて動くので本当に神である。
初回ログインについては公式のマニュアルを参照する。
あとはチュートリアルに沿ってDatasourceであるInfluxDBを追加するなどすれば準備は完了だ。

適当にDashboardを作り、New PanelでAdd Visualizationを追加し、
例えば気温であれば以下のように設定すればよい。
image.png

🎊完成

case.jfif
タブレットホルダで壁に掛けた
tab.jpg

いい感じではないか!

課題

先駆者の皆様に大変感謝いたします。

17
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
17
12