はじめに
本記事は、iPadを操作するの2つ目になります。
今回は、ESP32間のBluetoochの接続を行い、ページめくりを実行します。
電子工作でiPadを操作してみる関連の記事
- 電子工作でiPadを操作してみる-1 : タップ
- 電子工作でiPadを操作してみる2 - Bluetooth経由で制御する ← 本記事
- 電子工作でiPadを操作してみる3 - スワイプをやってみる
最終的な目標
- ユーザがスイッチを押す
- ESP32(Periferal)がスイッチを検出し、Bluetooth経由で通知する
- ESP32(Central)が通知を検出し、リレータッチボードをON/OFFする
- iPadがタップを検出する
ESP32 - MicroPython - Bluetooth
今回使用しているESP32用のMicroPythonファームには、
bluetoothのライブラリがポーティングされています。
- MicroPython 1.12(esp32-idf4-20191220-v1.12.bin)
また、bluetooth機能を制御するためのドキュメントは、下記に記載されています。
MicroPythonライブラリ - ubluetooth --- 低レベル Bluetooth
技術的な情報については、[参考]のURLを参照いただいた方がいいかと思います。
今回は、温度センサーの公式のサンプルに手を入れて、動作を確認します。
MicroPython - bluetoothサンプルディレクトリ
使用するファイル
- ble_temperature.py
- periferalのコードのベースとなります
- ble_temperature_central.py
※最新版では、INDICATE(bluetooth.FLAG_INDICATE)の対応しているため、そのままでは動作できません。
今回は、69661f3 のコードをベースに説明します。
サンプルコードを修正する
ペリフェラル側とセントラル側の各コードを修正します。
全体のシーケンス
つくりたいシーケンス図は以下となります。
ペリフェラル側
本家のサンプルプログラムは、Bluetooth接続のセンサーとなり、定期的にデータの読み取りや通知を行うことができます。
これを参考に、スイッチを経由して通知できるように修正します。
回路図
使用するPINは、以下となります。
ESP32 | スイッチ | 備考 |
---|---|---|
15 | スイッチ1 | プルアップ設定で使用 |
25 | スイッチ2 | プルアップ設定で使用 |
回路図は以下となります。
プログラム
ペリフェラル側は、4つのファイルから構成されます。
- ble_temperature.py
- 最初に実行するファイル
- bluetoothのペリフェラル動作
- スイッチイベントを受信すると、notifyを発行
- sw.py
- スイッチイベントを管理
- スイッチを検出すると、mainスレッドへ通知
- utl.py
- soket通信を提供する
- ble_advertising.py
- BLE advertising payloadのヘルパー
- github - bluetoothをそのまま使用
※スレッド間のやりとりは、UDP通信で行っています。
各ソースコードは以下となります。
- ble_temperature.py
# This example demonstrates a simple temperature sensor peripheral.
#
# The sensor's local value updates every second, and it will notify
# any connected central every 10 seconds.
import bluetooth
import random
import struct
import time
import sys
from ble_advertising import advertising_payload
from micropython import const
from machine import Pin
from sw import SwProc
from usocket import socket, AF_INET, SOCK_DGRAM
import ujson as json
from util import CommSnd
from util import HOST
from util import ADDR
from util import PORT_DST_CTL
RCV_BUF_SIZE = 128
_IRQ_CENTRAL_CONNECT = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_TEMP_CHAR = (
bluetooth.UUID(0x2A6E),
bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_ENV_SENSE_SERVICE = (
_ENV_SENSE_UUID,
(_TEMP_CHAR,),
)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
class BLETemperature:
def __init__(self, ble, name="mpy-temp"):
self._ble = ble
self._ble.active(True)
self._ble.irq(handler=self._irq)
((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
self._connections = set()
self._payload = advertising_payload(
name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER
)
self._advertise()
def _irq(self, event, data):
# Track connections so we can send notifications.
if event == _IRQ_CENTRAL_CONNECT:
conn_handle, _, _, = data
self._connections.add(conn_handle)
elif event == _IRQ_CENTRAL_DISCONNECT:
conn_handle, _, _, = data
self._connections.remove(conn_handle)
# Start advertising again to allow a new connection.
self._advertise()
def set_temperature(self, temp_deg_c, notify=False):
# root - print("set_temperature:run")
# Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
# Write the local value, ready for a central to read.
# self._ble.gatts_write(self._handle, struct.pack("<h", int(temp_deg_c * 100)))
if notify:
self._ble.gatts_write(self._handle, struct.pack("<h", int(temp_deg_c)))
for conn_handle in self._connections:
self._ble.gatts_notify(conn_handle, self._handle)
def _advertise(self, interval_us=500000):
self._ble.gap_advertise(interval_us, adv_data=self._payload)
def main():
# If [tsw_no1] is pressed, exit the program
PIN_SW_NO1 = 15
pin_shut = Pin(PIN_SW_NO1, Pin.IN, Pin.PULL_UP)
if 0 == pin_shut.value():
print("!!!program shutdown!!!")
time.sleep_ms(100)
sys.exit()
# スイッチの開始
sw_proc = SwProc()
sw_proc.run()
# bluetoothの開始
ble = bluetooth.BLE()
temp = BLETemperature(ble)
# 受信準備
s = socket(AF_INET, SOCK_DGRAM)
s.bind((HOST, PORT_DST_CTL))
while True:
msg, address = s.recvfrom(RCV_BUF_SIZE)
d = json.loads(msg.decode())
if d["cmd"] == "sw":
# ex. "no2" => 2
temp.set_temperature(int(d["type"].replace('no', '')), notify=True)
if __name__ == "__main__":
main()
- sw.py
import _thread
import time
from machine import Pin
from util import CommSnd
from util import ADDR
from util import PORT_DST_CTL
PIN_SW_NO1 = 15
PIN_SW_NO2 = 25
TACK_JUDGE = [False, False, True, True]
class TackSwitch:
def __init__(self, pin, mode, push_logic):
if mode == "in":
self.pin = Pin(pin, Pin.IN)
pass
elif mode == "in_pullup":
self.pin = Pin(pin, Pin.IN, Pin.PULL_UP)
else:
raise Exception('')
self.store = []
self.push_logic = push_logic
def read(self):
value = self.pin.value()
if value == self.push_logic:
return True
else:
return False
def read_poll(self):
logic = self.read()
self.store.append(logic)
if len(self.store) < 5:
return False
self.store.pop(0)
if TACK_JUDGE == self.store:
return True
return False
class SwProc():
def __init__(self):
self._cs_ctl = CommSnd(ADDR, PORT_DST_CTL)
self._tsw_no1 = TackSwitch(PIN_SW_NO1, "in_pullup", 0)
self._tsw_no2 = TackSwitch(PIN_SW_NO2, "in_pullup", 0)
return
def _proc_poll(self):
print("_proc_poll - run")
while True:
time.sleep_ms(50)
if self._tsw_no1.read_poll() :
print("push - sw_no1!")
self._cs_ctl.sendto_dict({'cmd': "sw", 'type':'no1'})
if self._tsw_no2.read_poll() :
print("push - sw_no2!")
self._cs_ctl.sendto_dict({'cmd': "sw", 'type':'no2'})
return
def run(self):
_thread.start_new_thread(self._proc_poll, ())
- utl.py
try:
from usocket import socket, AF_INET, SOCK_DGRAM
except:
from socket import socket, AF_INET, SOCK_DGRAM
try:
import ujson as json
except:
import json
HOST = ''
PORT_DST_CTL = 5001
ADDR = '127.0.0.1'
class CommSnd:
def __init__(self, addr, port):
self._addr = addr
self._port = port
self.s = socket(AF_INET, SOCK_DGRAM)
def sendto_dict(self, d):
self.s.sendto(json.dumps(d).encode(), (self._addr, self._port))
def close(self):
self.s.close()
ペリフェラル側の動作確認
iOSアプリ - "BLE Scanner"を使用して確認します。
- ESP32側で、ble_temperature.pyを実行する
MicroPython v1.12 on 2019-12-20; ESP32 module with ESP32
Type "help()" for more information.
>>> exec(open('ble_temperature.py').read())
- "BLE Scanner"を起動する
- 画面が遷移したら、"Service"をタップ
- ESP32側のスイッチを押すと、値が変わることが分かる
以上で、ペリフェラル側の作成は完了です。
セントラル側
本家のサンプルプログラムは、Bluetooth接続のセンサーより定期的に値を取得しています。
これを参考に、上記で作ったスイッチ通知を取得します。
回路図
再掲となりますが、以下となります。
ESP32 | リレータッチボード | 備考 |
---|---|---|
5V | 5V | |
GND | GND | |
26 | EN | |
27 | EN |
- 接続図(ESP32 - Central)
使用するPINは、以下となります。
ESP32 | スイッチ | 備考 |
---|---|---|
15 | スイッチ1 | プルアップ設定で使用 |
25 | スイッチ2 | プルアップ設定で使用 |
プログラム
セントラル側は、3つのファイルから構成されます。
- ble_temperature_central.py
- 最初に実行するファイル
- bluetoothのセントラル動作
- notifyを受信すると、tap処理を行う
- relay.py
- リレータッチボードの管理
- ble_advertising.py
- BLE advertising payloadのヘルパー
- github - bluetoothをそのまま使用
各ソースコードは、以下となります。
- ble_temperature_central.py
# This example finds and connects to a BLE temperature sensor (e.g. the one in ble_temperature.py).
import bluetooth
import random
import struct
import time
import micropython
from ble_advertising import decode_services, decode_name
from micropython import const
from relay import RelayProc
_IRQ_CENTRAL_CONNECT = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
_IRQ_GATTS_WRITE = const(1 << 2)
_IRQ_GATTS_READ_REQUEST = const(1 << 3)
_IRQ_SCAN_RESULT = const(1 << 4)
_IRQ_SCAN_COMPLETE = const(1 << 5)
_IRQ_PERIPHERAL_CONNECT = const(1 << 6)
_IRQ_PERIPHERAL_DISCONNECT = const(1 << 7)
_IRQ_GATTC_SERVICE_RESULT = const(1 << 8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(1 << 9)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(1 << 10)
_IRQ_GATTC_READ_RESULT = const(1 << 11)
_IRQ_GATTC_WRITE_STATUS = const(1 << 12)
_IRQ_GATTC_NOTIFY = const(1 << 13)
_IRQ_GATTC_INDICATE = const(1 << 14)
_IRQ_ALL = const(0xFFFF)
_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_TEMP_UUID = bluetooth.UUID(0x2A6E)
_TEMP_CHAR = (
_TEMP_UUID,
bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_ENV_SENSE_SERVICE = (
_ENV_SENSE_UUID,
(_TEMP_CHAR,),
)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
class BLETemperatureCentral:
def __init__(self, ble):
self._ble = ble
self._ble.active(True)
self._ble.irq(handler=self._irq)
self._reset()
def _reset(self):
# Cached name and address from a successful scan.
self._name = None
self._addr_type = None
self._addr = None
# Cached value (if we have one)
self._value = None
# Callbacks for completion of various operations.
# These reset back to None after being invoked.
self._scan_callback = None
self._conn_callback = None
self._read_callback = None
# Persistent callback for when new data is notified from the device.
self._notify_callback = None
# Connected device.
self._conn_handle = None
self._value_handle = None
self._notify_flag = False
def _irq(self, event, data):
if event == _IRQ_SCAN_RESULT:
print("irq:event == _IRQ_SCAN_RESULT")
addr_type, addr, adv_type, rssi, adv_data = data
if adv_type in (_ADV_IND, _ADV_DIRECT_IND,) and _ENV_SENSE_UUID in decode_services(
adv_data
):
# Found a potential device, remember it and stop scanning.
self._addr_type = addr_type
self._addr = bytes(
addr
) # Note: addr buffer is owned by caller so need to copy it.
self._name = decode_name(adv_data) or "?"
self._ble.gap_scan(None)
elif event == _IRQ_SCAN_COMPLETE:
print("irq:event == _IRQ_SCAN_COMPLETE")
if self._scan_callback:
if self._addr:
# Found a device during the scan (and the scan was explicitly stopped).
self._scan_callback(self._addr_type, self._addr, self._name)
self._scan_callback = None
else:
# Scan timed out.
self._scan_callback(None, None, None)
elif event == _IRQ_PERIPHERAL_CONNECT:
print("irq:event == _IRQ_PERIPHERAL_CONNECT")
# Connect successful.
conn_handle, addr_type, addr, = data
if addr_type == self._addr_type and addr == self._addr:
self._conn_handle = conn_handle
self._ble.gattc_discover_services(self._conn_handle)
elif event == _IRQ_PERIPHERAL_DISCONNECT:
print("irq:event == _IRQ_PERIPHERAL_DISCONNECT")
# Disconnect (either initiated by us or the remote end).
conn_handle, _, _, = data
if conn_handle == self._conn_handle:
# If it was initiated by us, it'll already be reset.
self._reset()
elif event == _IRQ_GATTC_SERVICE_RESULT:
print("irq:event == _IRQ_GATTC_SERVICE_RESULT")
# Connected device returned a service.
conn_handle, start_handle, end_handle, uuid = data
if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID:
self._ble.gattc_discover_characteristics(
self._conn_handle, start_handle, end_handle
)
elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
print("irq:event == _IRQ_GATTC_CHARACTERISTIC_RESULT")
# Connected device returned a characteristic.
conn_handle, def_handle, value_handle, properties, uuid = data
if conn_handle == self._conn_handle and uuid == _TEMP_UUID:
self._value_handle = value_handle
# We've finished connecting and discovering device, fire the connect callback.
if self._conn_callback:
self._conn_callback()
elif event == _IRQ_GATTC_READ_RESULT:
print("irq:event == _IRQ_GATTC_READ_RESULT")
# A read completed successfully.
conn_handle, value_handle, char_data = data
if conn_handle == self._conn_handle and value_handle == self._value_handle:
self._update_value(char_data)
if self._read_callback:
self._read_callback(self._value)
self._read_callback = None
elif event == _IRQ_GATTC_NOTIFY:
print("irq:event == _IRQ_GATTC_NOTIFY")
# The ble_temperature.py demo periodically notifies its value.
conn_handle, value_handle, notify_data = data
print("NOTIFY:", conn_handle, value_handle, notify_data)
if conn_handle == self._conn_handle and value_handle == self._value_handle:
self._update_value(notify_data)
self._notify_flag = True
if self._notify_callback:
print("NOTIFY:if self._notify_callback:")
self._notify_callback(self._value)
# Returns true if we've successfully connected and discovered characteristics.
def is_connected(self):
# print("is_connected:run")
return self._conn_handle is not None and self._value_handle is not None
# Find a device advertising the environmental sensor service.
def scan(self, callback=None):
self._addr_type = None
self._addr = None
self._scan_callback = callback
self._ble.gap_scan(2000, 30000, 30000)
# Connect to the specified device (otherwise use cached address from a scan).
def connect(self, addr_type=None, addr=None, callback=None):
self._addr_type = addr_type or self._addr_type
self._addr = addr or self._addr
self._conn_callback = callback
if self._addr_type is None or self._addr is None:
return False
self._ble.gap_connect(self._addr_type, self._addr)
return True
# Disconnect from current device.
def disconnect(self):
if not self._conn_handle:
return
self._ble.gap_disconnect(self._conn_handle)
self._reset()
# Issues an (asynchronous) read, will invoke callback with data.
def read(self, callback):
if not self.is_connected():
return
self._read_callback = callback
self._ble.gattc_read(self._conn_handle, self._value_handle)
print("read:over")
# Sets a callback to be invoked when the device notifies us.
def on_notify(self, callback):
self._notify_callback = callback
def _update_value(self, data):
# Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
# self._value = struct.unpack("<h", data)[0] / 100
self._value = struct.unpack("<h", data)[0]
return self._value
def value(self):
# print("value:run")
return self._value
def notify_flag(self):
return self._notify_flag
def disable_notify_flag(self):
self._notify_flag = False
return
def main():
print("main:run")
ble = bluetooth.BLE()
central = BLETemperatureCentral(ble)
not_found = False
def on_scan(addr_type, addr, name):
print("on_scan:run")
if addr_type is not None:
print("Found sensor:", addr_type, addr, name)
central.connect()
else:
nonlocal not_found
not_found = True
print("No sensor found.")
central.scan(callback=on_scan)
# Wait for connection...
while not central.is_connected():
time.sleep_ms(100)
if not_found:
return
print("Connected")
# リレードライバ制御
relay_proc = RelayProc()
# Alternative to the above, just show the most recently notified value.
while central.is_connected():
time.sleep_ms(100)
if central.notify_flag():
central.disable_notify_flag()
value = central.value()
if value < 100:
relay_proc.tap(value, 1, 50)
print("Disconnected")
if __name__ == "__main__":
main()
# pass
- relay.py
import time
from machine import Pin
PIN_RELAY_1 = 27
PIN_RELAY_2 = 26
class RelayProc():
def __init__(self):
self._relays = {}
self._relays[1] = {"pin":Pin(PIN_RELAY_1, Pin.OUT)}
self._relays[2] = {"pin":Pin(PIN_RELAY_2, Pin.OUT)}
return
def tap(self, no, count=1, ht=50):
for i in range(count):
self._relays[no]["pin"].value(True)
time.sleep_ms(ht)
self._relays[no]["pin"].value(False)
time.sleep_ms(ht)
return
セントラル側の動作確認
先ほど作ったペリフェラルと合わせて確認します。
- ESP32 - ペリフェラル側で、ble_temperature.pyを実行する
MicroPython v1.12 on 2019-12-20; ESP32 module with ESP32
Type "help()" for more information.
>>> exec(open('ble_temperature.py').read())
- ESP32 - セントラル側で、ble_temperature_central.pyを実行する
MicroPython v1.12 on 2019-12-20; ESP32 module with ESP32
Type "help()" for more information.
>>> exec(open('ble_temperature_central.py').read())
- ESP32 - ペリフェラル側で、スイッチを押すと、セントラル側でログが出力される
irq:event == _IRQ_GATTC_NOTIFY
NOTIFY: 0 13 b'\x01\x00'
irq:event == _IRQ_GATTC_NOTIFY
NOTIFY: 0 13 b'\x02\x00'
また、スイッチのイベントに応じて、リレースイッチが"カチッ"と音がしていると思います。
以上で、セントラルの動作が完了となります。
iPad - Kindleアプリで動作をみる
必要な要素は全部そろったので、あとはセッティングして確認です。
- セントラル側のリレータッチボードを洗濯ばさみで取り付ける
- ペリフェラル側の電源をいれる
- スイッチは個人的につくった筐体に入れて確認していますが、回路は上記と同じ
- セントラル側の電源を入れる
- ペリフェラル側で、スイッチを押すとページがめくられる
さいごに
- ノートPCで作業をしながら、手元でページ切り替えができました
- 固定できるような治具があれば、さらにいいと考えています