はじめに
心拍データをリアルタイムで取得して、可視化したり他システムと連携するのをやりたくて、
接続テストからMQTTブローカーに送信するところまでをまとめました。
今回はBLE対応の心拍センサー3種類を購入し、検証しています。
環境
- 機種:RaspberryPi 3B+
- ディストリビューション:Raspbian Buster Lite
- OSカーネルバージョン:4.19.50
- 言語:Python 2.7.16
試したBLE対応の心拍センサー
- Fitbit Charge 3 約19,000円
- Polar H10 約13,000円
- ALATECH CS010(センチュリー ハカロー!シリーズ) セールで500円
先に結論
-
Fitbit Charge 3
リアルタイムデータ取得できない。
Fitbit SDKがあるが、取得可能なデータは約15分遅れとなる。リアルタイムAPIは一般公開していない。
BLEを解析すれば取得できる可能性はあるが、認証があり手間。 -
Polar H10
リアルタイムデータ取得できる。
BLEで比較的簡単に取得可能。手順は後述。
なお、Polar TeamPro APIは未調査 -
ALATECH CS010
リアルタイムデータ取得できる。
BLEで比較的簡単に取得可能。手順は後述。
#データ取得手順
事前準備
ライブラリ等のインストール
以下のインストールが必要です。
- Python 2.7.x
- python-pip
- bluepy
$ sudo apt-get install python-pip
$ sudo apt-get install bluetooth bluez libbluetooth-dev libudev-dev
$ sudo pip install paho-mqtt
$ sudo pip install bluepy
Polar H10
- LocalName:Polar H10 abcdef01
事前準備
コマンドラインツールでMACアドレスを調べておき、接続テストを行う。
- MACアドレスを調べる
sudo hcitool lescan | grep "Polar H10"
AA:AA:AA:AA:AA:AA Polar H10 abcdef01
- 接続~受信テスト
$ gatttool -b AA:AA:AA:AA:AA:AA -I -t random
[AA:AA:AA:AA:AA:AA][LE]> char-write-req 0x0011 0100
Notification handle = 0x0010 value: 10 57 7e 02
Notificationで約1秒おきに送られてきます。
止めるときはchar-write-req 0x0011 0000を送ればOK。
上記の例では2バイト目が0x57 なので、心拍は87となります。
### Pythonでデータを取得しMQTTブローカーに送信
- プログラムサンプル(【】で囲まれた箇所を置き換える)
# -*- coding: utf-8 -*-
from bluepy.btle
import Peripheral
import bluepy.btle as btle
import binascii
import time
import paho.mqtt.client as mqtt
from datetime import datetime
import json
H10_RAW = "AA:AA:AA:AA:AA:AA"
class MyDelegate(btle.DefaultDelegate):
def __init__(self, params):
btle.DefaultDelegate.__init__(self)
def handleNotification(self, cHandle, data):
c_data = str(binascii.b2a_hex(data))
#print(c_data)
hr = int(c_data[2:4], 16)
print(hr)
data = {
"createtime" : '',
"heart" : 0
}
data['createtime'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")
data['heart'] = hr
message = json.dumps(data)
print(message)
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.username_pw_set(【MQTTブローカーID】, 【MQTTブローカーPASS】)
client.connect(【MQTTブローカーURL】, 【MQTTブローカーポート番号】, keepalive=60)
client.publish("heart", message)
client.disconnect()
class SensorMedal(Peripheral):
def __init__(self, addr):
Peripheral.__init__(self, addr, addrType="random")
def main():
medal = SensorMedal(H10_RAW)
medal.setDelegate(MyDelegate(btle.DefaultDelegate))
medal.writeCharacteristic(0x0011, "\x01\x00", True)
while True:
if medal.waitForNotifications(1.0):
continue
if __name__ == "__main__":
main()
ALATECH CS010
- LocalName:HRM sensor V24
事前準備
コマンドラインツールでMACアドレスを調べておき、接続テストを行う。
- MACアドレスを調べる
sudo hcitool lescan | grep "HRM sensor V24"
BB:BB:BB:BB:BB:BB HRM sensor V24
- 接続~受信テスト
$ gatttool -b BB:BB:BB:BB:BB:BB -I
[BB:BB:BB:BB:BB:BB][LE]> char-write-req 0x0013 0100
Notification handle = 0x0012 value: 16 5a e7 02 dd 02
Notificationで約1秒おきに送られてきます。
止めるときはchar-write-req 0x0013 0000を送ればOK。
上記の例では2バイト目が0x5a なので、心拍は90となります。
Pythonでデータを取得しMQTTブローカーに送信
- プログラムサンプル(【】で囲まれた箇所を置き換える)
# -*- coding: utf-8 -*-
from bluepy.btle
import Peripheral
import bluepy.btle as btle
import binascii
import time
import paho.mqtt.client as mqtt
from datetime import datetime
import json
HRM_RAW = "BB:BB:BB:BB:BB:BB"
class MyDelegate(btle.DefaultDelegate):
def __init__(self, params):
btle.DefaultDelegate.__init__(self)
def handleNotification(self, cHandle, data):
c_data = str(binascii.b2a_hex(data))
#print(c_data)
hr = int(c_data[2:4], 16)
print(hr)
data = {
"createtime" : '',
"heart" : 0
}
data['createtime'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")
data['heart'] = hr
message = json.dumps(data)
print(message)
client = mqtt.Client(protocol=mqtt.MQTTv311)
client.username_pw_set(【MQTTブローカーID】, 【MQTTブローカーPASS】)
client.connect(【MQTTブローカーURL】, 【MQTTブローカーポート番号】, keepalive=60)
client.publish("heart", message)
client.disconnect()
class SensorMedal(Peripheral):
def __init__(self, addr):
Peripheral.__init__(self, addr, addrType="random")
def main():
medal = SensorMedal(HRM_RAW)
medal.setDelegate(MyDelegate(btle.DefaultDelegate))
medal.writeCharacteristic(0x0013, "\x01\x00", True)
while True:
if medal.waitForNotifications(1.0):
continue
if __name__ == "__main__":
main()