LoginSignup
3
7

More than 3 years have passed since last update.

M5Stack UIFlow で BLE UART 相互連携

Last updated at Posted at 2021-02-14

概要

M5Stack UIFlow (最新版は 1.7.2) で BLE が使えるようになっていますが、Blockly で用意されているのは Peripheral 側のみ。それでも MicroPython としては Central 側の実装も可能なので、手持ちの M5Stack Fire と Core2 で BLE UART による簡単な相互連携を行ってみました。

  • M5Stack Fire 側は Peripheral として Blockly で実装
  • M5Stack Core2 側は Central として MicroPython で実装
    Central 側の実装はほとんど MicroPython 公式のサンプルコード ble_simple_central.py の流用です。

  • ボタン押下で相手に文字を送信し、受信した文字を表示

今回、最初は Peripheral 側を Arduino で実装したもので試していたため、Arduino と MicroPython の BLE 実装の違いでいくつか悩まされた点がありました。

  • Arduino が Peripheral の場合は Notify を Enable してやる必要がある
    MicroPython の BLE では何もしなくても Notify が送信されてくる

  • 現在の UIFlow(v1.7.2) の MicroPython は 1.12 で BLE の scan で Active Scan が行えない
    Passive Scan の場合、アドバタイズパケットに名前が含まれない場合は名前指定ので接続ができない
    Active Scan を行えれば、その応答の Scan Response に名前が含まれている
    MicroPython は 1.13 から Active Scan をサポート
    ESP32 BLE Arduino の場合、デフォルトでは名前が含まれず setScanResponse(false) を呼び出すことで含まれた
    NimBLE-Arduino の場合、余裕があれば名前がアドバタイズパケットに含まれる

環境

  • M5Stack Fire (UIFlow 1.7.2)
  • M5Stack Core2 (UIFlow 1.7.2)

実行結果

プログラム

Peripheral 側 UIFlow の Blockly で作成したプログラム

スクリーンショット 2021-02-14 16.15.51.png

生成された MicroPython コード(M5Stack Fire用)

from m5stack import *
from m5ui import *
from uiflow import *
from ble import ble_uart

setScreenColor(0x222222)

label0 = M5TextBox(140, 85, "-", lcd.FONT_DejaVu72, 0xFFFFFF, rotate=0)

def on_recv():
  # global params
  label0.setText(str((uart_ble.read()).decode()))
  pass

def buttonA_wasPressed():
  # global params
  label0.setText('A')
  uart_ble.write('A')
  pass
btnA.wasPressed(buttonA_wasPressed)

def buttonB_wasPressed():
  # global params
  label0.setText('B')
  uart_ble.write('B')
  pass
btnB.wasPressed(buttonB_wasPressed)

def buttonC_wasPressed():
  # global params
  label0.setText('C')
  uart_ble.write('C')
  pass
btnC.wasPressed(buttonC_wasPressed)

uart_ble = ble_uart.init('BLE UART Demo')
uart_ble.irq(on_recv)

Central 側 MicroPython で作成したプログラム

MicroPython 公式のサンプルコード ble_simple_central.py からの変更点

  • 接続対象を名前で指定可能
  • Active Scan に対応した時のために Scan Response に名前が含まれるケースに対応
  • Scan の時間を指定可能
  • 切断時の Callback 指定を追加
  • 調査用に Descriptor の出力を追加
  • Notify を Enable にする処理を追加
  • connect でアドレス指定で接続する場合にアドレスタイプが 0 (パブリック・デバイス・アドレス) の場合に接続できないバグ?の対応
# This example finds and connects to a peripheral running the
# UART service (e.g. ble_simple_peripheral.py).

import bluetooth
import random
import struct
import time
import micropython
import ubinascii

try:
    from ble_advertising import decode_services, decode_name
except:
    from ble.ble_advertising import decode_services, decode_name

from micropython import const

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)

_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)
_SCAN_RSP = const(0x04)

_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")


class BLEUARTCentral:
    def __init__(self, ble, debug=False):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        self._debug = debug
        self._reset()

    def _reset(self):
        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None

        # Callbacks for completion of various operations.
        # These reset back to None after being invoked.
        self._scan_callback = None
        self._conn_callback = None
        self._read_callback = None

        # Persistent callback for when new data is notified from the device.
        self._notify_callback = None

        # Connected device.
        self._conn_handle = None
        self._start_handle = None
        self._end_handle = None
        self._tx_handle = None
        self._rx_handle = None

        self._target_name = None
        self._scanning = False

    def _irq(self, event, data):
        #print(event, data)
        if event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            if self._debug: print(event, addr_type, ubinascii.hexlify(addr), adv_type, decode_name(adv_data), rssi, ubinascii.hexlify(adv_data))
            if adv_type in (_ADV_IND, _ADV_DIRECT_IND, _SCAN_RSP) and (
                (self._target_name and self._target_name == decode_name(adv_data)) or
                (not self._target_name and _UART_SERVICE_UUID in decode_services(adv_data))
              ):
                # Found a potential device, remember it and stop scanning.
                self._addr_type = addr_type
                self._addr = bytes(
                    addr
                )  # Note: addr buffer is owned by caller so need to copy it.
                self._name = decode_name(adv_data) or "?"
                self._ble.gap_scan(None)

        elif event == _IRQ_SCAN_DONE:
            if self._scan_callback:
                if self._addr:
                    # Found a device during the scan (and the scan was explicitly stopped).
                    self._scan_callback(self._addr_type, self._addr, self._name)
                    self._scan_callback = None
                else:
                    # Scan timed out.
                    self._scan_callback(None, None, None)

        elif event == _IRQ_PERIPHERAL_CONNECT:
            # Connect successful.
            conn_handle, addr_type, addr = data
            if addr_type == self._addr_type and addr == self._addr:
                self._conn_handle = conn_handle
                self._ble.gattc_discover_services(self._conn_handle)

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            # Disconnect (either initiated by us or the remote end).
            conn_handle, _, _ = data
            if conn_handle == self._conn_handle:
                # If it was initiated by us, it'll already be reset.
                self._reset()
            if self._disconnect_callback:
                self._disconnect_callback()

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            conn_handle, start_handle, end_handle, uuid = data
            if self._debug: print("service", data)
            if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID:
                self._start_handle, self._end_handle = start_handle, end_handle

        elif event == _IRQ_GATTC_SERVICE_DONE:
            # Service query complete.
            conn_handle, status = data
            if self._debug: print("service done", data)
            if self._start_handle and self._end_handle:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle, self._start_handle, self._end_handle
                )
            else:
                print("Failed to find uart service.")

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            conn_handle, def_handle, value_handle, properties, uuid = data
            if self._debug: print("characteristic", data)
            if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID:
                self._rx_handle = value_handle
            if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID:
                self._tx_handle = value_handle

        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            # Characteristic query complete.
            conn_handle, status = data
            if self._debug: print("characteristic done", data)
            if self._tx_handle is not None and self._rx_handle is not None:
                # We've finished connecting and discovering device, fire the connect callback.
                if self._conn_callback:
                    self._conn_callback()
                self._ble.gattc_discover_descriptors(
                    self._conn_handle, self._start_handle, self._end_handle
                )
            else:
                print("Failed to find uart rx characteristic.")

        elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
            # Connected device returned a descriptor.
            conn_handle, dsc_handle, uuid = data
            if self._debug: print("descriptor", data)

        elif event == _IRQ_GATTC_DESCRIPTOR_DONE:
            # Descriptor query complete.
            conn_handle, status = data
            if self._debug: print("descriptor done", data)

        elif event == _IRQ_GATTC_WRITE_DONE:
            conn_handle, value_handle, status = data
            if self._debug: print("TX complete")

        elif event == _IRQ_GATTC_NOTIFY:
            conn_handle, value_handle, notify_data = data
            if conn_handle == self._conn_handle and value_handle == self._tx_handle:
                if self._notify_callback:
                    self._notify_callback(notify_data)

    # Returns true if we've successfully connected and discovered characteristics.
    def is_connected(self):
        return (
            self._conn_handle is not None
            and self._tx_handle is not None
            and self._rx_handle is not None
        )

    # Find a device advertising the environmental sensor service.
    #def scan(self, callback=None):
    def scan(self, addr=None, name=None, duration=2000, callback=None):
        self._addr_type = None
        self._addr = None
        self._target_name = name
        self._scan_callback = callback
        self._scanning = True
        #self._ble.gap_scan(2000, 30000, 30000)
        self._ble.gap_scan(duration, 30000, 30000)

    # Connect to the specified device (otherwise use cached address from a scan).
    def connect(self, addr_type=None, addr=None, callback=None):
        self._addr_type = addr_type if addr_type is not None else self._addr_type
        self._addr = addr or self._addr
        self._conn_callback = callback
        if self._addr_type is None or self._addr is None:
            return False
        self._ble.gap_connect(self._addr_type, self._addr)
        return True

    # Disconnect from current device.
    def disconnect(self):
        if not self._conn_handle:
            return
        self._ble.gap_disconnect(self._conn_handle)
        self._reset()

    def on_disconnect(self, callback=None):
        self._disconnect_callback = callback

    # Send data over the UART
    def write(self, v, response=False):
        if not self.is_connected():
            return
        self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0)

    # Set handler for when data is received over the UART.
    def on_notify(self, callback):
        self._notify_callback = callback

    def enable_notify(self, enable):
        if self.is_connected():
            self._ble.gattc_write(self._conn_handle, self._tx_handle + 1, struct.pack('<h', enable), 1) # b'\x01\x00'


def demo():
    screen = M5Screen()
    screen.clean_screen()
    screen.set_screen_bg_color(0xFFFFFF)

    label0 = M5Label('-', x=140, y=100, color=0x000, font=FONT_MONT_48, parent=None)
    label1 = M5Label('disconnected', x=75, y=50, color=0x000, font=FONT_MONT_24, parent=None)

    def buttonA_wasPressed():
      # global params
      label0.set_text('X')
      central.write('X', False)
      pass
    btnA.wasPressed(buttonA_wasPressed)

    def buttonB_wasPressed():
      # global params
      label0.set_text('Y')
      central.write('Y', False)
      pass
    btnB.wasPressed(buttonB_wasPressed)

    def buttonC_wasPressed():
      # global params
      label0.set_text('Z')
      central.write('Z', False)
      pass
    btnC.wasPressed(buttonC_wasPressed)

    def on_connect():
        label1.set_text('connected')
        central.on_notify(callback=on_notify)
        central.enable_notify(1)

    def on_disconnect():
        label1.set_text('disconnected')

    def on_scan(addr_type, addr, name):
        if addr_type is not None:
            print("on_scan -- Found peripheral:", addr_type, ubinascii.hexlify(addr), name)
            central.connect(callback=on_connect)
        else:
            print("on_scan -- No peripheral found.")

    def on_notify(data):
        label0.set_text(data.decode())

    ble = bluetooth.BLE()
    central = BLEUARTCentral(ble, debug=True)

    central.on_disconnect(callback=on_disconnect)
    central.scan(name='BLE UART Demo', callback=on_scan)
    #central.connect(0, ubinascii.unhexlify(b'xxxxxxxxxxxx'), on_connect)

#UIFlow から動かした場合 __name__ は "flow.m5cloud" になる
#if __name__ == "__main__":
if __name__ == "flow.m5cloud":
    from m5stack import *
    from m5stack_ui import *
    from uiflow import *

    demo()
3
7
3

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
7