Connecting a Daiso Remote Shutter with M5Stack UIFlow MicroPython

Posted at 2021-02-18

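Overview

M5Stack UIFlow (1.7.2 is the latest version at the time of writing) now supports BLE, although only on Fire/Core2, so I have been testing connections to various devices. This example connects a Daiso remote shutter and detects its button presses. Perhaps because resources are tight, enumerating every Characteristic and every Descriptor at connection time caused resets and hangs, so the processing is kept to the bare minimum.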

Environment

  • M5Stack Core2 (UIFlow 1.7.2)

Result

Program

The Bluetooth-related parts of the program are based on the official MicroPython examples in examples/bluetooth.

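Updated 2021-03-08

  • In the previous version, the device reset unless there was a wait before sending Notify Enable. The cause appears to have been that Notify Enable was sent before CHARACTERISTIC_DONE was received, so it was changed to be sent after CHARACTERISTIC_DONE.
  • Sending Notify Enable after CHARACTERISTIC_DONE stopped the resets, but notifications were still not enabled (no data arrived when a button was pressed). A call to gattc_discover_descriptors() was therefore added, and Notify Enable is now sent after DESCRIPTOR_DONE is received. Strictly speaking, Notify Enable should apparently be written to the handle of the 0x2902 Descriptor rather than to the Report's Value Handle + 1, but associating descriptors with their Characteristic is tedious, so that is not implemented.
  • Moved the gap_connect() call to after SCAN_DONE is received.
  • Added a wait until connection setup completes (DESCRIPTOR_DONE received).
  • Changed the main loop to reconnect after a disconnect.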
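The 0x2902 point in the update notes above could be handled with a small lookup like the following sketch. It is not part of the original program. `match_cccd_handles` is a hypothetical helper, UUIDs are plain integers so that the sketch also runs outside MicroPython (on the device, the `uuid` delivered with `_IRQ_GATTC_DESCRIPTOR_RESULT` is a `bluetooth.UUID`), and the descriptor handles in the usage lines are assumed values, not measurements from the shutter.

```python
CCCD_UUID16 = 0x2902  # Client Characteristic Configuration descriptor
NOTIFY_ENABLE = b"\x01\x00"  # 0x0001 little-endian, same bytes as struct.pack('<h', 1)


def match_cccd_handles(value_handles, descriptors, end_handle):
    """Map each characteristic value handle to the handle of its CCCD.

    value_handles: value handles of the characteristics in the service
    descriptors:   (dsc_handle, uuid16) pairs collected from DESCRIPTOR_RESULT
    end_handle:    end handle of the service
    """
    ordered = sorted(value_handles)
    result = {}
    for i, value_handle in enumerate(ordered):
        # A characteristic's descriptors sit between its own value handle
        # and the next characteristic (or the end of the service).
        upper = ordered[i + 1] if i + 1 < len(ordered) else end_handle + 1
        for dsc_handle, uuid16 in sorted(descriptors):
            if uuid16 == CCCD_UUID16 and value_handle < dsc_handle < upper:
                result[value_handle] = dsc_handle
                break
    return result


# Hypothetical descriptor list for Report characteristics at handles 19 and 23.
found = [(20, 0x2902), (21, 0x2908), (24, 0x2902), (25, 0x2908)]
cccd = match_cccd_handles([19, 23], found, 30)
print(cccd)
```

With such a mapping, `NOTIFY_ENABLE` would be written to `cccd[value_handle]` instead of `value_handle + 1`. The bounds are only as tight as the list of value handles passed in, so passing every characteristic of the service, not just the Reports, gives the safest match.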
import bluetooth
import struct
import time
import ubinascii
from ble.ble_advertising import decode_services, decode_name
from micropython import const

_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)

_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)

_HID_Service_UUID = bluetooth.UUID(0x1812)
_Report_UUID = bluetooth.UUID(0x2a4d)


class BLECentral:
    def __init__(self, ble):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)

        self._reset()

    def _reset(self):
        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None

        # Persistent callback for when new data is notified from the device.
        self._notify_callback = None

        # Connected device.
        self._conn_handle = None
        self._start_handle = None
        self._end_handle = None
        self._report_handle_1 = None
        self._report_handle_2 = None
        self._Android_button = False
        
        self._connecting = False

    def _irq(self, event, data):
        if event == _IRQ_SCAN_RESULT:
            addr_type, addr, adv_type, rssi, adv_data = data
            if adv_type in (_ADV_IND, _ADV_DIRECT_IND) and decode_name(adv_data).strip() == 'AB Shutter3':
                # Found a potential device, remember it and stop scanning.
                self._addr_type = addr_type
                self._addr = bytes(addr)  # Note: addr buffer is owned by caller so need to copy it.
                self._name = decode_name(adv_data)
                self._ble.gap_scan(None)
                #
                # gap_connect() is now called after SCAN_DONE is received.
                # self._ble.gap_connect(self._addr_type, self._addr)
                
        elif event == _IRQ_SCAN_DONE:
            if self._addr is not None:
                self._ble.gap_connect(self._addr_type, self._addr)
            else:
                self._connecting = False
            
        elif event == _IRQ_PERIPHERAL_CONNECT:
            # Connect successful.
            conn_handle, addr_type, addr = data
            if addr_type == self._addr_type and addr == self._addr:
                self._conn_handle = conn_handle
                self._ble.gattc_discover_services(self._conn_handle)

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            # Disconnect (either initiated by us or the remote end).
            conn_handle, _, _ = data
            if conn_handle == self._conn_handle:
                # If it was initiated by us, it'll already be reset.
                self._reset()

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            # Connected device returned a service.
            conn_handle, start_handle, end_handle, uuid = data
            if conn_handle == self._conn_handle and uuid == _HID_Service_UUID:
                self._start_handle, self._end_handle = start_handle, end_handle

        elif event == _IRQ_GATTC_SERVICE_DONE:
            # Service query complete.
            if self._start_handle and self._end_handle:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle, self._start_handle, self._end_handle
                )
            else:
                print("Failed to find service.")
                self._connecting = False

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            # Connected device returned a characteristic.
            conn_handle, def_handle, value_handle, properties, uuid = data
            if conn_handle == self._conn_handle and uuid == _Report_UUID:
                if not self._report_handle_1:
                    self._report_handle_1 = value_handle
                else:
                    self._report_handle_2 = value_handle

        elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
            conn_handle, status = data
            # Characteristic query complete.
            if self._report_handle_1 and self._report_handle_2:
                self._ble.gattc_discover_descriptors(self._conn_handle, self._start_handle, self._end_handle)
            else:
                print("Failed to find characteristic.")
                self._connecting = False

        elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
            # Connected device returned a descriptor.
            conn_handle, dsc_handle, uuid = data

        elif event == _IRQ_GATTC_DESCRIPTOR_DONE:
            # Descriptor query complete.
            conn_handle, status = data
            self._connecting = False

        elif event == _IRQ_GATTC_NOTIFY:
            # The shutter sends a notification on a Report characteristic when a button is pressed or released.
            conn_handle, value_handle, notify_data = data
            if conn_handle == self._conn_handle and (value_handle == self._report_handle_1 or value_handle == self._report_handle_2):
                if self._notify_callback:
                    #
                    # Pressing the iOS button produces these 2 notifications:
                    # value_handle = 19, notify_data = b'\x01\x00'
                    # value_handle = 19, notify_data = b'\x00\x00'
                    #
                    # Pressing the Android button produces these 4 notifications:
                    # value_handle = 23, notify_data = b'\x00\x28'
                    # value_handle = 19, notify_data = b'\x01\x00'
                    # value_handle = 19, notify_data = b'\x00\x00'
                    # value_handle = 23, notify_data = b'\x00\x00'
                    #
                    if notify_data == b'\x01\x00': # iOS button
                        if self._Android_button:
                            self._notify_callback(2)
                        else:
                            self._notify_callback(1)
                        self._Android_button = False
                    elif notify_data == b'\x00\x28': # Android button
                        self._Android_button = True

    # Returns true if we've successfully connected and discovered characteristics.
    def is_connected(self):
        return (
                self._conn_handle is not None
            and self._report_handle_1 is not None
            and self._report_handle_2 is not None
        )

    # Scan for the 'AB Shutter3' remote shutter and block until connection setup finishes or fails.
    def scan(self):
        self._connecting = True
        self._addr_type = None
        self._addr = None
        self._ble.gap_scan(5000, 30000, 30000)
        while self._connecting:
            time.sleep_ms(10)

    # Sets a callback to be invoked when the device notifies us.
    def on_notify(self, callback):
        if self._conn_handle is not None and self._report_handle_1 and self._report_handle_2:
            self._notify_callback = callback
            self._ble.gattc_write(self._conn_handle, self._report_handle_1 + 1, struct.pack('<h', 1), 1)
            self._ble.gattc_write(self._conn_handle, self._report_handle_2 + 1, struct.pack('<h', 1), 1)


def notify(button):
    # button is 1 for the iOS button and 2 for the Android button.
    print(button)
    if button == 1:
        screen.set_screen_bg_color(0xFF0000)
    else:
        screen.set_screen_bg_color(0x0000FF)


from m5stack import *
from m5stack_ui import *
from uiflow import *

screen = M5Screen()
screen.clean_screen()

ble = bluetooth.BLE()
central = BLECentral(ble)

while True:
    screen.set_screen_bg_color(0xFFFF00)
    print("scanning...")
    central.scan()

    if central.is_connected():
        print("Connected")
        central.on_notify(notify)

        while central.is_connected():
            time.sleep_ms(100)

        print("Disconnected")
    else:
        print("Failed to connect")

    time.sleep_ms(5000)
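
The button detection in the `_IRQ_GATTC_NOTIFY` branch can be tried without any hardware. The following is a minimal sketch of the same logic as a standalone class, fed with the notification sequences recorded in the comments of the program. `ShutterDecoder` and `feed` are hypothetical names introduced here for illustration.

```python
class ShutterDecoder:
    """Turns notification payloads into button numbers (1 = iOS, 2 = Android)."""

    def __init__(self):
        self._android_pending = False

    def feed(self, notify_data):
        if notify_data == b"\x00\x28":
            # The Android button announces itself before the common press report.
            self._android_pending = True
            return None
        if notify_data == b"\x01\x00":
            # Press report sent for both buttons.
            button = 2 if self._android_pending else 1
            self._android_pending = False
            return button
        return None  # release reports (b'\x00\x00') are ignored


decoder = ShutterDecoder()
ios_press = [b"\x01\x00", b"\x00\x00"]
android_press = [b"\x00\x28", b"\x01\x00", b"\x00\x00", b"\x00\x00"]
events = [decoder.feed(d) for d in ios_press + android_press]
buttons = [e for e in events if e is not None]
print(buttons)
```

As in the program, the decoder looks only at the payload and not at `value_handle`, so it relies on the Android button always sending `b'\x00\x28'` before the shared `b'\x01\x00'` report.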

Reference

Responses when reading the Characteristics other than "0x2A4D Report" (values are hex-encoded)

0x2A4E Protocol Mode
b'01'

0x2A4A HID Information
b'11010001'
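
As a hedged illustration, the HID Information value can be unpacked as follows, assuming the string shown is the hex encoding of the 4 raw bytes and using the field layout of the HID Service (bcdHID, bCountryCode, Flags). The variable names are introduced here for illustration, and `bytes.fromhex` is the CPython call (on MicroPython, `ubinascii.unhexlify` would be the likely substitute).

```python
import struct

raw = bytes.fromhex("11010001")

# bcdHID (uint16, little-endian), bCountryCode (uint8), Flags (uint8)
bcd_hid, country_code, flags = struct.unpack("<HBB", raw)

version = "{:x}.{:02x}".format(bcd_hid >> 8, bcd_hid & 0xFF)
remote_wake = bool(flags & 0x01)           # Flags bit 0
normally_connectable = bool(flags & 0x02)  # Flags bit 1

print(version, country_code, remote_wake, normally_connectable)
```

Read this way, the value corresponds to HID version 1.11, country code 0, with only the RemoteWake flag set.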

0x2A4C HID Control Point
b'00'

0x2A4B Report Map
b'050c0901a101850209e909ea09e209301501250c7510'
# The actual Report Map is longer, but a plain READ returns at most 22 bytes.
# Is there a way to read longer data in MicroPython?
# Even with official MicroPython, the MTU size could not be changed.

USB Descriptor and Request Parser
https://eleccelerator.com/usbdescreqparser/
-----
0x05, 0x0C,        // Usage Page (Consumer)
0x09, 0x01,        // Usage (Consumer Control)
0xA1, 0x01,        // Collection (Application)
0x85, 0x02,        //   Report ID (2)
0x09, 0xE9,        //   Usage (Volume Increment)
0x09, 0xEA,        //   Usage (Volume Decrement)
0x09, 0xE2,        //   Usage (Mute)
0x09, 0x30,        //   Usage (Power)
0x15, 0x01,        //   Logical Minimum (1)
0x25, 0x0C,        //   Logical Maximum (12)
0x75, 0x10,        //   Report Size (16)

// 22 bytes

// best guess: USB HID Report Descriptor
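
The same breakdown can be reproduced offline. The following is a minimal sketch of a parser for HID report descriptor short items, applied to the 22 bytes read above. `parse_short_items` is a hypothetical helper, long items (prefix 0xFE) are not handled, and it is written for CPython (`bytes.fromhex`) rather than for the device.

```python
ITEM_TYPES = ("Main", "Global", "Local", "Reserved")


def parse_short_items(descriptor):
    """Split a HID report descriptor into (type, tag, value) short items."""
    items = []
    pos = 0
    while pos < len(descriptor):
        prefix = descriptor[pos]
        size = (0, 1, 2, 4)[prefix & 0x03]            # bSize: data length in bytes
        item_type = ITEM_TYPES[(prefix >> 2) & 0x03]  # bType
        tag = prefix >> 4                             # bTag
        data = descriptor[pos + 1 : pos + 1 + size]
        if len(data) < size:
            break  # descriptor was cut off in the middle of an item
        items.append((item_type, tag, int.from_bytes(data, "little")))
        pos += 1 + size
    return items


report_map = bytes.fromhex("050c0901a101850209e909ea09e209301501250c7510")
items = parse_short_items(report_map)
for item_type, tag, value in items:
    print(item_type, hex(tag), hex(value))
```

The first item decodes as a Global item with tag 0 (Usage Page) and value 0x0C (Consumer), and the last as a Global item with tag 7 (Report Size) and value 16, matching the listing above.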