8
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

脈拍センサから心拍数をリアルタイムに計算する

Last updated at Posted at 2022-02-26

概要

  • 市販の脈拍センサを指に装着, arduinoで読み込み, シリアル通信でPCに送信
  • Windows PC上のpythonで, シリアルを受け取り, リアルタイムに解析して心拍数を計算
  • 実装はこちら: https://github.com/dokkozo/heartbeat_detection

できたこと

  • センサを指に装着
    • IMG_4902.png
  • 脈拍を計測して (arduino)
    • image.png
  • PCにとりこんで処理して脈拍を逐次表示 (python)
    • image.png

脈拍センサについて

  • 今回はKKHMF製 MAX30102互換? を使用
    • IMG_4895.png
    • インターフェースはI2Cで4端子を使用してArduinoに接続. 5V, GND, SDA(A4), SCL(A5)
  • アナログの値をセンサ出力として出すものもある
  • 仕組み
  • どこではかるのがよいか?(感触)
    • 信号の大きさ: 指先,腹 > 指先,爪 > 指中間,腹 > 指中間,背 > 手首外側 > 手首内側
      • 指先が一番よくとれるが、指先を使ってしまうと例えば心拍をゲームの入力にするなどは難しい
      • 手首は固定をうまくやったり信号処理を頑張るか, 長時間計測して1回推定にするなどしないと相当厳しいか
    • 首は取れたけど, 脈に応じた圧力の変化を検出してしまってるかも?そして呼吸と発話の影響を受けてしまうので難しい

ハードの作成

  • マウスを握ったりなど, 別の操作をしながら計測したかったため, 親指の中間, 配線の取り回しを考えて手の甲側に装着
  • ケーブルを束ねる用のマジックテープを使って固定
    • IMG_4903.png
  • センサの指が触れる面をセロハンテープで覆っている
    • 回路に直接指が触れることでショートしてセンサの値が飛ぶことがあったため。
    • IMG_4904.png
    • 装着するとこんな感じ
      • IMG_4901.png

Arduino側の実装

配線

  • Arduino UNOであればVIN: 5V, GND: GND, SDA: A4, SCL: A5に接続

コード

  • ライブラリMAX30105.hを追加してインクルード
  • サンプルコードがスケッチ例 > カスタムライブラリのスケッチ例 > Sparkfun MAX3010xにあるので参考に下記を実装
  • ちなみに, 赤色と赤外光 (と緑も?)使えるようですが, 赤外光の方がBPMの検出がしやすかったので赤外光を使っています。あと赤色だとまぶしいので。
#include <Wire.h>
#include "MAX30105.h"
// doc https://learn.sparkfun.com/tutorials/max30105-particle-and-pulse-ox-sensor-hookup-guide/all
MAX30105 particleSensor;

void setup()
{
  Wire.begin(); 
  Serial.begin(115200);
  
  Serial.println("");
  Serial.println("Initializing...");
 
  // Initialize sensor
  while(!particleSensor.begin(Wire, I2C_SPEED_FAST)){
    Serial.print(".");
  }
  Serial.println("OK!");
  
  //Setup to sense a nice looking saw tooth on the plotter
  byte ledBrightness = 0x1F; //Options: 0=Off to 255=50mA
  int sampleRate = 800; //Options: 50, 100, 200, 400, 800, 1000, 1600, 3200
  byte sampleAverage = 4; //Options: 1, 2, 4, 8, 16, 32
  // 結果的に800/4でサンプルレートは200になります
  // redとirが逆になっていて, ここが1だと赤外線LEDが発光します(見えない)
  byte ledMode = 1; //Options: 1 = Red only, 2 = Red + IR, 3 = Red + IR + Green
  int pulseWidth = 411; //Options: 69, 118, 215, 411
  int adcRange = 4096; //Options: 2048, 4096, 8192, 16384
 
  particleSensor.setup(ledBrightness, sampleAverage, ledMode, sampleRate, pulseWidth, adcRange); //Configure sensor with these settings
 
  Serial.println("red");  
}
 
void loop()
{
  //redとirが逆になっているらしく, getRed()でirの強度をとってきています 
  //uint32_t sensor_ir = particleSensor.getIR();
  uint32_t sensor_red = particleSensor.getRed();

  Serial.println(sensor_red);
}
  • なおアナログのセンサを使う場合は, +を5V, -をGND, SをAnalogReadに接続して単に下記のようなanalogReadのコードを作ればよいです.
#define SENSOR A0

int sensorValue;

void setup() {
  // put your setup code here, to run once:
  Serial.begin(9600);
}

void loop() {
  // put your main code here, to run repeatedly:
  sensorValue = analogRead(SENSOR);
  Serial.println(sensorValue);
  delay(1);
}

参考

PC側: Python

  • pythonでserial portから入ってきたデータを読み込みバッファリング
  • 定期的にバッファの中身を見て心拍数を推定します

コード

  • Windows10, python3.9.7で動作確認しています
  • importしているライブラリをpipで入れてください
  • 現状の実装は心拍数を検出したらprintで出力するものになっています
#!/usr/bin/env python3
from argparse import ArgumentParser
from pythonosc import udp_client
import serial
import time
import numpy as np
from typing import Tuple, Union, Optional
from matplotlib import pyplot as plt
from biosppy.signals import ecg
from scipy import signal
import threading
from copy import copy

def main():
    args = getargs()

    history_buf = HistoryBuffer(
        serial_port = args.serial_port,
        serial_speed = args.serial_speed,
        fs_supposed = args.fs_supposed,
        fs_torelance = args.fs_torelance,
        history_buf_size = args.history_buf_size,
        bpm_estim_cycletime_sec = args.bpm_estim_cycletime_sec,
        sensor_value_thres = args.sensor_value_thres,
    )

    thread_updatebuf = threading.Thread(target=history_buf.update_repeat)
    thread_calculation = threading.Thread(target=history_buf.calc_repeat)

    thread_updatebuf.start()
    thread_calculation.start()


class HistoryBuffer():
    """
    Class for buffering and processing incoming sensor data from serial port.
    For calculating BPM from buffered data, this class calls BeatAnalyzer.analyze() method.
    Buffer update cycletime depends on sensor sampling rate; if sensor send data every 1ms, __receive_onedata() method wait for and receive data every 1ms.
    """
    def __init__(
        self,
        serial_port: str,
        serial_speed: int = 115200,
        fs_supposed: int = 200,
        fs_torelance: int = 2,
        history_buf_size: int = 800,
        bpm_estim_cycletime_sec: float = 1.0,
        sensor_value_thres: int = 50000,
    ):
        self.ser = serial.Serial(serial_port, serial_speed, timeout=0.1) 
        self.history_buf_size = history_buf_size
        self.bpm_estim_cycletime_sec = bpm_estim_cycletime_sec
        self.fs_supposed = fs_supposed
        self.fs_torelance = fs_torelance
        self.sensor_value_thres = sensor_value_thres

        self.data = {
            "time": np.zeros([history_buf_size]),
            "value": np.zeros([history_buf_size]),
        }

        self.is_buf_full = False
        self.datacnt = 0

        self.beatanalyzer = BeatAnalyzer(fs = fs_supposed, Nsamples=history_buf_size)

    ################### data aquisition related #################
    def update_repeat(self):
        while True:
            self.__update()

    def __update(self):
        # get incoming data
        sensor_value, sensor_time = self.__receive_onedata()
        if sensor_value is not None:
            # update ring buffer
            self.data["value"] = np.roll(self.data["value"], 1)
            self.data["value"][0] = sensor_value
            self.data["time"] = np.roll(self.data["time"], 1)
            self.data["time"][0] = sensor_time
            
            # update count
            if not self.is_buf_full and self.data["time"][-1] > 0:
                self.is_buf_full = True

    def __receive_onedata(self) -> Tuple[Union[int, None], Union[float, None]]:
        buf = ''
        while True:
            readtime = time.time()
            data = self.ser.read_all()

            if len(data) > 0: 
                # if something is received
                try:
                    data_str = data.decode()
                except:
                    print(f'skip {data}')
                    return None, None
                
                buf += data_str

                if '\n' == data_str[-1]:
                    # if data completed
                    buf_datas = buf.split('\r\n')
                    last_buf_data = buf_datas[-2] # because buf_datas is typically like [..., '1214', '1213', '']
                    
                    # buffed data can somhow be incomplete
                    try:
                        sensor_value = int(last_buf_data)
                    except:
                        print(f'skip {buf}')
                        return None, None

                    sensor_time = time.time()
                    break
            
            time.sleep(0.1/1000)

        return sensor_value, sensor_time

    ################### bpm calculation related #################
    def calc_repeat(self):
        while True:
            self.__calc_bpm()
            time.sleep(self.bpm_estim_cycletime_sec)

    def __calc_bpm(self):
        # get buffered data
        buf_data = copy(self.data['value'])
        buf_time = copy(self.data['time'])

        if not self.is_buf_full:
            return

        # calculate measured fs based on data aquisition time
        fs_actual = self.history_buf_size / (buf_time[0] - buf_time[-1])
        if abs(fs_actual - self.fs_supposed) > self.fs_torelance:
            print(f"Measured fs: {fs_actual} is too different from fs: {self.fs_supposed}")
            return
        
        # if any of bufferred sensor value are less than threshold, skip culculation. It seems nothing is attached to the sensor.
        if np.min(self.data["value"]) < self.sensor_value_thres:
            print(f"Min bufferred sensor value: {np.min(buf_data)} indicates nothing is attached to the sensor.")
            return

        ### ecg toolkit based bpm estimation
        # analyzed = ecg.ecg(signal=self.data["value"][::-1], sampling_rate=fs_actual, show=False)
        # estim_bpm = np.median(analyzed["heart_rate"])
        ### custom bpm estimation
        estim_bpm = self.beatanalyzer.analyze(buf_data[::-1], actual_fs = fs_actual)

        print(f"estimated bpm: {estim_bpm:.01f}")

class BeatAnalyzer():
    """
    Class for calculating beat from time series sensor data.
    """
    def __init__(
        self,
        fs: int,
        Nsamples: int,
        min_bpm: float = 40.0,
        max_bpm: float = 180.0,
    ):
        self.min_bpm = min_bpm
        self.max_bpm = max_bpm
        self.fs = fs
        self.Nsamples = Nsamples
        self.datalen_sec = Nsamples/fs
        self.fft_bpm_resolution = fs/Nsamples*60
        self.bpms = np.arange(np.ceil(min_bpm/self.fft_bpm_resolution), np.floor(max_bpm/self.fft_bpm_resolution)+1) * self.fft_bpm_resolution
        basis_phase = 2 * np.pi / fs * np.dot(self.bpms[:,None]/60, np.arange(0,Nsamples)[None,:])
        self.basis_sin = np.sin(basis_phase)
        self.basis_cos = np.cos(basis_phase)

    def analyze(self, data: np.ndarray, actual_fs: float) -> float:
        # step1 detect rough bpm based on fft
        cor = np.sqrt((np.dot(self.basis_sin, data)/self.Nsamples) ** 2 + (np.dot(self.basis_cos, data)/self.Nsamples) ** 2)
        rough_bpm = self.bpms[np.argmax(cor)]
        print(f"rough_bpm: {rough_bpm}")
        rough_fs_range = [max(rough_bpm/60 - 0.5, self.min_bpm/60), min(rough_bpm/60 + 0.5, self.max_bpm/60)]

        # step2 filter using rough bpm range
        filter_coef = signal.firwin(numtaps=91, cutoff=rough_fs_range, fs=actual_fs, pass_zero=False)
        filterd_signal = signal.lfilter(filter_coef, 1, data)[92:]

        # step3 detect local extremums
        extremums = np.diff((np.diff(filterd_signal) > 0).astype(int)) 
        local_maxima = np.where(extremums == -1)[0] + 1
        local_minima = np.where(extremums == 1)[0] + 1
        # print(local_maxima/200)
        # print(local_minima/200)

        # step4 check if each local maxima/minima is max/min within a certain range
        is_local_maxima_max = np.zeros(len(local_maxima))
        is_local_minima_min = np.zeros(len(local_minima))
        detect_width = int(actual_fs/(self.max_bpm/60) / 2)
        for i, idx in enumerate(local_maxima):
            target_range = [max(0, idx - detect_width), idx + detect_width]

            argmax_within_range = np.argmax(filterd_signal[target_range[0]:target_range[1]]) + idx - detect_width
            if idx == argmax_within_range:
                is_local_maxima_max[i] = 1

        for i, idx in enumerate(local_minima):
            target_range = [max(0, idx - detect_width), idx + detect_width]

            argmin_within_range = np.argmin(filterd_signal[target_range[0]:target_range[1]]) + idx - detect_width
            if idx == argmin_within_range:
                is_local_minima_min[i] = 1
        
        max_candidate = local_maxima[np.where(is_local_maxima_max == 1)[0]]
        min_candidate = local_minima[np.where(is_local_minima_min == 1)[0]]
        # print(max_candidate/200)
        # print(min_candidate/200)
        if len(max_candidate) < 2 or len(min_candidate) < 2:
            print(f"No enough pulse detected.")
            return None

        # step5 calculate period
        period_max = np.diff(max_candidate)
        period_min = np.diff(min_candidate)

        # step6 calculate average bpm
        if len(period_max) >= 3:
            period_max = period_max[1:-1]
        if len(period_min) >= 3:
            period_min = period_min[1:-1]
        period_mean = np.mean(np.hstack([period_max, period_min])/actual_fs)

        return 1/period_mean * 60

def getargs():
    argparser = ArgumentParser()
    argparser.add_argument('--serial_port', type=str, required=True, help='Arduino serial port. Specify like "COM4"')
    argparser.add_argument('--serial_speed', type=int, default=115200, help='Serial speed in bps')
    argparser.add_argument('--history_buf_size', type=int, default=800, help='Sensor data buffer size')
    argparser.add_argument('--bpm_estim_cycletime_sec', type=float, default=1.0, help='Estimate BPM using bufferred data every specified cycles')
    argparser.add_argument('--fs_supposed', type=int, default=200, help='Sampling rate of the sensor.')
    argparser.add_argument('--fs_torelance', type=int, default=2, help='If actual sample rate goes below this value, stop updating BPM since something seems wrong...')
    argparser.add_argument('--sensor_value_thres', type=int, default=50000, help='If data buf contains value less than this threshold, skip BPM estimation.')
    args = argparser.parse_args()
    return args

if __name__ == '__main__':
    main()

解説

  • 全体としては、シリアルポートの入力を順次バッファに追加する処理と、一定時間ごとにバッファにあるデータを分析してBPMを算出する処理とを別々のスレッドで実行しています。
  • BPM推定のアルゴリズムの本体はBeatanalyzer.analyze()です. 複数ステップのアルゴリズムになっています
    • まず離散フーリエ変換で解像度の低い心拍推定をします。10秒の長さのデータがバッファにあればBPM6刻みで検出できます。5秒なら12刻みです。体感このラフな推定は解像度が低いもののそこそこ頑健に動作する気がします。
    • 続いて、そこでの推定値を用いてセンサデータのフィルタリングをします。
    • フィルタしたデータに対して極値の検出をして脈波の山と谷を検出します
    • 誤検出をできるだけはじいたりなどして最終的な心拍数を割り出します
  • 手がマウスを動かしているとやはり推定を誤ることがあります. こうした場合にBPMの推定をパスする処理を入れるとよりよさそうです
  • 心拍数解析の参考
8
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?