1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

電子工作でiPadを操作してみる2 - Bluetooth経由で制御する

Last updated at Posted at 2020-07-26

はじめに

本記事は、iPadを操作するの2つ目になります。
今回は、ESP32間のBluetoochの接続を行い、ページめくりを実行します。

output2_2.gif

電子工作でiPadを操作してみる関連の記事

最終的な目標

Untitled Diagram-Page-6.png

  1. ユーザがスイッチを押す
  2. ESP32(Periferal)がスイッチを検出し、Bluetooth経由で通知する
  3. ESP32(Central)が通知を検出し、リレータッチボードをON/OFFする
  4. iPadがタップを検出する

ESP32 - MicroPython - Bluetooth

今回使用しているESP32用のMicroPythonファームには、
bluetoothのライブラリがポーティングされています。

  • MicroPython 1.12(esp32-idf4-20191220-v1.12.bin)

また、bluetooth機能を制御するためのドキュメントは、下記に記載されています。

MicroPythonライブラリ - ubluetooth --- 低レベル Bluetooth

技術的な情報については、[参考]のURLを参照いただいた方がいいかと思います。

今回は、温度センサーの公式のサンプルに手を入れて、動作を確認します。

MicroPython - bluetoothサンプルディレクトリ

使用するファイル

  • ble_temperature.py
    • periferalのコードのベースとなります
  • ble_temperature_central.py

※最新版では、INDICATE(bluetooth.FLAG_INDICATE)の対応しているため、そのままでは動作できません。
 今回は、69661f3 のコードをベースに説明します。

サンプルコードを修正する

ペリフェラル側とセントラル側の各コードを修正します。

全体のシーケンス

つくりたいシーケンス図は以下となります。

Untitled Diagram-Page-4.png

ペリフェラル側

本家のサンプルプログラムは、Bluetooth接続のセンサーとなり、定期的にデータの読み取りや通知を行うことができます。
これを参考に、スイッチを経由して通知できるように修正します。

回路図

使用するPINは、以下となります。

ESP32 スイッチ 備考
15 スイッチ1 プルアップ設定で使用
25 スイッチ2 プルアップ設定で使用

回路図は以下となります。

esp_peri.png

プログラム

ペリフェラル側は、4つのファイルから構成されます。

  • ble_temperature.py
    • 最初に実行するファイル
    • bluetoothのペリフェラル動作
    • スイッチイベントを受信すると、notifyを発行
  • sw.py
    • スイッチイベントを管理
    • スイッチを検出すると、mainスレッドへ通知
  • utl.py
    • soket通信を提供する
  • ble_advertising.py

※スレッド間のやりとりは、UDP通信で行っています。

各ソースコードは以下となります。

  • ble_temperature.py
# This example demonstrates a simple temperature sensor peripheral.
#
# The sensor's local value updates every second, and it will notify
# any connected central every 10 seconds.

import bluetooth
import random
import struct
import time
import sys
from ble_advertising import advertising_payload

from micropython import const
from machine import Pin
from sw import SwProc

from usocket import socket, AF_INET, SOCK_DGRAM
import ujson as json
from util import CommSnd
from util import HOST
from util import ADDR
from util import PORT_DST_CTL

RCV_BUF_SIZE = 128

_IRQ_CENTRAL_CONNECT = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)

# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_TEMP_CHAR = (
    bluetooth.UUID(0x2A6E),
    bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_ENV_SENSE_SERVICE = (
    _ENV_SENSE_UUID,
    (_TEMP_CHAR,),
)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)


class BLETemperature:
    def __init__(self, ble, name="mpy-temp"):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(handler=self._irq)
        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
        self._connections = set()
        self._payload = advertising_payload(
            name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER
        )
        self._advertise()

    def _irq(self, event, data):
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _, = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _, = data
            self._connections.remove(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()

    def set_temperature(self, temp_deg_c, notify=False):
        # root - print("set_temperature:run")
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        # Write the local value, ready for a central to read.
        # self._ble.gatts_write(self._handle, struct.pack("<h", int(temp_deg_c * 100)))
        if notify:
            self._ble.gatts_write(self._handle, struct.pack("<h", int(temp_deg_c)))
            for conn_handle in self._connections:
                self._ble.gatts_notify(conn_handle, self._handle)

    def _advertise(self, interval_us=500000):
        self._ble.gap_advertise(interval_us, adv_data=self._payload)


def main():

    # If [tsw_no1] is pressed, exit the program
    PIN_SW_NO1 = 15
    pin_shut = Pin(PIN_SW_NO1, Pin.IN, Pin.PULL_UP)
    if 0 == pin_shut.value():
        print("!!!program shutdown!!!")
        time.sleep_ms(100)
        sys.exit()

    # スイッチの開始
    sw_proc = SwProc()
    sw_proc.run()

    # bluetoothの開始
    ble = bluetooth.BLE()
    temp = BLETemperature(ble)

    # 受信準備
    s = socket(AF_INET, SOCK_DGRAM)
    s.bind((HOST, PORT_DST_CTL))

    while True:
        msg, address = s.recvfrom(RCV_BUF_SIZE)
        d = json.loads(msg.decode())

        if d["cmd"] == "sw":
            # ex. "no2" => 2
            temp.set_temperature(int(d["type"].replace('no', '')), notify=True)



if __name__ == "__main__":
    main()

  • sw.py
import _thread
import time
from machine import Pin

from util import CommSnd
from util import ADDR
from util import PORT_DST_CTL

PIN_SW_NO1 = 15
PIN_SW_NO2 = 25

TACK_JUDGE = [False, False, True, True]

class TackSwitch:
    def __init__(self, pin, mode, push_logic):
        if mode == "in":
            self.pin = Pin(pin, Pin.IN)
            pass
        elif mode == "in_pullup":
            self.pin = Pin(pin, Pin.IN, Pin.PULL_UP)
        else:
            raise Exception('')

        self.store = []
        self.push_logic = push_logic


    def read(self):
        value = self.pin.value()
        if value == self.push_logic:
            return True
        else:
            return False


    def read_poll(self):
        logic = self.read()
        self.store.append(logic)
        if len(self.store) < 5:
            return False

        self.store.pop(0)
        if TACK_JUDGE == self.store:
            return True
        
        return False



class SwProc():
    
    def __init__(self):
        self._cs_ctl = CommSnd(ADDR, PORT_DST_CTL)
        self._tsw_no1 = TackSwitch(PIN_SW_NO1, "in_pullup", 0)
        self._tsw_no2 = TackSwitch(PIN_SW_NO2, "in_pullup", 0)
        return


    def _proc_poll(self):
        print("_proc_poll - run")
        while True:
            time.sleep_ms(50)
            if self._tsw_no1.read_poll() :
                print("push - sw_no1!")
                self._cs_ctl.sendto_dict({'cmd': "sw", 'type':'no1'})
            if self._tsw_no2.read_poll() :
                print("push - sw_no2!")
                self._cs_ctl.sendto_dict({'cmd': "sw", 'type':'no2'})
        return

    def run(self):
        _thread.start_new_thread(self._proc_poll, ())


  • utl.py
try:
    from usocket import socket, AF_INET, SOCK_DGRAM
except:
    from socket import socket, AF_INET, SOCK_DGRAM

try:
    import ujson as json
except:
    import json

HOST = ''
PORT_DST_CTL = 5001
ADDR = '127.0.0.1'

class CommSnd:

    def __init__(self, addr, port):
        self._addr = addr
        self._port = port
        self.s = socket(AF_INET, SOCK_DGRAM)


    def sendto_dict(self, d):
        self.s.sendto(json.dumps(d).encode(), (self._addr, self._port))


    def close(self):
        self.s.close()

ペリフェラル側の動作確認

iOSアプリ - "BLE Scanner"を使用して確認します。

  • ESP32側で、ble_temperature.pyを実行する
MicroPython v1.12 on 2019-12-20; ESP32 module with ESP32
Type "help()" for more information.
>>> exec(open('ble_temperature.py').read())

  • "BLE Scanner"を起動する
    • "ESP32"の"Connect"を選択
    • ios_ble_1.PNG
  • 画面が遷移したら、"Service"をタップ
    • ios_ble_2.PNG
  • ESP32側のスイッチを押すと、値が変わることが分かる
    • ios_ble_3.PNG
    • スイッチ1を押した場合、2byteの1を送信している
    • リトルエンディアンのため、0x0100と表示されている

以上で、ペリフェラル側の作成は完了です。

セントラル側

本家のサンプルプログラムは、Bluetooth接続のセンサーより定期的に値を取得しています。
これを参考に、上記で作ったスイッチ通知を取得します。

回路図

再掲となりますが、以下となります。

ESP32 リレータッチボード 備考
5V 5V
GND GND
26 EN
27 EN
  • 接続図(ESP32 - Central)

esp_cen.png

使用するPINは、以下となります。

ESP32 スイッチ 備考
15 スイッチ1 プルアップ設定で使用
25 スイッチ2 プルアップ設定で使用

プログラム

セントラル側は、3つのファイルから構成されます。

  • ble_temperature_central.py
    • 最初に実行するファイル
    • bluetoothのセントラル動作
    • notifyを受信すると、tap処理を行う
  • relay.py
    • リレータッチボードの管理
  • ble_advertising.py

各ソースコードは、以下となります。

  • ble_temperature_central.py
# This example finds and connects to a BLE temperature sensor (e.g. the one in ble_temperature.py).

import bluetooth
import random
import struct
import time
import micropython

from ble_advertising import decode_services, decode_name

from micropython import const

from relay import RelayProc


_IRQ_CENTRAL_CONNECT = const(1 << 0)
_IRQ_CENTRAL_DISCONNECT = const(1 << 1)
_IRQ_GATTS_WRITE = const(1 << 2)
_IRQ_GATTS_READ_REQUEST = const(1 << 3)
_IRQ_SCAN_RESULT = const(1 << 4)
_IRQ_SCAN_COMPLETE = const(1 << 5)
_IRQ_PERIPHERAL_CONNECT = const(1 << 6)
_IRQ_PERIPHERAL_DISCONNECT = const(1 << 7)
_IRQ_GATTC_SERVICE_RESULT = const(1 << 8)
_IRQ_GATTC_CHARACTERISTIC_RESULT = const(1 << 9)
_IRQ_GATTC_DESCRIPTOR_RESULT = const(1 << 10)
_IRQ_GATTC_READ_RESULT = const(1 << 11)
_IRQ_GATTC_WRITE_STATUS = const(1 << 12)
_IRQ_GATTC_NOTIFY = const(1 << 13)
_IRQ_GATTC_INDICATE = const(1 << 14)
_IRQ_ALL = const(0xFFFF)

_ADV_IND = const(0x00)
_ADV_DIRECT_IND = const(0x01)
_ADV_SCAN_IND = const(0x02)
_ADV_NONCONN_IND = const(0x03)

# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_TEMP_UUID = bluetooth.UUID(0x2A6E)
_TEMP_CHAR = (
    _TEMP_UUID,
    bluetooth.FLAG_READ | bluetooth.FLAG_NOTIFY,
)
_ENV_SENSE_SERVICE = (
    _ENV_SENSE_UUID,
    (_TEMP_CHAR,),
)

# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)


class BLETemperatureCentral:
    def __init__(self, ble):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(handler=self._irq)

        self._reset()

    def _reset(self):
        # Cached name and address from a successful scan.
        self._name = None
        self._addr_type = None
        self._addr = None

        # Cached value (if we have one)
        self._value = None

        # Callbacks for completion of various operations.
        # These reset back to None after being invoked.
        self._scan_callback = None
        self._conn_callback = None
        self._read_callback = None

        # Persistent callback for when new data is notified from the device.
        self._notify_callback = None

        # Connected device.
        self._conn_handle = None
        self._value_handle = None

        self._notify_flag = False

    def _irq(self, event, data):
        if event == _IRQ_SCAN_RESULT:
            print("irq:event == _IRQ_SCAN_RESULT")
            addr_type, addr, adv_type, rssi, adv_data = data
            if adv_type in (_ADV_IND, _ADV_DIRECT_IND,) and _ENV_SENSE_UUID in decode_services(
                adv_data
            ):
                # Found a potential device, remember it and stop scanning.
                self._addr_type = addr_type
                self._addr = bytes(
                    addr
                )  # Note: addr buffer is owned by caller so need to copy it.
                self._name = decode_name(adv_data) or "?"
                self._ble.gap_scan(None)

        elif event == _IRQ_SCAN_COMPLETE:
            print("irq:event == _IRQ_SCAN_COMPLETE")
            if self._scan_callback:
                if self._addr:
                    # Found a device during the scan (and the scan was explicitly stopped).
                    self._scan_callback(self._addr_type, self._addr, self._name)
                    self._scan_callback = None
                else:
                    # Scan timed out.
                    self._scan_callback(None, None, None)

        elif event == _IRQ_PERIPHERAL_CONNECT:
            print("irq:event == _IRQ_PERIPHERAL_CONNECT")
            # Connect successful.
            conn_handle, addr_type, addr, = data
            if addr_type == self._addr_type and addr == self._addr:
                self._conn_handle = conn_handle
                self._ble.gattc_discover_services(self._conn_handle)

        elif event == _IRQ_PERIPHERAL_DISCONNECT:
            print("irq:event == _IRQ_PERIPHERAL_DISCONNECT")
            # Disconnect (either initiated by us or the remote end).
            conn_handle, _, _, = data
            if conn_handle == self._conn_handle:
                # If it was initiated by us, it'll already be reset.
                self._reset()

        elif event == _IRQ_GATTC_SERVICE_RESULT:
            print("irq:event == _IRQ_GATTC_SERVICE_RESULT")
            # Connected device returned a service.
            conn_handle, start_handle, end_handle, uuid = data
            if conn_handle == self._conn_handle and uuid == _ENV_SENSE_UUID:
                self._ble.gattc_discover_characteristics(
                    self._conn_handle, start_handle, end_handle
                )

        elif event == _IRQ_GATTC_CHARACTERISTIC_RESULT:
            print("irq:event == _IRQ_GATTC_CHARACTERISTIC_RESULT")
            # Connected device returned a characteristic.
            conn_handle, def_handle, value_handle, properties, uuid = data
            if conn_handle == self._conn_handle and uuid == _TEMP_UUID:
                self._value_handle = value_handle
                # We've finished connecting and discovering device, fire the connect callback.
                if self._conn_callback:
                    self._conn_callback()

        elif event == _IRQ_GATTC_READ_RESULT:
            print("irq:event == _IRQ_GATTC_READ_RESULT")
            # A read completed successfully.
            conn_handle, value_handle, char_data = data
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(char_data)
                if self._read_callback:
                    self._read_callback(self._value)
                    self._read_callback = None

        elif event == _IRQ_GATTC_NOTIFY:
            print("irq:event == _IRQ_GATTC_NOTIFY")
            # The ble_temperature.py demo periodically notifies its value.
            conn_handle, value_handle, notify_data = data
            print("NOTIFY:", conn_handle, value_handle, notify_data)
            if conn_handle == self._conn_handle and value_handle == self._value_handle:
                self._update_value(notify_data)
                self._notify_flag = True
                if self._notify_callback:
                    print("NOTIFY:if self._notify_callback:")
                    self._notify_callback(self._value)

    # Returns true if we've successfully connected and discovered characteristics.
    def is_connected(self):
        # print("is_connected:run")
        return self._conn_handle is not None and self._value_handle is not None

    # Find a device advertising the environmental sensor service.
    def scan(self, callback=None):
        self._addr_type = None
        self._addr = None
        self._scan_callback = callback
        self._ble.gap_scan(2000, 30000, 30000)

    # Connect to the specified device (otherwise use cached address from a scan).
    def connect(self, addr_type=None, addr=None, callback=None):
        self._addr_type = addr_type or self._addr_type
        self._addr = addr or self._addr
        self._conn_callback = callback
        if self._addr_type is None or self._addr is None:
            return False
        self._ble.gap_connect(self._addr_type, self._addr)
        return True

    # Disconnect from current device.
    def disconnect(self):
        if not self._conn_handle:
            return
        self._ble.gap_disconnect(self._conn_handle)
        self._reset()

    # Issues an (asynchronous) read, will invoke callback with data.
    def read(self, callback):
        if not self.is_connected():
            return
        self._read_callback = callback
        self._ble.gattc_read(self._conn_handle, self._value_handle)
        print("read:over")

    # Sets a callback to be invoked when the device notifies us.
    def on_notify(self, callback):
        self._notify_callback = callback

    def _update_value(self, data):
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        # self._value = struct.unpack("<h", data)[0] / 100
        self._value = struct.unpack("<h", data)[0]
        return self._value

    def value(self):
        # print("value:run")
        return self._value
    
    def notify_flag(self):
        return self._notify_flag

    def disable_notify_flag(self):
        self._notify_flag = False
        return


def main():
    print("main:run")
    ble = bluetooth.BLE()
    central = BLETemperatureCentral(ble)

    not_found = False

    def on_scan(addr_type, addr, name):
        print("on_scan:run")
        if addr_type is not None:
            print("Found sensor:", addr_type, addr, name)
            central.connect()
        else:
            nonlocal not_found
            not_found = True
            print("No sensor found.")

    central.scan(callback=on_scan)

    # Wait for connection...
    while not central.is_connected():
        time.sleep_ms(100)
        if not_found:
            return

    print("Connected")

    # リレードライバ制御
    relay_proc = RelayProc()

    # Alternative to the above, just show the most recently notified value.
    while central.is_connected():
        time.sleep_ms(100)
        if central.notify_flag():
            central.disable_notify_flag()
            value = central.value()
            if value < 100:
                relay_proc.tap(value, 1, 50)

    print("Disconnected")


if __name__ == "__main__":
    main()
    # pass

  • relay.py
import time
from machine import Pin

PIN_RELAY_1 = 27
PIN_RELAY_2 = 26

class RelayProc():
    
    def __init__(self):
        self._relays = {}

        self._relays[1] = {"pin":Pin(PIN_RELAY_1, Pin.OUT)}
        self._relays[2] = {"pin":Pin(PIN_RELAY_2, Pin.OUT)}
        return

    def tap(self, no, count=1, ht=50):
        for i in range(count):
            self._relays[no]["pin"].value(True)
            time.sleep_ms(ht)
            self._relays[no]["pin"].value(False)
            time.sleep_ms(ht)
        return

セントラル側の動作確認

先ほど作ったペリフェラルと合わせて確認します。

  • ESP32 - ペリフェラル側で、ble_temperature.pyを実行する
MicroPython v1.12 on 2019-12-20; ESP32 module with ESP32
Type "help()" for more information.
>>> exec(open('ble_temperature.py').read())

  • ESP32 - セントラル側で、ble_temperature_central.pyを実行する
MicroPython v1.12 on 2019-12-20; ESP32 module with ESP32
Type "help()" for more information.
>>> exec(open('ble_temperature_central.py').read())

  • ESP32 - ペリフェラル側で、スイッチを押すと、セントラル側でログが出力される
irq:event == _IRQ_GATTC_NOTIFY
NOTIFY: 0 13 b'\x01\x00'
irq:event == _IRQ_GATTC_NOTIFY
NOTIFY: 0 13 b'\x02\x00'

また、スイッチのイベントに応じて、リレースイッチが"カチッ"と音がしていると思います。
以上で、セントラルの動作が完了となります。


iPad - Kindleアプリで動作をみる

必要な要素は全部そろったので、あとはセッティングして確認です。

  • セントラル側のリレータッチボードを洗濯ばさみで取り付ける
  • ペリフェラル側の電源をいれる
    • スイッチは個人的につくった筐体に入れて確認していますが、回路は上記と同じ
  • セントラル側の電源を入れる
  • ペリフェラル側で、スイッチを押すとページがめくられる
    • output2_2.gif

さいごに

  • ノートPCで作業をしながら、手元でページ切り替えができました
  • 固定できるような治具があれば、さらにいいと考えています

参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?