はじめに
ESP32同士でBLE通信をします。実装はMicroPythonで行いました。
そもそもBLEとは
Bluetoothをさらに省電力化した通信方式です。Bluetoothには低消費電力で通信可能距離が短い特徴があります。BLEは使い方次第ではコイン電池一個で数年間使えるそうです。
実装
MicroPythonで実装を行います。MicroPythonの公式ドキュメントを見ると
!注釈
ほとんどのアプリケーションでは、上位のaiobleライブラリの利用を推奨します。
と書いてあります。したがってaiobleライブラリを使って実装します。実装に関する基礎知識はaiobleライブラリのreadmeに詳しく書かれているので実装で分からない点があった場合は基本的にそちらを参照すると良いです。
aiobleファイルのexampleファイルにサンプルコードが存在します。このサンプルコードを使用すると簡単にBLEを実装することができます。
データ送信側のプログラムはtemp_sensor.pyを使用します。
import sys
sys.path.append("")
from micropython import const
import uasyncio as asyncio
import aioble
import bluetooth
import random
import struct
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
# How frequently to send advertising beacons.
_ADV_INTERVAL_MS = 250_000
# Register GATT server.
temp_service = aioble.Service(_ENV_SENSE_UUID)
temp_characteristic = aioble.Characteristic(
temp_service, _ENV_SENSE_TEMP_UUID, read=True, notify=True
)
aioble.register_services(temp_service)
# Helper to encode the temperature characteristic encoding (sint16, hundredths of a degree).
def _encode_temperature(temp_deg_c):
return struct.pack("<h", int(temp_deg_c * 100))
# This would be periodically polling a hardware sensor.
async def sensor_task():
t = 24.5
while True:
temp_characteristic.write(_encode_temperature(t))
t += random.uniform(-0.5, 0.5)
await asyncio.sleep_ms(1000)
# Serially wait for connections. Don't advertise while a central is
# connected.
async def peripheral_task():
while True:
async with await aioble.advertise(
_ADV_INTERVAL_MS,
name="mpy-temp",
services=[_ENV_SENSE_UUID],
appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
) as connection:
print("Connection from", connection.device)
await connection.disconnected()
# Run both tasks.
async def main():
t1 = asyncio.create_task(sensor_task())
t2 = asyncio.create_task(peripheral_task())
await asyncio.gather(t1, t2)
asyncio.run(main())
上記のコードを実行することで1秒ごとに温度データを送信するプログラムが実行されます。このコードを改変することで自分が送りたいデータを自分が送りたい頻度で送信できるようになります。
まずは送信するデータ内容を改変します。送信するデータを改変するにはasync def sensor_task():という関数を改変します。
async def sensor_task():
t = 24.5
while True:
temp_characteristic.write(_encode_temperature(t))
t += random.uniform(-0.5, 0.5)
await asyncio.sleep_ms(1000)
この関数内のtemp_characteristic.write(_encode_temperature(t))という部分で、tを送信し、await asyncio.sleep_ms(1000)で1000ms待機を行っています。つまりtの内容を変えることで送るデータの内容を変えることができ、await asyncio.sleep_ms(1000)の1000を変えれば送信頻度を変えることができます。使用する用途に応じて各々変えてください。
この関数の改変が終わったら、次にdef _encode_temperature(temp_deg_c):関数を改変する必要があります。
def _encode_temperature(temp_deg_c):
return struct.pack("<h", int(temp_deg_c * 100))
struct.packの()内を、送信するデータ内容に応じて変える必要があります。送信するデータの内容によってどのように改変するか異なるため、詳しくはこちらを参照してください。
最後にUUIDを変更する必要があります。
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
ここもまた、内容によってどのように改変するかが異なります。こちらを参照して、()内を使用用途に応じて改変してください。
続いて、データ受信側の実装をします。データ受信側には、temp_client.pyを使用します。
import sys
sys.path.append("")
from micropython import const
import uasyncio as asyncio
import aioble
import bluetooth
import random
import struct
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
# Helper to decode the temperature characteristic encoding (sint16, hundredths of a degree).
def _decode_temperature(data):
return struct.unpack("<h", data)[0] / 100
async def find_temp_sensor():
# Scan for 5 seconds, in active mode, with very low interval/window (to
# maximise detection rate).
async with aioble.scan(5000, interval_us=30000, window_us=30000, active=True) as scanner:
async for result in scanner:
# See if it matches our name and the environmental sensing service.
if result.name() == "mpy-temp" and _ENV_SENSE_UUID in result.services():
return result.device
return None
async def main():
device = await find_temp_sensor()
if not device:
print("Temperature sensor not found")
return
try:
print("Connecting to", device)
connection = await device.connect()
except asyncio.TimeoutError:
print("Timeout during connection")
return
async with connection:
try:
temp_service = await connection.service(_ENV_SENSE_UUID)
temp_characteristic = await temp_service.characteristic(_ENV_SENSE_TEMP_UUID)
except asyncio.TimeoutError:
print("Timeout discovering services/characteristics")
return
while True:
temp_deg_c = _decode_temperature(await temp_characteristic.read())
print("Temperature: {:.2f}".format(temp_deg_c))
await asyncio.sleep_ms(1000)
asyncio.run(main())
まずは、UUIDを送信側と同じものに変更します。
# org.bluetooth.service.environmental_sensing
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# org.bluetooth.characteristic.temperature
_ENV_SENSE_TEMP_UUID = bluetooth.UUID(0x2A6E)
上記の()内を送信側と同じものに改変してください。UUIDが揃っていないと通信することができません。
次に、def _decode_temperature(data):を改変します。
def _decode_temperature(data):
return struct.unpack("<h", data)[0] / 100
struct.unpackの()内を送信側のstruct.packの()内と同じものに改変してください。
最後にmain関数の改変を行います。main関数の最後の2行をデータ内容と送信頻度に応じて改変します。
while True:
temp_deg_c = _decode_temperature(await temp_characteristic.read())
print("Temperature: {:.2f}".format(temp_deg_c))
await asyncio.sleep_ms(1000)
print内は出力したいデータに応じて改変してください。await asyncio.sleep_ms(1000)は送信頻度に応じて()内の数字を改変してください。
まとめ
以上のように実装することでBLE通信が実装できます。
参考にしたサイト