☕️概要
以下のデータをRaspberry Piで収集して、SynologyNAS 上の Docker で動いている InfluxDB に書き込み、Grafanaでグラフ化した。
- SwitchBot 温湿度計
- BME280 気圧 温湿度計
- スマートメータ 瞬時消費電力
といっても既存の記事のお世話になりまくっただけの単なるまとめである。
本当はNetatmoが欲しかったのだが日本の代理店が無くなってしまったので、SwitchBot 温湿度計で我慢しようと思ったのだが、、、
- Hubを使っていると標準のグラフの記録インターバルが30分間隔だったり
- しばらく表示させたままにしていると表示の更新が止まってしまう
何とかしたいと思っていたら、世の中は既に出来るようになっているではないか。
💻機材
- Raspberry Pi ZeroW(もっといいやつにしてここでブラウザ動かせばよかった・・・)
- Python3 InfluxDB Grafana
- SwitchBot 温湿度計
- SwitchScience BME280搭載 温湿度・気圧センサモジュール
- HEMS用 Wi-SUNモジュール, Wi-SUN Route-B 専用 RL7023 Stick-D/IPS
言語はなんとなくPython3を使用したが、完全に素人である
🏺InfluxDB
私はたまたまSynology NASがDockerに対応していたのでそこでコンテナを動かしている。
適当に公式イメージを起動しただけなのだが、デフォルトでは認証が特にないので何かした方がよいと思われる。
私は素人なので何がベストプラクティスなのか判断が付かないため、ここでは言及を避ける。
書き込み
事前にDBを作っておきさえすれば、以下のコードだけで書ける。神か
from influxdb import InfluxDBClient
client = InfluxDBClient("サーバーのアドレス", 8086, "ID", "パスワード", "DB名")
record = [
{ # Switchbot 温湿度計データの場合
"fields" : {
"rssi" : data["rssi"],
"battery" : data["battery"],
"temp" : data["temp"],
"humid" : float(data["humid"]),
},
"tags" : {
"device" : data["device"], # BLEのMACアドレスを入れている
"name" : data["name"] # MACアドレスからフレンドリ名を引いて書いている
},
"measurement" : "enviroment" # 適当だよ!^^
},
]
client.write_points(record)
🌡SwitchBot 温湿度計
素晴らしいことに2秒おきに測定値がBroadcastされている。単4電池2本。
神メーカーなのでこのBLE仕様が公開されている
Meter BLE open API
実装はbluepy
を使った。
素晴らしい神マニュアルが存在するので、サンプルコードをベースに適当にした。
最近、FWバージョンアップをしたら動かなくなってしまったので見てみると、ScanResponseにしか128bit Service UUIDが含まれなくなってしまったようなので少し変えた。
#!/usr/bin/env python3
import time
from bluepy.btle import Scanner, DefaultDelegate
DEVICE_NAMES = {
"00:11:22:33:44:55" : "リビング",
"66:77:88:99:aa:bb" : "寝室",
"cc:dd:ee:ff:00:11" : "屋外",
}
class ScanDelegate(DefaultDelegate):
def __init__(self):
DefaultDelegate.__init__(self)
def handleDiscovery(self, dev, isNewDev, isNewData):
if dev.addr in DEVICE_NAMES.keys():
advertise = dev.getScanData()
if self.__isMeterAdvertise(advertise, dev):
record = self.__readMesuredValue(advertise, dev)
self.__writeRecord(record)
def __isMeterAdvertise(self, advertise):
for (adtype, desc, value) in advertise:
if ((adtype == 22) & (value[4:6] == "54")):
return True
return False
def __readMesuredValue(self, advertise, dev):
result = {"rssi" : dev.rssi, "Addr" : dev.addr}
result["name"] = DEVICE_NAMES.get(dev.addr, dev.addr)
# advertiseリスト内の要素のうち、adTypeが22のレコードを取り出し、
# そのタプルのvalueを取り出して、先頭のUUIDを捨てる
value = next(filter(lambda d:True if d[0] == 22 else False, advertise))[2][4:]
result["battery"] = (int(value[ 4: 6], 16) & 0x7F)
result["temp"] = (int(value[ 6: 8], 16) & 0x0F) / 10.0
result["temp"] += (int(value[ 8:10], 16) & 0x7F)
result["temp"] *= -1 if (int(value[8:10], 16) & 0x80 == 0) else 1
result["humid"] = (int(value[10:12], 16) & 0x7F)
return result
Scanner().withDelegate(ScanDelegate()).scan(0.0)
🏞BME280 気圧 温湿度計
Raspberry Piへの接続など、基本的なところは以下の記事を参考にさせていただいた。
ラズパイ+BME280でIoT環境センサー構築(その1)
のちに線を逆に出しておけばよかったと後悔する図
BME280のデータシートを見ると、I2Cで使うならばCSB端子をHiにしなさいと書かれているので、そのようにしておいたが、内臓Pull-Upで吊ってあるようなので元記事のように未接続でも問題ないのかもしれない。
ソフト的には素晴らしいことにSwitch Scienceが神サンプルコードを公開してくれているのでこれをそのまま使う。
私はせっかくなので最速でサンプリングさせるようにレジスタ値を設定したのだが、Normal Mode
で連続でサンプリングさせているときにread_byte_data()
で1byteずつ読むと読んでる途中でデータが書き換わるのではないかと思いBME280のデータシートを確認すると、バーストリードしなはれとの事なので、readData()
でレジスタを読むところはread_i2c_block_data()
に置き換えた。(波形は未確認、、、)
def readData():
#data = []
#for i in range (0xF7, 0xF7+8):
# data.append(bus.read_byte_data(i2c_address, i))
data = bus.read_i2c_block_data(i2c_address, 0xF7, 8)
⚡️スマートメータ 瞬時消費電力
以下の記事を見て、私は神じゃんと思った。
スマートメーターの情報を最安ハードウェアで引っこ抜く
更に、以下の記事で神実装をしてくださっている方がいたので、これをベースにさせて頂く。
Worker Threadデザインパターンでスマートメーター計測ロガー
keiconf_broute.py
をベースにUpload先がInfluxDBとなるように適当に実装した。
すごく適当なのでスキーマ的な定義をconf.py
側から流し込めない残念実装である。
#!/usr/bin/env python3
import threading
import sys
import queue
from keilib.worker import Worker
from influxdb import InfluxDBClient
from logging import getLogger, StreamHandler, DEBUG
logger = getLogger(__name__)
class InfluxDbWriter ( Worker ):
def __init__( self , record_que, db_addr, db_port, db_user, db_pw, db_name):
super().__init__()
self.record_que = record_que
self.client = InfluxDBClient(db_addr, db_port, db_user, db_pw, db_name)
def run ( self ):
logger.info('[START]')
while not self.stopEvent.is_set():
try:
filename, key, value, nazo = self.record_que.get(timeout=3)
except:
continue
record = [
{
"fields" : {
"power" : value,
},
"tags" : {
"device" : "Rpi B-SmartMeter",
"name" : "Rpi B-SmartMeter",
},
"measurement" : "enviroment"
},
]
self.client.write_points(record)
logger.debug(record)
logger.info('[STOP]')
🤖daemon化する
以下の神記事を参考に、3つのプログラムをdaemonにしておく。
pythonスクリプトをdaemonにする[systemd編]
これで再起動しても自動的にいい感じになるし、例外が起きて落ちても勝手に再起動してもらえるだろう。
📈Grafanaでグラフを書く
これも公式のイメージを適当に起動した。なんもしなくて動くので本当に神である。
初回ログインについては公式のマニュアルを参照する。
あとはチュートリアルに沿ってDatasourceであるInfluxDBを追加するなどすれば準備は完了だ。
適当にDashboardを作り、New PanelでAdd Visualizationを追加し、
例えば気温であれば以下のように設定すればよい。
🎊完成
タブレットホルダで壁に掛けた
いい感じではないか!
課題
- たまにGrafanaが勝手にログイン画面に戻ってしまう
- 少し楽になった。
grafana-kiosk でログイン、全画面表示を自動化
- 少し楽になった。
- NASへのディスクアクセスが永久に発生する
- 室温がBLEで垂れ流しなので在宅してるのかモロばれ
先駆者の皆様に大変感謝いたします。