はじめに
皆さん、こんにちわ。ABEJAアドベントカレンダー2020の1日目の記事です。
ABEJAでは今はデータ基盤芸人をやっております。
業務内容とは一切関係無いのですが、最近猫(保護猫)を飼い始めました!
保護猫を引き取ってみて感じた課題をTechで解決できないか思いを馳せてみたので、今回はその話ができればと思いますー
2コマ漫画 pic.twitter.com/w543wRFG0G
— たぐろまる (@xecus) September 23, 2020
課題整理
家猫として保護して二日目、ゲージ開けとるけど一向に出る気配なし pic.twitter.com/XRhEChG881
— たぐろまる (@xecus) September 22, 2020
-
保護したての猫が家の中で全然動かない (保護当時)
- 人間にビビりすぎて、家に人がいる時はずっと隠れている
- リモートワークで在宅がおおいので、ずっと隠れてる
- 全く定位置(ソファーの下)から動かずに数日経過。
- 基本的に人間が部屋にいるとご飯もトイレも行かない
- けど、外出時と夜間は出歩いているっぽい
- トイレに行ってるか不明
- 2日ぐらい出ない事も
- ご飯食べてるか不明
- 全然食べないけど、たまに無くなっているので食べているっぽい
- 人間にビビりすぎて、家に人がいる時はずっと隠れている
-
猫の行動をデータドリブンにちゃんと知りたい
- 長期戦になりそうなので、猫の活動量を日々トラッキングしていきたい気持ち
※ちなみに執筆時の今は、かなりマシになっています
考えたことメモ
案①: 猫行動監視用のカメラを設置する
-
実際にやってみたものの、猫の行動範囲に対してカメラで全部カバーするのは無理
- 5000円ぐらい出せば、暗闇対応の激安スマートカメラが購入できた
- 台数がかなり多くなるのとカメラ自体が設置困難なところがいくつかある
-
カメラ映像を蓄積したり、それを見るための仕組みを作るのがめんどくさかった
案②: 猫の首輪にデータロガーの装着をする
- 家に、手頃なモノが当時見つからなかった
- 加速度センサー等をベースに自作を検討したが、装着負荷が高そう
- 定期的に充電したりするの大変そう
-
12/1現在はCatlogと呼ばれるIoT見守り首輪を装着中
- 後から適材が見つかったパターン。
- https://rabo.cat/catlog
案③: 猫の首輪にBLEタグを仕込む (今回採用)
-
家に手頃なモノ(落とし物防止タグ)があった
- メーカー名は内緒。
- 小さく十数グラム?ぐらいしかないので、猫への装着負荷を最小限にできそう。
-
BLEタグから定期的に送信される電波の強度を計測し続ける事で、なんとなく動きがわかるかも
- 単点計測で「何らかの動き」、複数点計測で「位置」がわかるかも。
今回作ったもの
今回は、RaspberryPiを使ってBLEタグから送信される電波をキャッチして距離を算出し、猫の動きを見守るようなモノをラフに作りました。
また、BQにストリーミングインサートして行動履歴を蓄積したり、ある一定のイベントを検知した場合(急に電波強度=猫の位置が移動した時等)にSlack等のチャットに連携できるような機能も備えています。
内部的には、「btmon」と「hcitool」と呼ばれるコマンドを使っており、これらのコマンドの標準出力をPythonスクリプトで拾い上げる事でBLEタグの情報や距離を算出しています。(上記画像はコマンドから出力される生データ)
下記は、今回作ったもののコアロジックとなる部分です。2つのコマンドが同時に起動していないとbtmonが使えないみたいだったので、multiprocessingやsubprocessを使って同時に起動させています。
import json
import datetime
import time
import sys
import subprocess
import multiprocessing
class DataLogger():
def __init__(self, fn):
self.fn = fn
def append_line(self, line):
with open(self.fn, "a") as f:
f.write(line + '\n')
class LeAdvertisingReport():
def __init__(self):
self.company = None
self.type = None
self.mac_address = None
self.rssi = None
self.tx_power = None
self.timestamp = datetime.datetime.now()
def set_company(self, line):
if line.startswith('Company: '):
self.company = line.split(': ')[1]
def set_type(self, line):
if line.startswith('Type: '):
self.type = line.split(': ')[1]
def set_mac_address(self, line):
if line.startswith('Address: '):
self.mac_address = line.split(' ')[1]
def set_tx_power(self, line):
if line.startswith('TX power: '):
self.tx_power = int(line.split(': ')[1].split(' ')[0])
def set_rssi(self, line):
if line.startswith('RSSI: '):
self.rssi = int(line.split(': ')[1].split(' ')[0])
def event_detected(self):
# 特定のMacアドレスを含むモノ以外は除外する
#if 'XX:XX' not in self.mac_address:
# return
# 検知レポートを表示
print('<LeAdvertisingReport@{}>'.format(self.timestamp))
print('company={}'.format(self.company))
print('type={}'.format(self.type))
print('mac_address={}'.format(self.mac_address))
print('tx_power={} dB'.format(self.tx_power))
print('rssi={} dBm'.format(self.rssi))
# 距離の算出
d = None
if self.tx_power and self.rssi:
d = pow(10.0, (self.tx_power - self.rssi) / 20.0)
print('Distance = {} m'.format(d))
# ログとしてローカルに書き出す
tmp = {
'timestamp': self.timestamp.isoformat(),
'company': self.company,
'mac_address': self.mac_address,
'tx_power': self.tx_power,
'rssi': self.rssi,
'distance': d
}
DataLogger('result.txt').append_line(json.dumps(tmp))
def run_lescan():
while True:
process = subprocess.Popen(['hcitool', 'lescan', '--duplicates'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while True:
output = process.stdout.readline()
if process.poll() is not None:
break
if output:
#print('lescan >>>', output.strip())
pass
def run_btmon():
def _is_new_event(line):
return '> HCI Event: LE Meta Event' in line
tmp = None
while True:
process = subprocess.Popen(['btmon'], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
while True:
output = process.stdout.readline()
if process.poll() is not None:
break
if output:
line = output.decode('utf-8').strip()
# HCI Eventを拾い上げる
if _is_new_event(line):
if tmp is not None:
tmp.event_detected()
tmp = LeAdvertisingReport()
continue
# イベントが検知されるまで待つ
if tmp is None:
continue
# コマンド出力をパースする
try:
tmp.set_company(line)
tmp.set_type(line)
tmp.set_mac_address(line)
tmp.set_tx_power(line)
tmp.set_rssi(line)
except Exception:
print('Failed to parse.')
if __name__ == '__main__':
p1 = multiprocessing.Process(target=run_lescan, args=())
p1.daemon = True
p1.start()
p2 = multiprocessing.Process(target=run_btmon, args=())
p2.daemon = True
p2.start()
while True:
time.sleep(1)
データ解析結果
受信される電波の強度RSSI(Received Signal Strength Indication)とTxPower(BLEタグの送信電力)の2つがわかると、タグとRaspberryPiの間の距離を算出する事ができます。(上記のソースコードにも計算部分があります)
参考: https://qiita.com/shu223/items/7c4e87c47eca65724305
この距離の時間変動=猫の活動(移動)になるので、一旦、1日分のデータをさくっとプロットしてみました。
MVA(移動平均)を適用したあとの方がわかりやすいのですが、1日を通していくつかグラフに変化があるのが確認できると思います。RaspberryPiの位置(BLE電波の観測点)から察するに、ソファーの下にもぐったり、ご飯を食べに移動している様子を確認する事ができました。
最後に
今回は、BLEタグを使って猫の活動を緩く追いかける仕組みを作ってみました。
低コストでかつ装着負荷が少なく、ざくっと活動をしる事ができるモノをライトに作る事ができました。
展望として、RaspberryPiの数を増やして猫の大まかな位置推定をしてみたり、色々やってみたいなと思っております。