3
3

More than 1 year has passed since last update.

ESP32でBLEの実装作業【ブロードキャスト】

Last updated at Posted at 2021-02-18

はじめに

今回はESP32同士でBluetooth Low Energy(BLE)通信を構築してみました。BLEの初心者であればまず開発視点の超簡単BLE入門を見て多少これからの内容を把握できると思います。本文にはBLEを使ってESP32からパケットをアドバタイジングコードを紹介します。micropythonオフィシャルサイトで今使っているコードが全部載せているから、わからないな時には参照する方が良いと思います。

環境

  • IoTデバイス(ESP-WROOM-32)
  • ファームウェア(MicroPython1.13)

この記事の作業を始める前にmicropythonのバージョンが1.13であることを確認してください。確認手順は下にある図を参考してください。
image.png
※バージョンがずっと更新しているので, 異なるバージョンで機能しない可能性があります。

ソースコード(ESP32)

今回はライブラリとなるadvertising.pyと実際に実行するプログラムble_broadcast.pyを分けて説明していきます。

アドバタイジングの準備

今回はjimmoらが作ったbluetoothというスタックadvertising.pyを参考にして、BLE通信のパケットをペイロードという形で生成します。これはライブラリのような存在と認識してもよいで、後に出てくる若干のプログラムはこれを利用します。

advertising.py
# Helpers for generating BLE advertising payloads.

from micropython import const
import struct
import bluetooth

# Advertising payloads are repeated packets of the following form:
#   1 byte data length (N + 1)
#   1 byte type (see constants below)
#   N bytes type-specific data

_ADV_TYPE_FLAGS = const(0x01)
_ADV_TYPE_NAME = const(0x09)
_ADV_TYPE_UUID16_COMPLETE = const(0x3)
_ADV_TYPE_UUID32_COMPLETE = const(0x5)
_ADV_TYPE_UUID128_COMPLETE = const(0x7)
_ADV_TYPE_APPEARANCE = const(0x19)


# Generate a payload to be passed to gap_advertise(adv_data=...).
def advertising_payload(limited_disc=False, br_edr=False, name=None, services=None, appearance=0):
    payload = bytearray()

    def _append(adv_type, value):
        nonlocal payload
        payload += struct.pack('BB', len(value) + 1, adv_type) + value

    _append(_ADV_TYPE_FLAGS, struct.pack('B', (0x01 if limited_disc else 0x02) + (0x00 if br_edr else 0x04)))

    if name:
        _append(_ADV_TYPE_NAME, name)

    if services:
        for uuid in services:
            b = bytes(uuid)
            if len(b) == 2:
                _append(_ADV_TYPE_UUID16_COMPLETE, b)
            elif len(b) == 4:
                _append(_ADV_TYPE_UUID32_COMPLETE, b)
            elif len(b) == 16:
                _append(_ADV_TYPE_UUID128_COMPLETE, b)

    # See org.bluetooth.characteristic.gap.appearance.xml
    _append(_ADV_TYPE_APPEARANCE, struct.pack('<h', appearance))

    return payload


def decode_field(payload, adv_type):
    i = 0
    result = []
    while i + 1 < len(payload):
        if payload[i + 1] == adv_type:
            result.append(payload[i + 2:i + payload[i] + 1])
        i += 1 + payload[i]
    return result


def decode_name(payload):
    n = decode_field(payload, _ADV_TYPE_NAME)
    return str(n[0], 'utf-8') if n else ''


def decode_services(payload):
    services = []
    for u in decode_field(payload, _ADV_TYPE_UUID16_COMPLETE):
        services.append(bluetooth.UUID(struct.unpack('<h', u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID32_COMPLETE):
        services.append(bluetooth.UUID(struct.unpack('<d', u)[0]))
    for u in decode_field(payload, _ADV_TYPE_UUID128_COMPLETE):
        services.append(bluetooth.UUID(u))
    return services


def demo():
    #ペイロードの生成
    payload = advertising_payload(name='好きな名前', services=[bluetooth.UUID(0x181A), bluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')])
    print(payload)
    print(decode_name(payload))
    print(decode_services(payload))

if __name__ == '__main__':
    demo()

ブロードキャストのプログラム

ブロードキャストのプログラムの一覧

ble_broadcast.py
# This example demonstrates a simple temperature sensor peripheral.
#
# The sensor's local value updates every second, and it will notify
# any connected central every 10 seconds.

import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload

from micropython import const

_IRQ_CENTRAL_CONNECT                 = const(1)
_IRQ_CENTRAL_DISCONNECT              = const(2)

# 環境センシングサービス
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# 温度特性
_TEMP_CHAR = (bluetooth.UUID(0x2A6E), bluetooth.FLAG_READ|bluetooth.FLAG_NOTIFY,)
_ENV_SENSE_SERVICE = (_ENV_SENSE_UUID, (_TEMP_CHAR,),)

# 検測された画像様式
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)


class BLETemperature:
    def __init__(self, ble, name="好きな名前"):
        self._ble = ble
        self._ble.active(True)
        self._ble.irq(self._irq)
        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
        self._connections = set()
        self._payload = advertising_payload(
            name=name, services=[_ENV_SENSE_UUID], appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER
        )
        self._advertise()

    def _irq(self, event, data):
        # Track connections so we can send notifications.
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            self._connections.remove(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()

    def set_temperature(self, temp_deg_c, notify=False, indicate=False):
        # Data is sint16 in degrees Celsius with a resolution of 0.01 degrees Celsius.
        # Write the local value, ready for a central to read.
        self._ble.gatts_write(self._handle, struct.pack('<h', int(temp_deg_c * 100)))
        if notify:
            for conn_handle in self._connections:
                # Notify connected centrals to issue a read.
                self._ble.gatts_notify(conn_handle, self._handle)

    def _advertise(self, interval_us=500000):
        self._ble.gap_advertise(interval_us, adv_data=self._payload)


def demo():
    ble = bluetooth.BLE()
    temp = BLETemperature(ble)

    t = 25
    i = 0
    count = 0

    # 二回データを出す 
    #while count < 2:
        # Write every second, notify every 10 seconds.
        #i = (i + 1) % 10
        #temp.set_temperature(t, notify=i == 0, indicate=False)
        # Random walk the temperature.
        #t += random.uniform(-0.5, 0.5)
        #print(t)
        #time.sleep_ms(1000)
        #count += 1

    # 無限ループのオプション
    while True:
        # Write every second, notify every 10 seconds.
        i = (i + 1) % 10
        temp.set_temperature(t, notify=i == 0, indicate=False)
        # Random walk the temperature.
        t += random.uniform(-0.5, 0.5)
        print(t)
        time.sleep_ms(1000)


if __name__ == "__main__":
    demo()

ブロードキャストの変数説明

UUID

BLE通信の枠組みを簡単に言うと、二つのデバイスがUUIDを使って同じお互いに認識します。その後に、同じくUUIDを使っているサービスにある属性のデータを書き込むと読み取ることができます。今回の場合は仮想温度データを送受信しているので、環境センシングサービスを利用しています。したがってUUIDは0x181Aと0x2A6Eになりました。(但し温度データだけではなく他のデータも使えるようです。)

アイコン

_ADV_APPEARANCE_GENERIC_THERMOMETERこの変数は実際に使わなくてもよいですが、これは何かというとPCやスマホでBluetoothデバイスを探す際のアイコンのことです。const(768)はimage.pngこの感じです。const(767)はimage.pngこう言う感じですね。

ble.active(True)

BLEを有効にする関数です。

irq(self._irq)と_irq関数

Eventを喚起する関数。
ble.active(True)と同じく前述の通りmicropythonオフィシャルサイトで確認できますから、後でこのような関数の説明はそこに参考すればいいでしょう。

gatts_write

この関数はBLE通信中のgattサービスにデータの書き込み処理をする関数です. gattサービスの内容はとばしますが、中にあるstruct.packはBLE通信の際にパケットをバイナリデータとしてやり取りをやっていますので、C言語の構造体みたいにデータの中身をバイナリデータにパックする必要があります。詳細はpythonのオフィシャルサイトで見えます。

結果

結果は一秒ごとにパケットを発信しています。ターミナルでは下図のように表示しているはずです。
image.png

最後に

自分が実装する途中でバージョンが違いのせいでずっと実行結果が見られませんでした。その時にMicroPython Forumで質問を投げてjimmoに助けてもらいました。jimmoさんはホントにBLEの領域でコンミュニティにすごく貢献をしていました。みんなもBLEだけではなくmicropythonに関する質問があればこのサイトを有効に利用する方がおすすめです。Thank you jimmo!!

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3