LoginSignup
14

More than 3 years have passed since last update.

Raspberry Piで心拍データをBLEでリアルタイムに受信する

Last updated at Posted at 2019-09-02

はじめに

心拍データをリアルタイムで取得して、可視化したり他システムと連携するのをやりたくて、
接続テストからMQTTブローカーに送信するところまでをまとめました。

今回はBLE対応の心拍センサー3種類を購入し、検証しています。

環境

  • 機種:RaspberryPi 3B+
  • ディストリビューション:Raspbian Buster Lite
  • OSカーネルバージョン:4.19.50
  • 言語:Python 2.7.16

試したBLE対応の心拍センサー

  • Fitbit Charge 3 約19,000円
  • Polar H10 約13,000円
  • ALATECH CS010(センチュリー ハカロー!シリーズ) セールで500円

先に結論

  • Fitbit Charge 3
    リアルタイムデータ取得できない
    Fitbit SDKがあるが、取得可能なデータは約15分遅れとなる。リアルタイムAPIは一般公開していない。
    BLEを解析すれば取得できる可能性はあるが、認証があり手間。

  • Polar H10
    リアルタイムデータ取得できる
    BLEで比較的簡単に取得可能。手順は後述。
    なお、Polar TeamPro APIは未調査

  • ALATECH CS010
    リアルタイムデータ取得できる
    BLEで比較的簡単に取得可能。手順は後述。

データ取得手順

事前準備

ライブラリ等のインストール

以下のインストールが必要です。

  • Python 2.7.x
  • python-pip
  • bluepy
$ sudo apt-get install python-pip
$ sudo apt-get install bluetooth bluez libbluetooth-dev libudev-dev
$ sudo pip install paho-mqtt
$ sudo pip install bluepy

Polar H10

  • LocalName:Polar H10 abcdef01

事前準備

コマンドラインツールでMACアドレスを調べておき、接続テストを行う。

  • MACアドレスを調べる
sudo hcitool lescan | grep "Polar H10"
AA:AA:AA:AA:AA:AA Polar H10 abcdef01


  • 接続~受信テスト
$ gatttool -b AA:AA:AA:AA:AA:AA -I -t random 
[AA:AA:AA:AA:AA:AA][LE]> char-write-req 0x0011 0100
Notification handle = 0x0010 value: 10 57 7e 02

Notificationで約1秒おきに送られてきます。
止めるときはchar-write-req 0x0011 0000を送ればOK。
上記の例では2バイト目が0x57 なので、心拍は87となります。


Pythonでデータを取得しMQTTブローカーに送信

  • プログラムサンプル(【】で囲まれた箇所を置き換える)
# -*- coding: utf-8 -*-
from bluepy.btle 
import Peripheral
import bluepy.btle as btle
import binascii
import time
import paho.mqtt.client as mqtt
from datetime import datetime
import json

H10_RAW = "AA:AA:AA:AA:AA:AA"

class MyDelegate(btle.DefaultDelegate):
    def __init__(self, params):
        btle.DefaultDelegate.__init__(self)

    def handleNotification(self, cHandle, data): 
        c_data = str(binascii.b2a_hex(data))
        #print(c_data)
        hr = int(c_data[2:4], 16)
        print(hr)

        data = {
            "createtime" : '',
            "heart" : 0
        }

        data['createtime'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")
        data['heart'] = hr
        message = json.dumps(data)
        print(message)
        client = mqtt.Client(protocol=mqtt.MQTTv311)
        client.username_pw_set(MQTTブローカーID, MQTTブローカーPASS)
        client.connect(MQTTブローカーURL, MQTTブローカーポート番号, keepalive=60)
        client.publish("heart", message)
        client.disconnect()

class SensorMedal(Peripheral):
    def __init__(self, addr):
        Peripheral.__init__(self, addr, addrType="random")

def main():
    medal = SensorMedal(H10_RAW)
    medal.setDelegate(MyDelegate(btle.DefaultDelegate))
    medal.writeCharacteristic(0x0011, "\x01\x00", True)

    while True:
        if medal.waitForNotifications(1.0):
            continue

if __name__ == "__main__":
    main()

ALATECH CS010

  • LocalName:HRM sensor V24

事前準備

コマンドラインツールでMACアドレスを調べておき、接続テストを行う。

  • MACアドレスを調べる
sudo hcitool lescan | grep "HRM sensor V24"
BB:BB:BB:BB:BB:BB HRM sensor V24


  • 接続~受信テスト
$ gatttool -b BB:BB:BB:BB:BB:BB -I
[BB:BB:BB:BB:BB:BB][LE]> char-write-req 0x0013 0100
Notification handle = 0x0012 value: 16 5a e7 02 dd 02

Notificationで約1秒おきに送られてきます。
止めるときはchar-write-req 0x0013 0000を送ればOK。
上記の例では2バイト目が0x5a なので、心拍は90となります。


Pythonでデータを取得しMQTTブローカーに送信

  • プログラムサンプル(【】で囲まれた箇所を置き換える)
# -*- coding: utf-8 -*-
from bluepy.btle 
import Peripheral
import bluepy.btle as btle
import binascii
import time
import paho.mqtt.client as mqtt
from datetime import datetime
import json

HRM_RAW = "BB:BB:BB:BB:BB:BB"

class MyDelegate(btle.DefaultDelegate):
    def __init__(self, params):
        btle.DefaultDelegate.__init__(self)

    def handleNotification(self, cHandle, data): 
        c_data = str(binascii.b2a_hex(data))
        #print(c_data)
        hr = int(c_data[2:4], 16)
        print(hr)

        data = {
            "createtime" : '',
            "heart" : 0
        }

        data['createtime'] = datetime.now().strftime("%Y/%m/%d %H:%M:%S.%f")
        data['heart'] = hr
        message = json.dumps(data)
        print(message)
        client = mqtt.Client(protocol=mqtt.MQTTv311)
        client.username_pw_set(MQTTブローカーID, MQTTブローカーPASS)
        client.connect(MQTTブローカーURL, MQTTブローカーポート番号, keepalive=60)
        client.publish("heart", message)
        client.disconnect()

class SensorMedal(Peripheral):
    def __init__(self, addr):
        Peripheral.__init__(self, addr, addrType="random")

def main():
    medal = SensorMedal(HRM_RAW)
    medal.setDelegate(MyDelegate(btle.DefaultDelegate))
    medal.writeCharacteristic(0x0013, "\x01\x00", True)

    while True:
        if medal.waitForNotifications(1.0):
            continue

if __name__ == "__main__":
    main()

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14