Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
1
Help us understand the problem. What is going on with this article?
@inachi

WioLTEで心拍数計測

More than 1 year has passed since last update.

この記事は SORACOM Advent Calendar 2019 ふたつめの15日目の記事です。SORACOM UG 札幌 #1のLTでの心拍数センサーネタを元にしてます。

Grove 心拍数センサーを使ってみる

SORACOM のAdvent Calendar 2019 ひとつめの方に書いたとおり、
WioRTC を MicroPython から使えるようになりました。

比較的長い期間、モバイルバッテリーのみで計測するネタを探していたのですが、けっこうなジジイになってきていて健康管理に利用することも考えて、今回はGROVE - 心拍センサを使ってみることにしました。

Grove - 心拍センサー

この心拍センサーはあまりインテリジェントなものではなくて、脈拍による光透過具合の変化を光センサーで捉えてデジタル値 1 or 0 を返すだけのものです。なので心拍数はプログラム上で計測・算出する必要があります。

image.png

今回は WioLTE, WioRTC, モバイルバッテリー、心拍センサーを以下のようにつないで使います。

image.png

こうした上で、デジタル値が 0 から 1 になる Rising を 20 回検出した時間を計測します。

image.png

計測した時間から、以下の計算式で心拍数を算出できます。

20回 : 20回検出にかかった時間 = 心拍数 : 1分間

心拍数 = 20回 x 1分間 ÷ 20回検出にかかった時間 

Rising の検出にはデジタルピンの irq 割り込みメソッドが使えます。

# ピンからの入力がRisingになったときに呼び出されるコールバック関数
def callback(pin):
    # 20回呼びだれた時間を計測
    

pin = Pin(D38, Pin.IN)
pin.irq(callback, Pin.IRQ_RISING) # デジタル入力割込み

以上から、心拍数を計測するための heartrate.py を作成しました。

from machine import Pin
from micropython import const, alloc_emergency_exception_buf, schedule
from time import ticks_ms, ticks_diff, sleep

alloc_emergency_exception_buf(100)

class HeartRateSensor(object):
    """
    This program can be used to measure heart rate, the lowest
    pulse in the program be set to 30. Use an external interrupt 
    to measure it.
    """

    MAX_HEARTPLUSE_DUTY = const(2000)

    def __init__(self, pin=None):
        if pin is None:
            self._pin = Pin('D38', Pin.IN)
        else:
            self._pin = pin
        self._handler_ref = self._handler
        self._counter = 0
        self._ticks_buf = [0 for _ in range(21)]
        self._heart_rate = None

    def start_measure(self):
        self._pin.irq(self._interruptr, Pin.IRQ_RISING)

    def stop_measure(self):
        self._pin.irq()

    def value(self):
        return self._heart_rate

    def _interruptr(self, pin):
        schedule(self._handler_ref, 0)

    def _handler(self, arg):
        self._ticks_buf[self._counter] = ticks_ms()
        if self._counter == 0:
        sub = ticks_diff(self._ticks_buf[self._counter],
                             self._ticks_buf[20])
        else:
        sub = ticks_diff(self._ticks_buf[self._counter],
                             self._ticks_buf[self._counter-1])
        if sub > HeartRateSensor.MAX_HEARTPLUSE_DUTY:
            # set 2 seconds as max heart pluse duty
        self._counter = 0
            for i in range(20):
                self._ticks_buf[i] = 0
            self._ticks_buf[20] = ticks_ms()
            return

        if self._counter == 20:
        self._counter = 0
            self._heart_rate = (1200000 //    # 60*20*1000/20_total_time 
                                ticks_diff(self._ticks_buf[20],
                                           self._ticks_buf[0]))
        else:
            self._counter += 1

この heartrate.py を使って、3分おきに心拍数を計測して SORACOM Hervest にデータをあげるスクリプトが以下になります。

from wiolte import wiolte
from wiortc import WioRTC
from heartrate import HeartRateSensor
import uasyncio as asyncio
from machine import Pin, I2C
import micropython
import time

BOOT_INTERVAL = const(180)  # [sec.]

# Initialize Wio LTE module
wiolte.initialize()
wiolte.set_grove_power(True)
time.sleep_ms(500)

print("Please ready your chest belt.")
time.sleep(5)

heartrate = HeartRateSensor()
heartrate.start_measure()
asyncio.sleep_ms(20000)

i2c = I2C('I2C')

# Initialize Wio Ext. RTC
rtc = WioRTC(i2c)
rtc.begin()

#oled = SSD1306_I2C(128, 64, i2c)
#oled.text('Heart Rate:', 0, 0)
#oled.poweron()

# Initialize LTE modem
m = wiolte.get_comm()
m.initialize()
time.sleep_ms(500)
m.set_supply_power(True)
time.sleep_ms(500)

async def main_task():
    try:
        # Wait until the LTE module gets ready to communicate.
        while not await m.turn_on_or_reset():
            await asyncio.sleep_ms(1000)

        print('LTE connection is now available.')
        rssi = await m.get_RSSI()
        print('RSSI: {}'.format(str(rssi)))

        # Activate LTE network.
        while not await m.activate('soracom.io', 'sora', 'sora', timeout=5000):
            pass
        print('LTE network has been activated.')

        buffer = bytearray(1024)
        connected = False
        # Connect to SORACOM Harvest endpoint.
        while not connected:
            conn = await m.socket_open('harvest.soracom.io', 8514, m.SOCKET_UDP)
            print('Connection to SORACOM Harvest = {0}'.format(conn))
            if conn is None:
                # Failed to connect. Retry after 10[s]
                await asyncio.sleep_ms(10000)
                continue
            connected = True

        if m.socket_is_connected(conn):
            value =  heartrate.value()
            if value is not None:
                # Construct data to transmit to SORACOM Harvest
                payload = '{{"hrate":{:d}}}\n'.format(value)
                payload_bytes = bytes(payload, 'utf-8')

                # Transmit data
                print("Send: {}".format(payload))
                if await m.socket_send(conn, payload_bytes, length=len(payload_bytes), timeout=5000):
                    # Receive response.
                    length = await m.socket_receive(conn, buffer, timeout=5000)
                    if length is None:
                        print('ERROR: invalid response')
                else:
                    print('ERROR: send failed')

        await m.socket_close(conn)
        rtc.set_wakeup_period(BOOT_INTERVAL)
        rtc.shutdown()
    finally:
        await asyncio.sleep_ms(BOOT_INTERVAL * 1000)

loop = asyncio.get_event_loop()
loop.run_until_complete(main_task())

heartrate.stop_measure()
wiolte.set_grove_power(False)

以下は Hearvest にあげたデータの例です。

image.png

これでは、単にデータをあげてグラフ化できるだけです。心拍数が高すぎる/低すぎるときにアラートをあげてもらいものです。このためには SORACOM Lagoon が使えます。

image.png

SORACOM Lagoon にはアラート設定があるので、条件を設定してメールなどでアラームをあげることができます。

image.png

1
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
inachi

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
1
Help us understand the problem. What is going on with this article?