概要
M5Stack UIFlow (最新版は 1.7.2) で BLE が使えるようになっていますが、Blockly で用意されているのは Peripheral 側のみ。それでも MicroPython としては Central 側の実装も可能なので、手持ちの M5Stack Fire と Core2 で BLE UART による簡単な相互連携を行ってみました。
-
M5Stack Fire 側は Peripheral として Blockly で実装
-
M5Stack Core2 側は Central として MicroPython で実装
Central 側の実装はほとんど MicroPython 公式のサンプルコード ble_simple_central.py の流用です。 -
ボタン押下で相手に文字を送信し、受信した文字を表示
今回、最初は Peripheral 側を Arduino で実装したもので試していたため、Arduino と MicroPython の BLE 実装の違いでいくつか悩まされた点がありました。
-
Arduino が Peripheral の場合は Notify を Enable してやる必要がある
MicroPython の BLE では何もしなくても Notify が送信されてくる -
現在の UIFlow(v1.7.2) の MicroPython は 1.12 で BLE の scan で Active Scan が行えない
Passive Scan の場合、アドバタイズパケットに名前が含まれない場合は名前指定ので接続ができない
Active Scan を行えれば、その応答の Scan Response に名前が含まれている
MicroPython は 1.13 から Active Scan をサポート
ESP32 BLE Arduino の場合、デフォルトでは名前が含まれず setScanResponse(false) を呼び出すことで含まれた
NimBLE-Arduino の場合、余裕があれば名前がアドバタイズパケットに含まれる
環境
- M5Stack Fire (UIFlow 1.7.2)
- M5Stack Core2 (UIFlow 1.7.2)
実行結果
#M5stack Fire と Core2 を UIFlow のBLEで相互連携。
— 稲澤祐一 (@inasawa) January 26, 2021
Fire側がPeripheralでこちらはBLE UARTのBlockyだけで作ったけど Core2側はCentralでMicroPythonでゴリゴリ。と言っても https://t.co/Fol3IxML7k をベースにちょっと直しただけですが。 pic.twitter.com/NDkb6zDt6U
プログラム
Peripheral 側 UIFlow の Blockly で作成したプログラム
生成された MicroPython コード(M5Stack Fire用)
from m5stack import *
from m5ui import *
from uiflow import *
from ble import ble_uart
setScreenColor(0x222222)
label0 = M5TextBox(140, 85, "-", lcd.FONT_DejaVu72, 0xFFFFFF, rotate=0)
def on_recv():
# global params
label0.setText(str((uart_ble.read()).decode()))
pass
def buttonA_wasPressed():
# global params
label0.setText('A')
uart_ble.write('A')
pass
btnA.wasPressed(buttonA_wasPressed)
def buttonB_wasPressed():
# global params
label0.setText('B')
uart_ble.write('B')
pass
btnB.wasPressed(buttonB_wasPressed)
def buttonC_wasPressed():
# global params
label0.setText('C')
uart_ble.write('C')
pass
btnC.wasPressed(buttonC_wasPressed)
uart_ble = ble_uart.init('BLE UART Demo')
uart_ble.irq(on_recv)
Central 側 MicroPython で作成したプログラム
MicroPython 公式のサンプルコード ble_simple_central.py からの変更点
- 接続対象を名前で指定可能
- Active Scan に対応した時のために Scan Response に名前が含まれるケースに対応
- Scan の時間を指定可能
- 切断時の Callback 指定を追加
- 調査用に Descriptor の出力を追加
- Notify を Enable にする処理を追加
- connect でアドレス指定で接続する場合にアドレスタイプが 0 (パブリック・デバイス・アドレス) の場合に接続できないバグ?の対応
# This example finds and connects to a peripheral running the
# UART service (e.g. ble_simple_peripheral.py).
import bluetooth
import random
import struct
import time
import micropython
import ubinascii
try:
from ble_advertising import decode_services, decode_name
except:
from ble.ble_advertising import decode_services, decode_name
from micropython import const
_IRQ_CENTRAL_CONNECT = const(1)
_IRQ_CENTRAL_DISCONNECT = const(2)
_IRQ_GATTS_WRITE = const(3)
_IRQ_GATTS_READ_REQUEST = const(4)
_IRQ_SCAN_RESULT = const(5)
_IRQ_SCAN_DONE = const(6)
_IRQ_PERIPHERAL_CONNECT = const(7)
_IRQ_PERIPHERAL_DISCONNECT = const(8)
_IRQ_GATTC_SERVICE_RESULT = const(9)
_IRQ_GATTC_SERVICE_DONE = const(10)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(11)
_IRQ_GATTC_CHARACTERISTIC_DONE = const(12)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(13)
_IRQ_GATTC_DESCRIPTOR_DONE = const(14)
_IRQ_GATTC_READ_RESULT = const(15)
_IRQ_GATTC_READ_DONE = const(16)
_IRQ_GATTC_WRITE_DONE = const(17)
_IRQ_GATTC_NOTIFY = const(18)
_IRQ_GATTC_INDICATE = const(19)
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)
_SCAN_RSP = const(0x04)
_UART_SERVICE_UUID = bluetooth.UUID("6E400001-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_RX_CHAR_UUID = bluetooth.UUID("6E400002-B5A3-F393-E0A9-E50E24DCCA9E")
_UART_TX_CHAR_UUID = bluetooth.UUID("6E400003-B5A3-F393-E0A9-E50E24DCCA9E")
class BLEUARTCentral:
def __init__(self, ble, debug=False):
self._ble = ble
self._ble.active(True)
self._ble.irq(self._irq)
self._debug = debug
self._reset()
def _reset(self):
# Cached name and address from a successful scan.
self._name = None
self._addr_type = None
self._addr = None
# Callbacks for completion of various operations.
# These reset back to None after being invoked.
self._scan_callback = None
self._conn_callback = None
self._read_callback = None
# Persistent callback for when new data is notified from the device.
self._notify_callback = None
# Connected device.
self._conn_handle = None
self._start_handle = None
self._end_handle = None
self._tx_handle = None
self._rx_handle = None
self._target_name = None
self._scanning = False
def _irq(self, event, data):
#print(event, data)
if event == _IRQ_SCAN_RESULT:
addr_type, addr, adv_type, rssi, adv_data = data
if self._debug: print(event, addr_type, ubinascii.hexlify(addr), adv_type, decode_name(adv_data), rssi, ubinascii.hexlify(adv_data))
if adv_type in (_ADV_IND, _ADV_DIRECT_IND, _SCAN_RSP) and (
(self._target_name and self._target_name == decode_name(adv_data)) or
(not self._target_name and _UART_SERVICE_UUID in decode_services(adv_data))
):
# Found a potential device, remember it and stop scanning.
self._addr_type = addr_type
self._addr = bytes(
addr
) # Note: addr buffer is owned by caller so need to copy it.
self._name = decode_name(adv_data) or "?"
self._ble.gap_scan(None)
elif event == _IRQ_SCAN_DONE:
if self._scan_callback:
if self._addr:
# Found a device during the scan (and the scan was explicitly stopped).
self._scan_callback(self._addr_type, self._addr, self._name)
self._scan_callback = None
else:
# Scan timed out.
self._scan_callback(None, None, None)
elif event == _IRQ_PERIPHERAL_CONNECT:
# Connect successful.
conn_handle, addr_type, addr = data
if addr_type == self._addr_type and addr == self._addr:
self._conn_handle = conn_handle
self._ble.gattc_discover_services(self._conn_handle)
elif event == _IRQ_PERIPHERAL_DISCONNECT:
# Disconnect (either initiated by us or the remote end).
conn_handle, _, _ = data
if conn_handle == self._conn_handle:
# If it was initiated by us, it'll already be reset.
self._reset()
if self._disconnect_callback:
self._disconnect_callback()
elif event == _IRQ_GATTC_SERVICE_RESULT:
# Connected device returned a service.
conn_handle, start_handle, end_handle, uuid = data
if self._debug: print("service", data)
if conn_handle == self._conn_handle and uuid == _UART_SERVICE_UUID:
self._start_handle, self._end_handle = start_handle, end_handle
elif event == _IRQ_GATTC_SERVICE_DONE:
# Service query complete.
conn_handle, status = data
if self._debug: print("service done", data)
if self._start_handle and self._end_handle:
self._ble.gattc_discover_characteristics(
self._conn_handle, self._start_handle, self._end_handle
)
else:
print("Failed to find uart service.")
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
# Connected device returned a characteristic.
conn_handle, def_handle, value_handle, properties, uuid = data
if self._debug: print("characteristic", data)
if conn_handle == self._conn_handle and uuid == _UART_RX_CHAR_UUID:
self._rx_handle = value_handle
if conn_handle == self._conn_handle and uuid == _UART_TX_CHAR_UUID:
self._tx_handle = value_handle
elif event == _IRQ_GATTC_CHARACTERISTIC_DONE:
# Characteristic query complete.
conn_handle, status = data
if self._debug: print("characteristic done", data)
if self._tx_handle is not None and self._rx_handle is not None:
# We've finished connecting and discovering device, fire the connect callback.
if self._conn_callback:
self._conn_callback()
self._ble.gattc_discover_descriptors(
self._conn_handle, self._start_handle, self._end_handle
)
else:
print("Failed to find uart rx characteristic.")
elif event == _IRQ_GATTC_DESCRIPTOR_RESULT:
# Connected device returned a descriptor.
conn_handle, dsc_handle, uuid = data
if self._debug: print("descriptor", data)
elif event == _IRQ_GATTC_DESCRIPTOR_DONE:
# Descriptor query complete.
conn_handle, status = data
if self._debug: print("descriptor done", data)
elif event == _IRQ_GATTC_WRITE_DONE:
conn_handle, value_handle, status = data
if self._debug: print("TX complete")
elif event == _IRQ_GATTC_NOTIFY:
conn_handle, value_handle, notify_data = data
if conn_handle == self._conn_handle and value_handle == self._tx_handle:
if self._notify_callback:
self._notify_callback(notify_data)
# Returns true if we've successfully connected and discovered characteristics.
def is_connected(self):
return (
self._conn_handle is not None
and self._tx_handle is not None
and self._rx_handle is not None
)
# Find a device advertising the environmental sensor service.
#def scan(self, callback=None):
def scan(self, addr=None, name=None, duration=2000, callback=None):
self._addr_type = None
self._addr = None
self._target_name = name
self._scan_callback = callback
self._scanning = True
#self._ble.gap_scan(2000, 30000, 30000)
self._ble.gap_scan(duration, 30000, 30000)
# Connect to the specified device (otherwise use cached address from a scan).
def connect(self, addr_type=None, addr=None, callback=None):
self._addr_type = addr_type if addr_type is not None else self._addr_type
self._addr = addr or self._addr
self._conn_callback = callback
if self._addr_type is None or self._addr is None:
return False
self._ble.gap_connect(self._addr_type, self._addr)
return True
# Disconnect from current device.
def disconnect(self):
if not self._conn_handle:
return
self._ble.gap_disconnect(self._conn_handle)
self._reset()
def on_disconnect(self, callback=None):
self._disconnect_callback = callback
# Send data over the UART
def write(self, v, response=False):
if not self.is_connected():
return
self._ble.gattc_write(self._conn_handle, self._rx_handle, v, 1 if response else 0)
# Set handler for when data is received over the UART.
def on_notify(self, callback):
self._notify_callback = callback
def enable_notify(self, enable):
if self.is_connected():
self._ble.gattc_write(self._conn_handle, self._tx_handle + 1, struct.pack('<h', enable), 1) # b'\x01\x00'
def demo():
screen = M5Screen()
screen.clean_screen()
screen.set_screen_bg_color(0xFFFFFF)
label0 = M5Label('-', x=140, y=100, color=0x000, font=FONT_MONT_48, parent=None)
label1 = M5Label('disconnected', x=75, y=50, color=0x000, font=FONT_MONT_24, parent=None)
def buttonA_wasPressed():
# global params
label0.set_text('X')
central.write('X', False)
pass
btnA.wasPressed(buttonA_wasPressed)
def buttonB_wasPressed():
# global params
label0.set_text('Y')
central.write('Y', False)
pass
btnB.wasPressed(buttonB_wasPressed)
def buttonC_wasPressed():
# global params
label0.set_text('Z')
central.write('Z', False)
pass
btnC.wasPressed(buttonC_wasPressed)
def on_connect():
label1.set_text('connected')
central.on_notify(callback=on_notify)
central.enable_notify(1)
def on_disconnect():
label1.set_text('disconnected')
def on_scan(addr_type, addr, name):
if addr_type is not None:
print("on_scan -- Found peripheral:", addr_type, ubinascii.hexlify(addr), name)
central.connect(callback=on_connect)
else:
print("on_scan -- No peripheral found.")
def on_notify(data):
label0.set_text(data.decode())
ble = bluetooth.BLE()
central = BLEUARTCentral(ble, debug=True)
central.on_disconnect(callback=on_disconnect)
central.scan(name='BLE UART Demo', callback=on_scan)
#central.connect(0, ubinascii.unhexlify(b'xxxxxxxxxxxx'), on_connect)
# UIFlow から動かした場合 __name__ は "flow.m5cloud" になる
# if __name__ == "__main__":
if __name__ == "flow.m5cloud":
from m5stack import *
from m5stack_ui import *
from uiflow import *
demo()