1. はじめに
実家で新しく猫を飼うことになりました。私は別の家で暮らしているので毎日猫に会うことはできませんが、成長の過程をデータだけでも見たいので猫トイレに体重計をつけようと考えました。また、体重変化や排泄物量を調べることで病気の発見にも繋がるかもしれません。
それと最近こういう電子工作的なことをしていないので暇つぶしを兼ねています。
製作したハードウェアとソフトウェアはGithubにあげてあります。
Miyabi1456/cat-weight-measurement
ノルウェージャンフォレストキャット
2. 先行例調査
いきなり手を付ける前に、まずは先行例を調べました。猫の体重変化、排泄物の量が知りたいというのは一般的なニーズなので、当然ながらいくつも見つかります。有名どころの製品をいくつか見て、良さそうなのがあれば電子工作は止めにしてそれを買うつもりで調べました。
2-1. 製品
リンク先の情報から、
- 専用のシステムトイレを使う(うんちは猫砂、おしっこは吸水シートで受けるタイプ)
-
体重は10g単位、尿量は1g単位で区別し計測
吸水パッドを尿量計、トイレ自体を体重計で支持する方式で、2つの重量計で別々に計測する方式のようです - 猫砂はペットケアモニター専用チップまたは検証済の市販システムトイレ用チップ
- 計測上限 10kg
- トイレサイズ(おそらく外寸): 約380mm×約576mm
- 個体識別は個体識別バッジを首輪につけるか、バッジなしの体重識別モード
- 本体代税込6,980円, 10頭まで月額税込330円
うちの猫にはむいてないかな~って感じの製品です。
まず前提として、猫の腎臓病や膀胱炎予防のためには、猫が気軽にトイレに行ける環境を作ることが大切です。それを踏まえた上で何が良くないかと言うと、
-
システムトイレであること:
システムトイレはおしっこの付いた吸水シートがトイレ下部にあり臭うので、猫にとって受け入れにくい1そうです。 -
猫砂に吸収しないタイプのチップしか使えない
システムトイレなのでこのタイプのチップしか使えません。猫が好みやすい猫砂は吸水して固まるタイプで粒度は細かめな傾向2にあり、専用品はその真逆です -
トイレが専用品でやや小さい
猫が好むトイレのサイズは体長の1.5倍程度2だそうで、この専用トイレは内寸がいくらかわかりませんが、ペットケアモニター用シートが約430mm×約290mmであることからやや小さめと推測できます。うちの猫はノルウェージャンフォレストキャット(猫平均体長300mmの1.5倍程度になる)なので、700mmくらい欲しいです。ちなみにうちでは外寸約470mm×約820mmのこのトイレを使ってます。
あくまでも傾向なので気にしない猫もいるとは思いますが、病気発見のために猫が行きにくいトイレにするのは本末転倒感ありますね。
反対に良いところは、体重と尿量の計測精度が良い点です。
健康な猫の体重はある程度一定に保たれ、1ヶ月に5%など変化した場合は病院に連れて行った方が良い3と言われています。
体重4kgの猫なら5%は200gにあたるので、体重計にはその差が分かる程度の精度が必要とされます。
リンク先の情報から、
- 専用のシステムトイレを使う
- 自動撮影機能あり
- 個体識別はバッジ等不要
- 猫砂は市販システムトイレ用チップ
- トイレサイズ(おそらく外寸): 約410mm×約523mm
- 一括購入プランの場合:税込64,000円, 月額プランの場合:本体税込9,980円+月額1,480円
シャープのペットケアモニターと同じくシステムトイレしか使えない時点でう~んって感じです。
リンク先の情報から、
- トイレ下に設置するタイプの重量計なので、トイレは任意のものが使える
- 計測上限 20kg(トイレ, 猫砂含む)
- 個体識別はバッジ等不要
- 寸法 480x335x35mm
- 本体代税込19,800円, 1頭あたり月額税込780円
ボードが小さいですが、トイレがはみ出しても設置できないことはないようなのでワイドトイレでも使えそうです。
精度はメーカーページには見当たりませんでした。レビュー4を見ると排泄量はg単位で表示されていますが、実際の精度はわかりません。
少なくとも猫トイレが自由に選べる点はシャープのペットケアモニターより良いですね。
この3つの中であれば、私はCatlog Boardを選びます。
ただ、実はこのCatlog Boardを見つける前にロードセルを買ってしまいました。
もう後には引けぬ。
2-2. 自作例
トイレに限らず猫の体重計測をしている例を探しました。ハードウェア構成などを参考にさせていただきます。
Qiita: 飼い猫が抱っこ嫌がって体重が量れないからネコベッドに24時間秒単位で体重を計測できる体重計を作ったら、猫の睡眠時間と「睡眠時の水分発散量」っぽい物が計測できた。
ハードウェア
- ロードセル:ビーム型, フルブリッジ, 定格20kg, 4個
- ADC: HX711 2個(ゲイン32倍固定のBチャンネルも使用)
- コントローラ:Raspberry Pi Zero W
データ記録・可視化
- Elasticsearch, Kibana
Amazon AWS: IoT を使ってお猫様の健康 (体重) をモニタリングしてみた
ハードウェア
- ロードセル:ビーム型, フルブリッジ, 定格5kg, 2個
- ADC: HX711 2個
- コントローラ:Raspberry Pi 4 Model B メモリ4G
- カメラ:Logicool C270n HD
データ記録・可視化
- AWS Cloud
ハードウェア
- ロードセル:薄型, ハーフブリッジ, 定格50kg, 4個
- ADC: HX711 1個
- コントローラ:M5Stack Basic
データ記録・可視化
- Googleスプレッドシート
Genki Taniguchi's Blog: 猫がウンコしたらLINEに通知が来るトイレを作った話
ハードウェア
- ロードセル:薄型, ハーフブリッジ, 定格50kg, 4個
- ADC: HX711 1個
- コントローラ:M5Stack ATOM Lite
データ記録・可視化
- IFFFT経由でLINE通知
Zenn: 市販のロードセル4つの体重計をIoT化する(ネコ様用)
ハードウェア
- ロードセル:タニタ 体重計 ホワイト HD-661-WH A4流用
- ADC: HX711 1個
- コントローラ:Raspberry Pi
- カメラ
データ記録・可視化
- 体重はGooleスプレッドシート
- 画像はSlack
調べた限り、ハードウェアは適当なロードセルを買うか、体重計を分解するかの2パターンでした。一瞬だけ以前Qiitaに投稿したサンプリングモアレ法を使って荷重計測しようかなと思いましたが、 ロードセルに比べてカメラは高い、厚みがでる、マイコンで処理するには厳しい、精度を出すのに苦労しそうなどの理由から止めました。今回は先人に習ってロードセルを使います。
データの記録にAWSやElasticsearch(初めて知った)を使うのは難しそうなので、
簡単そうなGoogleスプレッドシートでいきます
やっぱりLLMパワーを使えばウェブアプリ作れるなと思ったので変更しました。
3. 計測項目の検討
何を測るのかを考えます。製品調査から以下のような項目が計測されていることがわかりました。自作例では体重のみのものが多い印象です。
- 体重
- 排尿頻度
- 排尿量
- 排便頻度
- 排便量
- トイレ滞在時間
いきなり全部作ろうと思うと大変なので、まずは簡単に測れそうな項目を優先し、残りはできたらやるのスタンスでいきます。簡単そうなのは以下のものです。
- 体重
- 排尿・排便頻度(排尿と排便を区別しない)
- トイレ滞在時間
難しそうなのは、うんちとおしっこの区別、精度の問題から排尿量、排便量の計測です。
4. ハードウェア構成
使用するハードウェアの構成を考えます。予算は1万円以内くらいです。
4-1. ロードセル選定
まずはロードセル選定から始めます。
計測したいのがまず第一に猫の体重で、体重変化を1%単位で捉えることを目標にします。
猫を4kgとするとその1%は40gなので、体重計としてはそれを何分の一かした10g単位の精度で測れるようにしたいです。
いろいろあって入手性や組み込みやすさから以下のものを選定しました。ビーム型はねじ止めしないといけないけど、この薄型タイプだったら四隅にスペーサー入れて中心出っ張らせればいいだけなので簡単
フルスケールが10kgで、繰り返し精度は0.05% F.S(5g)です。
こういうのってメーカーによって記載が異なるそうで5、このロードセルメーカーの細かい定義は調べても出てこないのでわかりませんが、まあだいたいそれくらいのオーダーで測れるんでしょう。
4つ使うので、正規分布の加法性から$\sqrt{4 \times 5^2} = 10 \mathrm{g}$です。
また、ノイズ低減を期待して、可能な限りADCとロードセル間の配線長さが短くなるように、ハーフブリッジのロードセル4つにADC1つの構成ではなく、フルブリッジのロードセル4つにADC4つの構成にします。
これは特にやるつもりありませんが、4点それぞれで独立して荷重計測できることから重心推定もできるはずです。
4-2. ADC選定
もう秋月電子でロードセル用のADC買うならHX711一択みたいな風潮あります。他に選択肢あまりないんですよね。
AliExpressでCS1237というADCも買ったので比較をおまけに書きました。8. おまけ
分解能を概算します。
ロードセルの出力が1.0±0.2mV/V、HX711のロードセル供給電圧AVDDを4.3Vとします。
フルロード時のロードセル出力電圧は$1.0\mathrm{mV/V} \times 4.3\mathrm{V} = 4.3 \mathrm{mV}$
HX711のデータシートからAVDDが5Vでゲイン128倍設定のとき入力のフルスケールは20mVらしいので、
24bitの分解能のうち有効に使えるのは$2^{24} / \left( 20 \mathrm{mV} / 4.3 \mathrm{mV} \right) = 3607101$
1階調何グラムか計算すると$10 \mathrm{kg} / 3607101 = 0.00277 \mathrm{g}$です。
分解能は十分ですね。他は買って試します。
4-3. コントローラ選定
必要機能は、WiFi接続、IOピン8本以上で、秋月電子で買えるM5StampS3にしました。
余談ですが、Seeed Studio XIAO ESP32S3はおすすめしません。経験上WiFiアンテナ端子が接触不良になりやすいです。
4-4. ロードセル組み込みブラケットの設計製作
3D CADでロードセルを組み付けるブラケットを設計し、3Dプリンタで印刷します。
ロードセルの4隅をスペーサーが支え、中心部分を天板のボスが押す構造です。天板は回り止めも付けてあります。
秋月電子のHX711キット基板にΦ3くらいの穴があったらよかったのですが、ないので端を段差に載せて接着剤で固定します。
ロードセルとHX711がセットになったロードセルモジュールを4つ作りました。
接触不良による計測不安定を避けるため、ロードセルとHX711間の配線はターミナルやコネクタを使わず直ではんだ付けしました。一方、マイコンとHX711間はデジタルなので、アナログよりはノイズに気を使わなくても良いと考え利便性からXHコネクタにしました。
コンデンサを付けた理由はおまけに書きました。結論、ノイズ対策で付けたものの、効果なかったので不要でした。
4-5. マイコン基板の設計製作
ロードセルモジュールとマイコンを繋ぐための基板を作りました。
ユニバーサル基板用のいい感じのCADを知らないのでパワポです。
コンデンサの選定に深い意味はありません。手元にある適当なものを使ってます。
これでソフトウェアの動作確認を終えたタイミングで、プリント基板も設計して注文しました。
初めてKiCADを使ってプリント基板を設計しましたが、それほど迷うところはありませんでした。
注文してから裏面のGNDベタ忘れ、DCDC用のスルーホールが小さい、コンデンサの向きをなぜか揃えなかったりと修正点はいくつか見つかりましたが・・・
PCBWayで送料込み10枚2,987円(初回5ドル割引クーポン価格)、注文してから1週間ほどで手元に届きました。
CADは修正してあります。やっぱりDC電源はセンタープラス仕様にしました。
5. ソフトウェア構成
最終的には、猫がトイレに入ったときのみデータを記録するようにしたいですが、猫がトイレから出入りしたことの検知や排泄物の重量計算などのアルゴリズムを考える上で、重量計の時間変化を見ないことには何も始まりません。
そこで、まずはロードセルの値を1Hzほどの周期で送信し、ある程度データを貯め、それを見ながら諸々検討します。
5-1. 送信データ構造と記録フォーマット
- 送信データ構造
計測したロードセルのデータはjsonにつめて送ります。
ロードセルの死活監視は今のところ考えていませんが、一応statusも持たせておきました。
{
"timestamp": "2024-09-08T12:00:00.000+09:00",
"sensors": [
{
"id": 1,
"value": 1.23,
"status": "OK"
},
{
"id": 2,
"value": 1.23,
"status": "OK"
},
{
"id": 3,
"value": 1.23,
"status": "Error"
},
{
"id": 4,
"value": 1.23,
"status": "OK"
}
]
}
- 記録データ構造
hdf5で記録します。階層構造は適当に決めました。
/ (ルートグループ)
└── sensor_data (グループ)
├── timestamp (可変長文字列の1次元データセット)
├── sensor1 (1次元データセット)
├── sensor2 (1次元データセット)
├── sensor3 (1次元データセット)
└── sensor4 (1次元データセット)
後述の解析結果もhdf5に保存し、以下の階層構造としました。
/ (ルートグループ)
└── 2025-04-29_13_41_11_000 (グループ)
├── timestamp (可変長文字列の1次元データセット)
├── loadcell_values (2次元データセット 4,データ数)
├── tare_offset (2次元データセット 4,データ数)
├── 属性: event_start_index (スカラー: int)
├── 属性: event_end_index (スカラー: int)
├── 属性: diff_load (スカラー: float)
└── 属性: cat_weight (スカラー: float)
5-2. Flask ローカルサーバへのデータ送信
ローカルのPCにFlaskを使ってサーバを建て、マイコンからjsonをPOSTして、受信したjsonデータはhdf5に記録する方式を取りました。
from flask import Flask, request, jsonify
import sqlite3
from datetime import datetime
import pandas as pd
import numpy as np
import h5py
app = Flask(__name__)
def write_to_hdf5(timestamp, sensors):
"""
センサー生値を記録する
"""
# 1行分のデータを作成
new_data = {
"timestamp": [timestamp],
"sensor1": [sensors[0]["value"]],
"sensor2": [sensors[1]["value"]],
"sensor3": [sensors[2]["value"]],
"sensor4": [sensors[3]["value"]],
}
new_df = pd.DataFrame(new_data)
hdf5_file = "sensor_data.h5"
new_df.to_hdf(hdf5_file, key="sensor_data", mode="a", format="table", append=True)
@app.route("/post_data", methods=["POST"])
def post_data():
try:
# リクエストからJSONデータを取得
json_data = request.get_json()
print(json_data)
timestamp = json_data["timestamp"]
sensors = json_data["sensors"]
datetime_obj = datetime.fromisoformat(timestamp)
datetime_str = datetime_obj.strftime("%Y/%m/%d %H:%M:%S.%f")[:-3]
# データベースにデータを書き込む
write_to_hdf5(datetime_str, sensors) # センサー生値の記録
return jsonify({"status": "success"}), 200
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 400
if __name__ == "__main__":
app.run(host="192.168.2.204", port=5000, debug=True)
ローカルPCへのPOSTは約4msで、目標とする1Hzに対して十分速いです。
5-3. マイコンプログラム
M5StampS3に書き込むプログラムを書きます。IDEはArduino IDEではなくVisual Studio Code + Platformio使ってます。
WiFi接続して時刻同期、HX711からロードセルの値を読み、jsonにつめてPOSTします。
参考
Qiita: ESP8266のntpの設定は1行で
github: olkal/HX711_ADC
Qiita: ArduinoJson:ネストした配列に要素を追加する
#include <Arduino.h>
#include <WiFi.h>
#include <time.h>
#include <FastLED.h>
#include <HX711_ADC.h>
#include <ArduinoJson.h>
#include <HTTPClient.h>
//データ送信先のURL
const char* serverUrl = "http://192.168.3.30:5000/post_data"; // 送信先サーバーのURL
//const char* serverUrl = "https://script.google.com/macros/s/hogehoge/exec";
// WiFi接続設定
const char* ssid = "SSID";
const char* password = "PASSWORD";
//LED
const static uint8_t PIN_LED = 21; // 本体フルカラーLEDの使用端子(G21)
const static uint8_t NUM_LEDS = 1; // 本体フルカラーLEDの数
CRGB leds[NUM_LEDS]; // FastLEDで制御するLEDの数を指定して使用する準備
// ピンアウト
const uint8_t HX711_dout_1 = 7;
const uint8_t HX711_dout_2 = 13;
const uint8_t HX711_dout_3 = 1;
const uint8_t HX711_dout_4 = 44;
const uint8_t HX711_sck_1 = 5;
const uint8_t HX711_sck_2 = 15;
const uint8_t HX711_sck_3 = 3;
const uint8_t HX711_sck_4 = 43;
const uint8_t load_cell_num = 4; //ロードセル個数
//HX711 constructor (dout pin, sck pin)
HX711_ADC LoadCell_1(HX711_dout_1, HX711_sck_1); //HX711 1
HX711_ADC LoadCell_2(HX711_dout_2, HX711_sck_2); //HX711 2
HX711_ADC LoadCell_3(HX711_dout_3, HX711_sck_3); //HX711 3
HX711_ADC LoadCell_4(HX711_dout_4, HX711_sck_4); //HX711 4
float loadCell_values[load_cell_num] = {0.0};
static bool newDataReady = false;
uint32_t current_time = millis();
uint32_t last_post_time = millis();
const uint32_t post_rate_ms = 1000;
void led_setup(){
FastLED.addLeds<WS2812B, PIN_LED, GRB>(leds, NUM_LEDS); // LED型式、使用端子、LED数
// LED初期点灯色
leds[0] = CRGB(0, 0, 100);
FastLED.show();
}
void show_led(uint8_t r,uint8_t g,uint8_t b){
leds[0] = CRGB(r, g, b);
FastLED.show();
}
void init_loadCells(){
// HX711
float calibrationValue_1 = 209.2094; // calibration value load cell 1
float calibrationValue_2 = 206.5882; // calibration value load cell 2
float calibrationValue_3 = 195.6042; // calibration value load cell 3
float calibrationValue_4 = 212.2887; // calibration value load cell 4
LoadCell_1.begin();
LoadCell_2.begin();
LoadCell_3.begin();
LoadCell_4.begin();
unsigned long stabilizingtime = 2000; // 風袋引き前の静定時間
boolean _tare = true; //風袋引きしない場合はfalseにする
//ロードセルの準備完了判定用
byte loadcell_1_rdy = 0;
byte loadcell_2_rdy = 0;
byte loadcell_3_rdy = 0;
byte loadcell_4_rdy = 0;
while ((loadcell_1_rdy + loadcell_2_rdy + loadcell_3_rdy + loadcell_4_rdy) < load_cell_num) { //起動, 安定化, 風袋引きを同時に実行する
if (!loadcell_1_rdy) loadcell_1_rdy = LoadCell_1.startMultiple(stabilizingtime, _tare);
if (!loadcell_2_rdy) loadcell_2_rdy = LoadCell_2.startMultiple(stabilizingtime, _tare);
if (!loadcell_3_rdy) loadcell_3_rdy = LoadCell_3.startMultiple(stabilizingtime, _tare);
if (!loadcell_4_rdy) loadcell_4_rdy = LoadCell_4.startMultiple(stabilizingtime, _tare);
}
if (LoadCell_1.getTareTimeoutFlag()) {
USBSerial.println("Timeout, check MCU>HX711 no.1 wiring and pin designations");
}
if (LoadCell_2.getTareTimeoutFlag()) {
USBSerial.println("Timeout, check MCU>HX711 no.2 wiring and pin designations");
}
if (LoadCell_3.getTareTimeoutFlag()) {
USBSerial.println("Timeout, check MCU>HX711 no.3 wiring and pin designations");
}
if (LoadCell_4.getTareTimeoutFlag()) {
USBSerial.println("Timeout, check MCU>HX711 no.4 wiring and pin designations");
}
LoadCell_1.setCalFactor(calibrationValue_1); // user set calibration value (float)
LoadCell_2.setCalFactor(calibrationValue_2); // user set calibration value (float)
LoadCell_3.setCalFactor(calibrationValue_3); // user set calibration value (float)
LoadCell_4.setCalFactor(calibrationValue_4); // user set calibration value (float)
USBSerial.println("Startup is complete");
}
void get_data_from_loadCells(){
// check for new data/start next conversion:
if (LoadCell_1.update()) newDataReady = true;
LoadCell_2.update();
LoadCell_3.update();
LoadCell_4.update();
//get smoothed value from data set
if ((newDataReady)) {
loadCell_values[0] = LoadCell_1.getData();
loadCell_values[1] = LoadCell_2.getData();
loadCell_values[2] = LoadCell_3.getData();
loadCell_values[3] = LoadCell_4.getData();
USBSerial.print("Load_cell output val: ");
for (int i = 0; i < load_cell_num; i++) {
USBSerial.print(loadCell_values[i]);
USBSerial.print(", ");
}
USBSerial.println("");
newDataReady = false;
}
}
void init_wifi(){
WiFi.begin(ssid, password);
while (WiFi.status() != WL_CONNECTED) {
delay(1000);
USBSerial.println("Connecting to WiFi...");
}
}
String getISO8601Time() {
time_t now;
struct tm timeinfo;
char isoTime[30];
if (!getLocalTime(&timeinfo)) {
delay(3000);
}
unsigned long ms = millis() % 1000;
strftime(isoTime, sizeof(isoTime), "%Y-%m-%dT%H:%M:%S", &timeinfo);
sprintf(isoTime + strlen(isoTime), ".%03lu+0900", ms);
return String(isoTime);
}
String pack_json_data() {
JsonDocument jsonDoc;
String isoTime = getISO8601Time();
jsonDoc["timestamp"] = isoTime;
JsonArray sensors = jsonDoc.createNestedArray("sensors");
for (int i = 0; i < load_cell_num; i++) {
JsonObject sensor = sensors.createNestedObject();
sensor["id"] = i + 1;
sensor["value"] = loadCell_values[i];
sensor["status"] = "OK"; //いつか死活監視書く
}
String jsonString;
serializeJson(jsonDoc, jsonString);
return jsonString;
}
void postRequest(const String& jsonString) {
if (WiFi.status() == WL_CONNECTED) {
//USBSerial.print("IP Address: ");
//USBSerial.println(WiFi.localIP());
HTTPClient http;
http.begin(serverUrl);
http.addHeader("Content-Type", "application/json");
int httpResponseCode = http.POST(jsonString);
if (httpResponseCode == 200) {
String response = http.getString();
USBSerial.println("Response code: " + String(httpResponseCode));
USBSerial.println("Response: " + response);
show_led(0,100,0);
} else {
USBSerial.println("Error code: " + String(httpResponseCode));
USBSerial.println(http.errorToString(httpResponseCode));
show_led(100,0,0);
}
http.end();
} else {
USBSerial.println("Error: WiFi not connected");
}
}
void setup() {
USBSerial.begin(115200);
delay(1000);
led_setup();
init_wifi();
configTzTime("JST-9", "ntp.nict.jp", "ntp.jst.mfeed.ad.jp");
init_loadCells();
}
void loop() {
get_data_from_loadCells(); //読み出したデータはloadCell_values配列に保存
current_time = millis();
// 約49日に一度millis()はオーバーフローするのでcurrent_time < last_post_timeの条件を書いてある
if (current_time - last_post_time > post_rate_ms || (current_time < last_post_time && (UINT32_MAX - last_post_time + current_time > post_rate_ms))){
String jsonString = pack_json_data(); //jsonにタイムスタンプ, ロードセルのデータを詰める
postRequest(jsonString); //サーバにデータ送信
last_post_time = millis();
}
}
[env:m5stack-stamps3]
platform = espressif32
board = m5stack-stamps3
framework = arduino
monitor_speed = 115200
lib_deps =
fastled/FastLED@^3.7.6
olkal/HX711_ADC@^1.2.12
bblanchon/ArduinoJson@^7.1.0
受信するとばらつきますが1.4Hzほどです。
6. 荷重時系列分析
時系列分析というほど大したことはしませんが、計測したデータをプロットして
- 体重
- 排尿頻度
- 排尿量
- 排便頻度
- 排便量
- トイレ滞在時間
を計測するアルゴリズムを検討し、実装とテストを行います。
ルールベースで行ける気がするので機械学習は使いません。
システムが取りうる状態を列挙し、それぞれを区別する方法を考えます。
6-1. 状態
状態はこんなところです。
大まかに区別したいのは猫が乗ったのか、トイレを掃除のために動かしてまた乗せたのか、猫砂補充したのかで、ロードセルにかかる荷重に対しての閾値と、閾値を超える時間の長さを見ればなんとかなりそうです。
-
無負荷(定常, トイレも載っていない)
4つのロードセルはすべて0kg付近を示す -
トイレのみ(定常)
安定した出力を示す -
トイレ清掃中(過渡)
出力が少し振れる -
猫砂補充(過渡→定常)
出力が1kg以内程度で増加していく。その後は安定した出力になる。 -
猫がトイレに入る(過渡)
猫の体重分の増加が短い時間で起こる -
猫がトイレにいる(過渡)
トイレのみ+猫の体重の出力である程度振れる。最大でも3分程度しか滞在しない -
猫がトイレから出る(過渡)
猫の体重分の減少が短い時間で起こる -
トイレにうんこ・おしっこがある(定常)
猫がトイレに乗る前の重量+◯gになっている
6-2. アルゴリズムの実装
Jupyter lab上でアルゴリズムを実装し、テストします。
基本的には、重量が閾値を超えている区間の時間で、猫が乗っているのか猫砂やトイレの再設置かを判断します。適宜ロードセルのドリフトがあるので風袋引きします。
猫の体重と、トイレ時間と、
また、猫が乗っている間の前後の荷重も記録しておき、乗る前と乗った後の差分も記録します。
import numpy as np
from datetime import datetime
class CatLoadCell:
"""
猫トイレのロードセルから送信されてくる値を格納し、
・バッファに保存
・指定区間で静止状態を判定し、静止状態であれば風袋引きのためのオフセット値を更新する
・重量がしきい値を超える、下回るイベントを記録
・重量増加イベントがあったとき、
・猫がトイレの場合、継続時間が5分未満に荷重が下がるイベントを起こす
・猫砂補充の場合、継続時間が5分以上経っても荷重が下がらない
・イベントの前後のバッファも含めて返す。後でデータが溜まってきたらアルゴリズムを再考するため
・重心計算
tare_threshold : float 指定区間の標準偏差がこの値を下回れば静止しているとみなす
trigger_threshold : float 重量がこの値を超えれば猫が載っていると判断
"""
def __init__(self, buffer_length=3600): # 1時間×3600秒/h / 1Hz
# タイムスタンプ
self.prev_unix_timestamp = None
self.unix_timestamp_buffer = np.zeros(buffer_length) # UNIX時間で保持 [-1]側が最新
# ロードセル
self.loadcell_buffer = np.zeros((buffer_length, 4)) # 4つのロードセル
self.tare_offset = np.zeros(4,)
self.tare_offset_buffer = np.zeros((buffer_length, 4)) # 風袋引きのためのオフセット値
self.tare_threshold = 3.0 # 単位:g 指定区間の荷重のばらつき(標準偏差)がこの値以下であれば静止とみなす
self.success_tare = False # 風袋引き完了
self.ave_num = 3 # 重量変化をみるときに平滑化する点数
self.last_tare_time = 0.0 # 最後に風袋引きしたUNIX時間
self.max_last_tare_time = 60.0 * 10.0 # この時間に一度は風袋引きする
# イベント関連
self.trigger_loadup_threshold = 1000.0 # 単位:g 重量がこの値を超えたら猫が乗ったか猫砂補充と判断
self.trigger_loaddown_threshold = 500.0 # 単位:g 重量がこの値を下回ったら猫が降りたと判断
self.start_load = np.zeros(4) # 単位:g 猫が乗る前の荷重
self.end_load = np.zeros(4) # 単位:g 猫が乗ったあとの荷重
self.event_start_unix_timestamp = 0.0 # イベントスタートUNIX時刻
self.event_end_unix_timestamp = 0.0 # イベントエンドUNIX時刻
self.event_active_time = 0.0 # 単位:sec イベント継続時間
self.max_event_length = 5*60 # 単位:sec 猫がトイレに乗っていると考えられる最大時間 これ以上イベントが継続したら、猫砂補充と考える
self.settling_time = 30.0 # 単位:sec イベント終了からの静定時間
self.event_active = False # イベントフラグ
self.settling_active = False # イベント終了後の静定フラグ
self.sent_ranges = [0.0,0.0] # (start_time, end_time) のリスト
self.pre_buffer_time = 30.0 # 単位:sec イベント開始フラグの何秒前までcurrent_event_buffに追加するか
self.current_event_buff = {"timestamp":[], "loadcell_values":[],"tare_offset":[], "event_start_index":0, "event_end_index":0, "diff_load":0.0, "cat_weight":0.0, "toilet_time":0.0} # イベントフラグがONになっている間のデータ
# ロードセルの幾何配置 (単位 mm) センサー番号と実際の配置で調整
self.sensor_positions = {
1: (0, 0), # 左上
0: (600, 0), # 右上
2: (0, 400), # 左下
3: (600, 400), # 右下
}
self.current_unix = 1746098044.512 # 今のUNIX時刻
def add_sample_and_check_event(self, timestamp, loadcell_values):
"""
1サンプル追加と重量変化の監視
timestamp : str %Y/%m/%d %H:%M:%S.%f
loadcell_values : numpy array (4,)
event_buff : None or list[()]
"""
#self.current_unix += 1.0 # デバッグ時用
self.current_unix = self.__str_to_unix(timestamp)
self.unix_timestamp_buffer[:-1] = self.unix_timestamp_buffer[1:]
self.unix_timestamp_buffer[-1] = self.current_unix
self.loadcell_buffer[:-1] = self.loadcell_buffer[1:]
self.loadcell_buffer[-1] = loadcell_values
self.tare_offset_buffer[:-1] = self.tare_offset_buffer[1:]
self.tare_offset_buffer[-1] = self.tare_offset
event_buff = self.__check_for_event()
return event_buff
def __str_to_unix(self, t_str: str) -> float:
"""2025/04/28 22:08:25.730のようなフォーマットからUNIX時間へ変換"""
fmt = "%Y/%m/%d %H:%M:%S.%f"
dt = datetime.strptime(t_str, fmt)
unix_time = dt.timestamp()
return unix_time
def __unix_to_str(self, unix_time: float) -> str:
"""UNIX時間(float)を 'YYYY/MM/DD HH:MM:SS.sss' フォーマットに変換"""
dt = datetime.fromtimestamp(unix_time)
t_str = dt.strftime("%Y/%m/%d %H:%M:%S.%f")[:-3] # 小数点以下3桁にする
return t_str
def __find_timestamp_indices_in_range(self, start_time, end_time):
"""
タイムスタンプ配列から、指定時間の範囲を切り出す。
最新の時刻を基準に指定
"""
latest_time = self.unix_timestamp_buffer[-1]
if latest_time == 0.0: # バッファがまだ初期値のままのとき
return None, None
t1 = latest_time + start_time
t2 = latest_time + end_time
# 指定範囲のインデックスリスト
indices = np.where((t1 <= self.unix_timestamp_buffer) & (self.unix_timestamp_buffer <= t2))[0]
if len(indices) == 0: # 指定範囲のタイムスタンプがないとき
return None, None
start_idx = indices[0]
end_idx = indices[-1]
return start_idx, end_idx + 1 # Numpy array[start_idx:end_idx]で指定範囲が得られるように+1
def __perform_tare(self, start_time=-15.0, end_time=-10.0):
"""指定区間の標準偏差を確認し、静止していたらオフセット設定"""
start_idx, end_idx = self.__find_timestamp_indices_in_range(start_time, end_time)
if start_idx is None or end_idx is None: # 指定範囲のデータがないとき
return False
samples_array = self.loadcell_buffer[start_idx:end_idx]
if np.all(np.std(samples_array, axis=0) < self.tare_threshold): # 全部静止していたら更新
self.tare_offset = np.mean(samples_array, axis=0)
self.last_tare_time = self.unix_timestamp_buffer[-1] # 風袋引きした時刻を記録しておき、self.max_length_tare_timeを超えて風袋引きされてなかったらする
return True
return False # 更新以外はFalse
def get_corrected_value(self):
"""最新データをオフセット補正して取得"""
str_timestamp = self.__unix_to_str(self.unix_timestamp_buffer[-1])
corrected_loadcell_values = self.loadcell_buffer[-1] - self.tare_offset
return str_timestamp, corrected_loadcell_values
def calculate_centroid(self, values):
"""ロードセル値から重心位置を計算"""
total_weight = np.sum(values)
if total_weight == 0.0: # 合計重量が0.0のとき、重心は幾何中心にあるものとする
values = [1,1,1,1]
total_weight = 4
x = np.sum(values[i] * self.sensor_positions[i][0] for i in range(4)) / total_weight
y = np.sum(values[i] * self.sensor_positions[i][1] for i in range(4)) / total_weight
return x, y
def __check_for_event(self):
"""変化検出+イベントデータ記録"""
return_event_buff = None # 猫が乗ったイベントがあったとき、タイムスタンプやロードセルの値を入れた辞書を返す それ以外はNone
# 現在のロードセル値
current_load = np.sum(np.mean(self.loadcell_buffer[-self.ave_num:], axis=0)) # ロードセル生値
corrected_current_load = np.sum(np.mean(self.loadcell_buffer[-self.ave_num:] - self.tare_offset, axis=0)) # 風袋引き後のロードセル値
current_loadcell_values = self.loadcell_buffer[-1]
unix_timestamp = self.unix_timestamp_buffer[-1]
# 静定が失敗した場合はもう1ループ
if not self.success_tare:
self.success_tare = self.__perform_tare(start_time=-15.0,end_time=-5.0) # 風袋引き
return None
# 前回の風袋引きからself.max_last_tare_time経っていたら実行
if unix_timestamp - self.last_tare_time > self.max_last_tare_time:
self.success_tare = self.__perform_tare(start_time=-15.0,end_time=-5.0) # 風袋引き
# イベント未発生時の処理
if not self.event_active:
if self.settling_active: # 静定中
self.__append_event_buff(unix_timestamp, current_loadcell_values, self.tare_offset) # イベントバッファにイベント中のタイムスタンプ、風袋引き後のロードセル値、風袋引きオフセット値を追加
# 静定完了
if unix_timestamp - self.event_end_unix_timestamp > self.settling_time:
self.settling_active = False
self.end_load = np.mean(self.loadcell_buffer[-5:], axis=0) # 5サンプル前から最新までの5点平均とする
return_event_buff = self.__handle_event() # イベント種別判定
return return_event_buff
# イベント発生
if corrected_current_load > self.trigger_loadup_threshold: # イベント発生 重量がしきい値を超えている状態
self.settling_active = False # 静定中にイベント発生してしまったら、そのイベントは諦める
# バッファのクリア
self.__reset_event_buff()
self.event_active_time = 0.0
self.start_load = np.mean(self.loadcell_buffer[-15:-10], axis=0) # 15サンプル前から5点平均とする
self.event_active = True # イベント開始
self.event_start_unix_timestamp = unix_timestamp
# イベント発生中の処理
if self.event_active:
if corrected_current_load < self.trigger_loaddown_threshold: # イベント終了 イベント発生中かつ重量がしきい値を下回るとき
self.event_active = False
self.settling_active = True # 静定開始
self.event_active_time = 0.0
self.event_end_unix_timestamp = unix_timestamp
self.__append_event_buff(unix_timestamp, current_loadcell_values, self.tare_offset)
else: # イベント継続
self.__append_event_buff(unix_timestamp, current_loadcell_values, self.tare_offset)
self.event_active_time = unix_timestamp - self.event_start_unix_timestamp #イベント継続時間を計算
# イベント終了 継続時間が最大値を超えたとき 猫砂補充など
if self.event_active_time > self.max_event_length:
self.success_tare = self.__perform_tare(start_time=-10.0,end_time=-0.0) # 風袋引き
self.event_active = False
self.settling_active = False
self.event_active_time = 0.0
self.__reset_event_buff()
return return_event_buff
def __handle_event(self):
"""イベント完了後、未送信なら返す"""
if len(self.current_event_buff["timestamp"]) == 0: # イベントバッファが空
return None
start_time = self.__str_to_unix(self.current_event_buff["timestamp"][0])
end_time = self.__str_to_unix(self.current_event_buff["timestamp"][-1])
sent_s, send_e = self.sent_ranges
if sent_s <= start_time and end_time <= send_e:
# すでに送信済み
self.__reset_event_buff()
return None
# 新しい未送信イベント
self.sent_ranges = [start_time, end_time]
self.__post_process() # イベント開始前の一定期間をバッファに追加する。猫が乗っている最中のインデックスを記録する。乗る前と乗った後の重量差を記録する。乗っている最中の重量を記録する。
event_copy = self.current_event_buff.copy()
self.__reset_event_buff()
return event_copy
def __reset_event_buff(self):
self.current_event_buff["timestamp"] = []
self.current_event_buff["loadcell_values"] = []
self.current_event_buff["tare_offset"] = []
self.current_event_buff["event_start_index"] = 0
self.current_event_buff["event_end_index"] = 0
self.current_event_buff["diff_load"] = 0.0
self.current_event_buff["cat_weight"] = 0.0
self.current_event_buff["toilet_time"] = 0.0
def __append_event_buff(self, unix_timestamp, loadcell_values, tare_offset):
self.current_event_buff["timestamp"].append(self.__unix_to_str(unix_timestamp))
self.current_event_buff["loadcell_values"].append(np.copy(loadcell_values))
self.current_event_buff["tare_offset"].append(np.copy(tare_offset))
def __post_process(self):
"""呼び出し時点からself.pre_buffer_time分だけ前の値をバッファに格納する 最後に一回だけ呼び出す"""
# イベント開始-pre_buffer_timeからイベント開始までの期間のデータを格納する
mask = np.where((self.event_start_unix_timestamp - self.pre_buffer_time < self.unix_timestamp_buffer) & (self.unix_timestamp_buffer < self.event_start_unix_timestamp))
prev_unix_timestamps = self.unix_timestamp_buffer[mask]
prev_loadcellValues = self.loadcell_buffer[mask]
prev_tareOffset_buffer = self.tare_offset_buffer[mask]
prev_str_timestamps = list(map(self.__unix_to_str, prev_unix_timestamps)) # UNIX→str
# イベント開始前の一定期間のデータを追加する
self.current_event_buff["timestamp"] = prev_str_timestamps + self.current_event_buff["timestamp"]
self.current_event_buff["loadcell_values"] = list(prev_loadcellValues) + self.current_event_buff["loadcell_values"]
self.current_event_buff["tare_offset"] = list(prev_tareOffset_buffer) + self.current_event_buff["tare_offset"]
# 猫が乗っている最中のインデックスを記録する
current_event_str_timestamps = np.array(list(map(self.__str_to_unix, self.current_event_buff["timestamp"])))
in_event_idx = np.where((self.event_start_unix_timestamp <= current_event_str_timestamps)&(current_event_str_timestamps<=self.event_end_unix_timestamp))
event_start_idx = np.min(in_event_idx)
event_end_idx = np.max(in_event_idx) - self.ave_num
self.current_event_buff["event_start_index"] = event_start_idx
self.current_event_buff["event_end_index"] = event_end_idx
# 猫がトイレしてる時間を記録する
self.current_event_buff["toilet_time"] = self.event_end_unix_timestamp - self.event_start_unix_timestamp
# 猫が乗る前と乗った後の荷重の差を記録する
self.current_event_buff["diff_load"] = np.sum( np.mean(self.current_event_buff["loadcell_values"][-2:-1],axis=0) - np.mean(self.current_event_buff["loadcell_values"][0:2],axis=0) )
# 猫の体重を記録する
corrected_load = np.array(self.current_event_buff["loadcell_values"][event_start_idx:event_end_idx]) - np.array(self.current_event_buff["tare_offset"][event_start_idx:event_end_idx])
self.current_event_buff["cat_weight"] = np.sum(np.mean(corrected_load,axis=0))
if __name__ == "__main__":
pass
6-3. 動作確認
動作テストのためにしばらく稼働させました。猫が乗ったときの実際のデータです。乗った後にゼロ点からかなりずれてます。ロードセルがズレたか、設置場所が狭く壁に触れているかです。
作成したプログラムで風袋引きされた結果はこれです。乗ってしばらくは風袋引きしないようにしてあるので不自然な重量変化があります。
抽出されたイベント時の時系列データです。乗る前と乗った後の重量差は、一回目のイベントは-51.3g, 2回目は9.54gでした。プロットを見てもわかりますが、重量がマイナスになっちゃってますね・・・
精度向上が課題です。
猫の体重は4106.2g, 4194.1と計測できたので、比例係数は後で調整いりそうですが、概ね問題なさそうです。
トイレ時間は50.0秒, 143.7秒でした。おしっこかうんちかの区別はトイレ時間で簡単にわかりそうですね。
7. ウェブアプリの作成
何がしたいかChatGPTに伝えたらだいたい全部やってくれました。すごいですね。
出来上がったものはこちら
時々おかしな値になったとき、なぜおかしいのか見るために直近のイベント時の重量変化もプロットするようにしました。始めはJupyterでグラフ書いて確認すればいいやと思っていましたが、こうやって勝手にグラフができてくれると確認が楽でいいです。
from flask import Flask, request, jsonify
import sqlite3
from datetime import datetime
import pandas as pd
import numpy as np
import h5py
from cat_detect import CatLoadCell
app = Flask(__name__)
catLoadCell = CatLoadCell()
def write_to_hdf5_result_h5py(result_dict, hdf5_file="result_data.h5"):
# グループ名を "2025-04-29_13_41_11_000" 形式に変換
timestamp0 = result_dict["timestamp"][0]
group_name = timestamp0.replace("/", "-").replace(" ", "_").replace(":", "_").replace(".", "_")
with h5py.File(hdf5_file, "a") as f:
if group_name in f:
print(f"Group {group_name} already exists. Overwriting.")
grp = f.require_group(group_name)
# 可変長文字列型
str_dtype = h5py.string_dtype(encoding='utf-8')
grp.create_dataset("timestamp", data=result_dict["timestamp"], dtype=str_dtype)
grp.create_dataset("loadcell_values", data=np.array(result_dict["loadcell_values"]))
grp.create_dataset("tare_offset", data=np.array(result_dict["tare_offset"]))
# スカラー値を属性として保存
grp.attrs["event_start_index"] = result_dict["event_start_index"]
grp.attrs["event_end_index"] = result_dict["event_end_index"]
grp.attrs["diff_load"] = result_dict["diff_load"]
grp.attrs["cat_weight"] = result_dict["cat_weight"]
@app.route("/post_data", methods=["POST"])
def post_data():
try:
# リクエストからJSONデータを取得
json_data = request.get_json()
print(json_data)
timestamp = json_data["timestamp"]
sensors = json_data["sensors"]
datetime_obj = datetime.fromisoformat(timestamp)
datetime_str = datetime_obj.strftime("%Y/%m/%d %H:%M:%S.%f")[:-3]
loadcell_values = np.array([sensors[0]["value"],sensors[1]["value"],sensors[2]["value"],sensors[3]["value"]])
event_buff = catLoadCell.add_sample_and_check_event(datetime_str, loadcell_values)
if event_buff is not None:
write_to_hdf5_result_h5py(event_buff)
return jsonify({"status": "success"}), 200
except Exception as e:
return jsonify({"status": "error", "message": str(e)}), 400
if __name__ == "__main__":
app.run(host="192.168.2.204", port=5000, debug=True)
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse, JSONResponse
from fastapi.templating import Jinja2Templates
import sqlite3
import pandas as pd
import h5py
from datetime import datetime, timedelta
import math
app = FastAPI(debug=True)
templates = Jinja2Templates(directory="templates")
def safe_float(value):
return 0.0 if isinstance(value, float) and math.isnan(value) else float(value)
# --- 時系列データJSONエンドポイント ---
@app.get("/api/graph_data", response_class=JSONResponse)
async def graph_data_api():
df = pd.read_hdf("sensor_data.h5", key="sensor_data",start=-100)
# タイムスタンプをDatetime型に変換
df["timestamp"] = pd.to_datetime(df["timestamp"])
# データを返却
return df.to_dict(orient="records")
# --- 解析結果JSONエンドポイント ---
@app.get("/api/event_data", response_class=JSONResponse)
async def get_event_data(start_idx: int = 0, end_idx: int = None):
"""
最新順で start_idx ~ end_idx の範囲のイベントを読み出す。
例: start_idx=0, end_idx=5 → 最新5件
"""
hdf5_file = "result_data.h5"
with h5py.File(hdf5_file, "r") as f:
group_names = sorted(f.keys(), reverse=False)
if end_idx is None:
end_idx = len(group_names)
selected_groups = group_names[start_idx:end_idx]
results = []
for group_name in selected_groups:
grp = f[group_name]
result = {
"group_name": group_name,
"timestamp": [str(t.decode()) if isinstance(t, bytes) else str(t) for t in grp["timestamp"][()]],
"loadcell_values": grp["loadcell_values"][()].tolist(),
"tare_offset": grp["tare_offset"][()].tolist(),
"event_start_index": int(grp.attrs["event_start_index"]),
"event_end_index": int(grp.attrs["event_end_index"]),
"diff_load": safe_float(grp.attrs["diff_load"]),
"cat_weight": safe_float(grp.attrs["cat_weight"])
}
results.append(result)
return results
# --- 重心位置JSONエンドポイント ---
@app.get("/api/centroid_data", response_class=JSONResponse)
async def centroid_data_api():
df = pd.read_hdf("sensor_data.h5", key="sensor_data")
# 重心計算用定数: センサーの座標配置 (単位: cm)
coordinates = {
"sensor1": {"x": 0, "y": 0}, # 左上
"sensor2": {"x": 100, "y": 0}, # 右上
"sensor3": {"x": 0, "y": 50}, # 左下
"sensor4": {"x": 100, "y": 50}, # 右下
}
centroid_data = [] # 重心位置を保存するリスト
row = df.iloc[-1]
total_weight = row["sensor1"] + row["sensor2"] + row["sensor3"] + row["sensor4"]
x_center = (
row["sensor1"] * coordinates["sensor1"]["x"]
+ row["sensor2"] * coordinates["sensor2"]["x"]
+ row["sensor3"] * coordinates["sensor3"]["x"]
+ row["sensor4"] * coordinates["sensor4"]["x"]
) / total_weight
y_center = (
row["sensor1"] * coordinates["sensor1"]["y"]
+ row["sensor2"] * coordinates["sensor2"]["y"]
+ row["sensor3"] * coordinates["sensor3"]["y"]
+ row["sensor4"] * coordinates["sensor4"]["y"]
) / total_weight
centroid_data.append({
"timestamp": row["timestamp"],
"x_center": x_center,
"y_center": y_center
})
return centroid_data
############################### ページ ######################################################
# --- 解析結果ページ ---
@app.get("/result", response_class=HTMLResponse)
async def result_page(request: Request):
return templates.TemplateResponse("result.html", {"request": request})
# --- 時系列グラフページ ---
@app.get("/graph", response_class=HTMLResponse)
async def graph_page(request: Request):
return templates.TemplateResponse("graph.html", {"request": request})
# --- ホームページ ---
@app.get("/", response_class=HTMLResponse)
async def home_page(request: Request):
return templates.TemplateResponse("home.html", {"request": request})
if __name__ == "__main__":
import uvicorn
uvicorn.run("webapp:app", host="0.0.0.0", port=8001, reload=True)
<!DOCTYPE html>
<html lang="ja">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>猫の体重の時系列</title>
<script src="https://cdn.plot.ly/plotly-2.27.0.min.js"></script>
</head>
<body>
<h1>猫の体重の時系列</h1>
<div id="cat_weight_graph" style="height: 400px;"></div>
<div id="diff_load_graph" style="height: 300px;"></div>
<div id="net_weight_graph" style="height: 400px;"></div>
<script>
async function updateGraph() {
const res = await fetch("/api/event_data");
const allEvents = await res.json(); // allEvents: [event1, event2, ..., eventN]
// 各イベントの代表値を抽出(最初のtimestampを使う)
// const timestamps = allEvents.map(event => event.timestamp[0]); // timestampは配列
const timestamps = allEvents.map(event =>
new Date(event.timestamp[0]).toISOString()
);
const catWeights = allEvents.map(event => event.cat_weight);
const diffLoads = allEvents.map(event => event.diff_load);
// --- loadcell_values と tare_offset を抽出 ---
const rawLoadcellMatrix = allEvents.map(event => event.loadcell_values);
const tareOffsetMatrix = allEvents.map(event => event.tare_offset);
const sensorCount = 4;
const elapsedSeconds = []; // 横軸: 経過秒数
const netValuesPerSensor = Array.from({ length: sensorCount }, () => []);
const event = allEvents[allEvents.length - 1]; // 最新1件だけを取得
const loadcellValues = event.loadcell_values; // (65, 4)
const tareOffset = event.tare_offset; // (65, 4)
const timeArray = event.timestamp;
// 開始時刻を基準に経過秒を計算
const baseTime = new Date(timeArray[0]);
for (let i = 0; i < loadcellValues.length; i++) {
const currentTime = new Date(timeArray[i]);
const deltaSec = (currentTime - baseTime) / 1000; // 秒に変換
elapsedSeconds.push(deltaSec);
for (let sensorIdx = 0; sensorIdx < sensorCount; sensorIdx++) {
const net = loadcellValues[i][sensorIdx] - tareOffset[i][sensorIdx];
netValuesPerSensor[sensorIdx].push(net);
}
}
const traces = [];
for (let sensorIdx = 0; sensorIdx < sensorCount; sensorIdx++) {
traces.push({
x: elapsedSeconds,
y: netValuesPerSensor[sensorIdx],
type: 'scatter',
mode: 'lines+markers',
name: `センサー${sensorIdx + 1} 正味重量 (g)`
});
}
// --- 合計正味重量の算出 ---
const totalNetValues = [];
for (let i = 0; i < loadcellValues.length; i++) {
let total = 0;
for (let sensorIdx = 0; sensorIdx < sensorCount; sensorIdx++) {
total += netValuesPerSensor[sensorIdx][i];
}
totalNetValues.push(total);
}
traces.push({
x: elapsedSeconds,
y: totalNetValues,
type: 'scatter',
mode: 'lines+markers',
name: '合計正味重量 (g)',
line: { color: 'gray', width: 3, dash: 'dot' }
});
const weightTrace = {
x: timestamps,
y: catWeights,
type: 'scatter',
mode: 'lines+markers',
name: '猫の体重 (g)',
line: { color: 'blue' }
};
const diffTrace = {
x: timestamps,
y: diffLoads,
type: 'scatter',
mode: 'lines+markers',
name: '重量差分 (g)',
line: { color: 'green' }
};
Plotly.newPlot("cat_weight_graph", [weightTrace], {
title: "猫の体重の時系列変化",
xaxis: {
title: "時間",
type: "date"
},
yaxis: { title: "体重 (g)" }
});
Plotly.newPlot("diff_load_graph", [diffTrace], {
title: "重量差分の時系列変化",
xaxis: {
title: "時間",
type: "date"
},
yaxis: { title: "重量 (g)" }
});
// 正味重量グラフ(新規に追加)
Plotly.newPlot("net_weight_graph", traces, {
title: "直近のイベント時の正味重量の時系列変化",
xaxis: { title: "時間" },
yaxis: { title: "重量 (g)" }
});
}
updateGraph();
setInterval(updateGraph, 10000); // 10秒ごとに更新
</script>
</body>
</html>
8. まとめ
猫の成長記録や病気発見を目的として、猫トイレにロードセルを取り付け、ルールベースの処理によって体重、排尿・排便量、頻度、トイレ滞在時間が計測できるシステムを製作しました。
今後の課題として、誤検知データの削除や、排尿・排便量が測れるように精度向上を目指したいです。
9. おまけ
9-1. 部品一覧
今回使用した部品一覧です。合計金額はだいたい8,500円です。
支払い金額の実際は、基板10枚や部品予備等注文しているので約1万円です。
- ロードセルモジュール
部品名 | 型番 | 個数 | 単価(税込) | 合計(税込) |
---|---|---|---|---|
ロードセル | SC134-10kg-CTH | 4 | 750 | 3,000 |
ロードセル用ADC | AE-HX711-SIP | 4 | 350 | 1,400 |
コンデンサ 100uF | 25PX100MEFC5X11 | 4 | 10 | 40 |
XHコネクタ 6pin | S6B-XH-A (LF)(SN) | 4 | 15 | 60 |
合計(税込) | 4,500 |
- マイコン基板
部品名 | 型番 | 個数 | 単価(税込) | 合計(税込) |
---|---|---|---|---|
M5StampS3 | M5STACK-S007 | 1 | 1,580 | 1,580 |
DCDC | M78AR05-1 | 1 | 730 | 730 |
コンデンサ 100uF | 16SEPC100M | 2 | 40 | 80 |
コンデンサ 10uF | RDEC71H106K3K1H03B | 1 | 50 | 50 |
ピンソケット(メス) 1×6(6P) | FH-1x6SG/RH | 1 | 20 | 20 |
ピンソケット(メス) 1×9(9P) | FH-1x9SG/RH | 1 | 35 | 35 |
XHコネクタ 4pin | B4B-XH-A(LF)(SN) | 4 | 10 | 40 |
2.1mm標準DCジャック | MJ-179PH | 1 | 40 | 40 |
シールドケーブル 10m | 1 | 1,097 | 1,097 | |
プリント基板 | 1 | 299 | 299 | |
合計(税込) | 3,971 |
9-2. 秋月電子 HX711モジュール基板のロードセル電源部コンデンサについて
Twitterで秋月電子HX711モジュール基板のC2コンデンサ容量不足疑惑を見かけました。
秋月のHX711基板に電解22uF追加したら標準偏差が改善した。
— 天球儀 (@hanai_y) April 23, 2019
データシートは10uFだったからセラコンじゃ容量足りんのか。 pic.twitter.com/XeVNdVdAzZ
強誘電体を用いたセラミックコンデンサはDC電圧がかかっていると容量が減少する特性を持っており、秋月電子のモジュールにその種類のコンデンサが使われているならば、HX711のデータシート上の推奨値10uFを満たさない可能性があるとの話です。
手元に10uFと100uFのアルミ電解コンデンサがあるので実際に追加して試してみました。
ツイートでは34%ほど標準偏差が小さくなっていましたが、結果こちらの環境ではわずかにしか変わりませんでした。
0uF | 10uF | 100uF | |
---|---|---|---|
標準偏差 g | 0.1339 | 0.1368 | 0.1360 |
変化率 % | 0(基準) | 2.168 | 1.527 |
このデータは5-4. マイコンプログラムで取得していますが、使用しているライブラリolkal/HX711_ADCのconfig.hを編集して移動平均を取らないようにしてあります。
若干温度ドリフトなど低周波で揺れ動いてしまったので、
1Hzのハイパスフィルタをかけてから標準偏差を計算しましたがそんなには変わりません。
0uF | 10uF | 100uF | |
---|---|---|---|
標準偏差 g | 0.09991 | 0.09893 | 0.09778 |
変化率 % | 0(基準) | -0.9848 | -2.139 |
ツイート時点と現ロットで部品が違うのかな?とか配線が長かったりしたら影響するのかな?と思いつつ、当環境で影響ないならまあ良いかで深追いしないことにします。何かわかったことがあればお教えください。
9-3. ロードセル用ADC CS1237
今回の用途ではHX711で全く問題ないのですが、もっとノイズの少ないADCはないのかなと思って探していたら次のような記事を見つけました。どうやらCS1237というADCが良さそうです。
東京大学 桑野研究室: 24bit ADCのちょっとした調査結果
というわけで物は試しに買いました。
AliExpressにて189円/個でした(10個購入割引時)。
HX711と同じようにロードセルに配線して、こちらtremaru/iarduino_ADC_CS1237のサンプルプログラムほぼそのまま使ってデータを取りました。CS1237はいくつかの設定ができ、ここではゲインは128、サンプリングレート10Hzにしました。
長期間のドリフトも知りたいので7時間ほど計測しました。
HX711とCS1237はどちらもローパスフィルタなどの処理はしていません。
線の太さからお察しですが、標準偏差はHX711が約0.15g、CS1237が約0.63gでした(100点分の標準偏差を200回平均)。
うーん
9-4. スイッチングDCDCとシリーズレギュレータの比較 ノイズ, 消費電流
本記事で使用している基板は、9VスイッチングACアダプタから給電され、マイコンやADCのために5Vへ降圧しています。ノイズに困っているわけではないですが、この降圧するときにDCDCでなくてシリーズレギュレータを使えばもっとノイズが減るのではと試してみました。ついでに消費電流も測ります。
オシロスコープはAnalog Discovery 2、電流は秋月電子のLT6105使用ハイサイド電流センシングモジュールキットで電圧に変換して測ります。
マイコンはHX711を4個分、1Hzでデータを送信する条件です。
電圧は降圧後の5V電圧ライン、電流はスイッチングACアダプタ出力で測ります。
まずは最初にユニバーサル基板でも作った仕様であるDCDC(M78AR05-1)を試します。
青が電圧、オレンジが電流です。電流の電圧への換算感度は1V/1Aです。
消費電流はそこそこ変動します。ピークは100mA程度で、実効値は43.6mA(0.39W)でした。
次はシリーズレギュレータ(NJM7805FA)です。
思っていたよりノイズが多いです。
それと消費電流のピークは270mA程度で、実効値は85.3mA(0.77W)でした。
スイッチングDCDCよりシリーズレギュレータの方が低ノイズと書かれたものはよく目にしますが6、負荷が変動する場合は必ずしもそうでもないのかもしれません。
データシートを見ると、負荷が変動した際に出力電圧がどの程度変動するかを示す値としてロードレギュレーションという項目が記載されており、NJM7805FAは標準15mV最大60mV、M78AR05-1は20mV最大30mVですが、こういうのって実際やってみるまでどっちが良いかよくわからないんですよね。
それとHX711の計測データの標準偏差も計測しました。
スイッチングDCDCの方がちょっと良いくらいです。
スイッチングDCDC | シリーズレギュレータ | |
---|---|---|
標準偏差 g | 0.141 | 0.1500 |