ペリフェラル側をラズパイPico W、セントラル側をChromeブラウザ(ほかに gatttool、bleak、Flutter)としたBLE通信のテストです。
インターフェース誌を参考に作成しました。ペリフェラルが10秒毎に仮想温度をnotifyで通知し、ブラウザ側で受け取ります。
ペリフェラル側:ラズパイPicoW
import bluetooth
import random
import struct
import time
from ble_advertising import advertising_payload
from micropython import const
# Bluetooth IRQ event codes handled by BLETemperature._irq
_IRQ_CENTRAL_CONNECT = const(1) # event raised when a central connects
_IRQ_CENTRAL_DISCONNECT = const(2) # event raised when a central disconnects
_IRQ_GATTS_INDICATE_DONE = const(20)
# GATT characteristic flags
_FLAG_READ = const(0x0002) # characteristic can be read
_FLAG_NOTIFY = const(0x0010) # characteristic can be notified
_FLAG_INDICATE = const(0x0020)
# Environmental Sensing service UUID
_ENV_SENSE_UUID = bluetooth.UUID(0x181A)
# Temperature characteristic: UUID and its flags
_TEMP_CHAR = (
bluetooth.UUID(0x2A6E),
_FLAG_READ | _FLAG_NOTIFY | _FLAG_INDICATE,
)
# Environmental Sensing service together with its characteristics
_ENV_SENSE_SERVICE = (
_ENV_SENSE_UUID,
(_TEMP_CHAR,),
)
# Appearance value for a generic thermometer
# org.bluetooth.characteristic.gap.appearance.xml
_ADV_APPEARANCE_GENERIC_THERMOMETER = const(768)
class BLETemperature:
    """BLE peripheral exposing a single temperature characteristic.

    Registers the environmental sensing service, advertises it, and keeps
    track of connected centrals so new values can be notified or indicated.
    """

    def __init__(self, ble, name="mpy-temp"):
        """Activate the radio, register the service and start advertising.

        Args:
            ble: BLE radio object (``demo`` passes ``bluetooth.BLE()``).
            name: Device name placed in the advertising payload.
        """
        self._ble = ble
        self._ble.active(True)
        # Create the connection set before the IRQ handler is installed so
        # the handler never sees a half-initialised object.
        self._connections = set()
        self._ble.irq(self._irq)
        # One service with one characteristic -> one value handle.
        ((self._handle,),) = self._ble.gatts_register_services((_ENV_SENSE_SERVICE,))
        self._payload = advertising_payload(
            name=name,
            services=[_ENV_SENSE_UUID],
            appearance=_ADV_APPEARANCE_GENERIC_THERMOMETER,
        )
        self._advertise()

    def _irq(self, event, data):
        """Track connections so notifications can be sent to them."""
        if event == _IRQ_CENTRAL_CONNECT:
            conn_handle, _, _ = data
            self._connections.add(conn_handle)
        elif event == _IRQ_CENTRAL_DISCONNECT:
            conn_handle, _, _ = data
            # discard() rather than remove(): a disconnect for a handle that
            # was never recorded must not raise KeyError inside the handler.
            self._connections.discard(conn_handle)
            # Start advertising again to allow a new connection.
            self._advertise()
        elif event == _IRQ_GATTS_INDICATE_DONE:
            # data is (conn_handle, value_handle, status); nothing is done
            # with the indication result here.
            pass

    def set_temperature(self, temp_deg_c, notify=False, indicate=False):
        """Store a new temperature and optionally push it to centrals.

        Args:
            temp_deg_c: Temperature in degrees Celsius.
            notify: If true, notify every tracked connection.
            indicate: If true, indicate every tracked connection.
        """
        # Data is sint16 in units of 0.01 degrees Celsius. round() avoids the
        # downward bias of plain int() truncation (0.29 * 100 is
        # 28.999999999999996, which int() would turn into 28).
        raw = int(round(temp_deg_c * 100))
        # Write the local value, ready for a central to read.
        self._ble.gatts_write(self._handle, struct.pack("<h", raw))
        if not (notify or indicate):
            return
        for conn_handle in self._connections:
            if notify:
                self._ble.gatts_notify(conn_handle, self._handle)
            if indicate:
                self._ble.gatts_indicate(conn_handle, self._handle)

    def _advertise(self, interval_us=500000):
        """Start advertising the stored payload at *interval_us* microseconds."""
        self._ble.gap_advertise(interval_us, adv_data=self._payload)
def demo():
    """Publish a randomly drifting temperature forever.

    The value is written once per second; every tenth write is also sent
    as a notification. Indications are never used.
    """
    sensor = BLETemperature(bluetooth.BLE())
    celsius = 25
    tick = 0
    while True:
        # Count writes; the tenth one triggers a notification and resets
        # the counter.
        tick += 1
        due = tick == 10
        if due:
            tick = 0
        sensor.set_temperature(celsius, notify=due, indicate=False)
        # Random walk the temperature.
        celsius += random.uniform(-0.5, 0.5)
        time.sleep_ms(1000)


if __name__ == "__main__":
    demo()
セントラル側:ブラウザ
<!DOCTYPE html>
<html lang="ja">
<head>
  <!-- Declare the encoding so the Japanese text in the inline script is not
       mis-decoded (assumes the file is saved as UTF-8). -->
  <meta charset="utf-8">
  <title>BLE temperature notify test</title>
</head>
<body>
  <!-- Web Bluetooth requires a user gesture; the script wires this button to clickPage. -->
  <button id="connectButton">Connect</button>
</body>
</html>
<script>
var name; // name of the selected device; assigned in clickPage, printed by onTemperatureChanged
let device = null; // checked by disconnect() before it disconnects
// Strictly speaking this UUID identifies the profile rather than a "temperature service".
const SERVICE_UUID = "0000181a-0000-1000-8000-00805f9b34fb"; // confirmed temperature service UUID
const CHARACTERISTICS_UUID = "00002a6e-0000-1000-8000-00805f9b34fb"; // confirmed temperature characteristic UUID
document.getElementById('connectButton').addEventListener('click', clickPage);
// Click handler: lets the user pick a device whose name starts with
// 'mpy-temp', connects to it, and subscribes to temperature notifications.
function clickPage() {
  console.log('connect'); // start pairing
  // Scan for BLE devices.
  navigator.bluetooth.requestDevice({
    filters: [
      { namePrefix: 'mpy-temp' } // filter by device-name prefix
    ],
    optionalServices: [SERVICE_UUID] // service we need access to after connecting
  })
    // Connect to the chosen device.
    .then(selected => {
      console.log("Device found:", selected);
      name = selected.name;
      // The callback parameter must not be called `device`: that would shadow
      // the outer variable, leave it null, and turn disconnect() into a no-op.
      device = selected;
      return selected.gatt.connect();
    })
    // Get the service.
    .then(server => {
      console.log("Connected to GATT server:", server);
      return server.getPrimaryService(SERVICE_UUID);
    })
    // Get the characteristic.
    .then(service => {
      console.log("Service found:", service);
      return service.getCharacteristic(CHARACTERISTICS_UUID);
    })
    // Subscribe to temperature notifications.
    .then(characteristic => {
      console.log("Characteristic found:", characteristic);
      // Attach the listener before enabling notifications so that the first
      // value cannot arrive while nobody is listening.
      characteristic.addEventListener('characteristicvaluechanged', onTemperatureChanged);
      return characteristic.startNotifications().then(() => {
        console.log("Notifications started");
      });
    })
    .catch(error => {
      console.log("Error:", error);
    });
}
// Handles 'characteristicvaluechanged': decodes the value as a little-endian
// 16-bit signed integer in hundredths of a degree, logs it, then calls
// disconnect().
function onTemperatureChanged(event) {
  const view = event.target.value;
  const hundredths = view.getInt16(0, true); // 2 bytes, little-endian
  console.log(name + ' 温度 : ' + hundredths / 100);
  disconnect(); // release the connection
}
// Logs 'disconnect' and closes the GATT connection when a device is stored
// and currently connected; otherwise does nothing further.
function disconnect() {
  console.log('disconnect');
  if (device && device.gatt.connected) {
    device.gatt.disconnect();
  }
}
</script>
結果
注意点
ブラウザはChromeを使用しました。Ubuntuでは最新のChromeでも動作せず、ここでかなりはまりましたが、WindowsのChromeではすんなり動作しました。
F12キーで開発者ツールを開き、コンソールログに温度が表示されればOKです。
セントラル側:gatttool
0x0009には何も入っていない → 0x000cに01を書く → 0x0009に32.20℃が書き込まれている
0x000cに00を書く → 0x0009には何も書かれていない
セントラル側:bleak(python)
bluepyは不安定なので、非同期通信のbleakを使用してテストする。
import asyncio
from bleak import BleakClient, BleakScanner
picow_address = '28:cd:c1:0e:34:7b'  # address handed to BleakScanner.find_device_by_address in main()
picow_uuid = '6e400001-b5a3-f393-e0a9-e50e24dcca9e'  # not referenced by the code shown here
write_characteristic_uuid = '6e400002-b5a3-f393-e0a9-e50e24dcca9e' # UUID of the characteristic that is written to
notify_characteristic_uuid = '6e400003-b5a3-f393-e0a9-e50e24dcca9e' # UUID of the characteristic that delivers notifications
timeout = 5  # seconds main() keeps listening after the write
async def notification_handler(sender, data):
    """Print a received notification payload as text.

    Args:
        sender: Source of the notification (unused).
        data: Raw payload bytes.
    """
    # errors="replace": a payload that is not valid UTF-8 (e.g. raw binary)
    # is still printed instead of raising inside the callback.
    print(f"Received Data: {data.decode('utf-8', errors='replace')}")
async def main():
    """Connect to the Pico W, subscribe, send 0x01 and listen for a while.

    Looks the device up by ``picow_address``, enables notifications on
    ``notify_characteristic_uuid``, writes a single byte to
    ``write_characteristic_uuid`` and keeps the connection open for
    ``timeout`` seconds before unsubscribing.
    """
    device = await BleakScanner.find_device_by_address(picow_address, timeout=20.0)
    if not device:
        print(f"Device with address {picow_address} not found.")
        return
    async with BleakClient(device) as client:
        await client.start_notify(notify_characteristic_uuid, notification_handler)
        # b'change' was an earlier test payload; a single 0x01 byte is sent now.
        send_data = b'\x01'
        await client.write_gatt_char(write_characteristic_uuid, send_data, response=True)
        # One sleep replaces the 1-second polling loop built on
        # asyncio.get_event_loop(): notifications are delivered while this
        # coroutine is suspended, and the wait no longer overshoots.
        await asyncio.sleep(timeout)
        await client.stop_notify(notify_characteristic_uuid)


asyncio.run(main())
セントラル側:Flutter
main.dart
import 'dart:async';
import 'dart:io';
import 'dart:convert';
import 'package:flutter/material.dart';
import 'package:flutter_blue_plus/flutter_blue_plus.dart';
/// Application entry point: mounts [MyApp] as the root widget.
void main() {
  runApp(MyApp());
}
/// Root widget: a [MaterialApp] titled 'Flutter BLE Demo' with a blue theme
/// whose home page is [BLEHomePage].
class MyApp extends StatelessWidget {
  @override
  Widget build(BuildContext context) {
    final blueTheme = ThemeData(primarySwatch: Colors.blue);
    return MaterialApp(
      title: 'Flutter BLE Demo',
      theme: blueTheme,
      home: BLEHomePage(),
    );
  }
}
/// Stateful home page; all scanning/connection state lives in
/// [_BLEHomePageState].
class BLEHomePage extends StatefulWidget {
  @override
  _BLEHomePageState createState() {
    return _BLEHomePageState();
  }
}
/// State for [BLEHomePage]: holds scan results, the connected device, its
/// discovered services and the last value seen for each characteristic.
class _BLEHomePageState extends State<BLEHomePage> {
// Stream subscriptions; each one is cancelled in dispose().
StreamSubscription<BluetoothAdapterState>? adapterStateSubscription;
StreamSubscription<List<ScanResult>>? scanResultsSubscription;
StreamSubscription<bool>? isScanningSubscription;
StreamSubscription<List<int>>? characteristicSubscription;
// Device set by connectToDevice(); null while disconnected.
BluetoothDevice? connectedDevice;
// Services returned by discoverServices() for the connected device.
List<BluetoothService> bluetoothServices = [];
bool isScanning = false;
List<ScanResult> scanResults = [];
// Text shown at the top of the page.
String statusMessage = "Checking Bluetooth support...";
// Backs the text field of the "Write Characteristic" dialog.
TextEditingController writeController = TextEditingController();
Map<Guid, List<int>> characteristicValues = {}; // last value per characteristic UUID, kept for display
// Starts Bluetooth initialisation as soon as the state object is created.
@override
void initState() {
super.initState();
// Not awaited: initializeBluetooth() reports progress through setState.
initializeBluetooth();
}
/// Releases everything this state owns: stream subscriptions, the BLE
/// connection and the text controller.
@override
void dispose() {
  adapterStateSubscription?.cancel();
  scanResultsSubscription?.cancel();
  isScanningSubscription?.cancel();
  characteristicSubscription?.cancel();
  connectedDevice?.disconnect();
  // A TextEditingController must be disposed by its owner.
  writeController.dispose();
  super.dispose();
}
/// Checks Bluetooth support, starts listening to adapter on/off changes and,
/// on Android, asks the system to turn Bluetooth on.
Future<void> initializeBluetooth() async {
  // Check if Bluetooth is supported.
  if (await FlutterBluePlus.isSupported == false) {
    print("Bluetooth not supported by this device");
    // The widget may have been removed while the check was pending.
    if (mounted) {
      setState(() {
        statusMessage = "Bluetooth not supported by this device";
      });
    }
    return;
  }
  // Handle Bluetooth on/off state.
  adapterStateSubscription =
      FlutterBluePlus.adapterState.listen((BluetoothAdapterState state) {
    print(state);
    if (!mounted) return;
    if (state == BluetoothAdapterState.on) {
      setState(() {
        statusMessage = "Bluetooth is ON";
      });
    } else {
      setState(() {
        statusMessage = "Bluetooth is OFF";
      });
      // Called after, not inside, the setState callback: stopScanning()
      // calls setState itself and a setState callback should only mutate
      // state.
      stopScanning();
    }
  });
  // Turn on Bluetooth for Android.
  if (Platform.isAndroid) {
    try {
      await FlutterBluePlus.turnOn();
    } catch (e) {
      // This future is not awaited by initState, so a failure here would
      // otherwise surface as an unhandled asynchronous error.
      print('Failed to turn on Bluetooth: $e');
    }
  }
}
/// Starts a 15-second scan and mirrors its results and scanning state into
/// this widget's state. Does nothing when a scan is already in progress.
void startScanning() {
  if (isScanning) return;
  setState(() {
    isScanning = true;
    // Assign a fresh list instead of clear(): the current list may be one
    // handed over by the scan-results stream.
    scanResults = [];
  });
  // Cancel listeners left over from a previous scan so that repeated scans
  // do not accumulate subscriptions.
  scanResultsSubscription?.cancel();
  isScanningSubscription?.cancel();
  // Listen to scan results.
  scanResultsSubscription = FlutterBluePlus.onScanResults.listen(
    (results) {
      if (mounted) {
        setState(() {
          scanResults = results;
        });
      }
      if (results.isNotEmpty) {
        ScanResult r = results.last; // the most recently found device
        print(
            '${r.device.remoteId}: "${r.advertisementData.localName}" found!');
      }
    },
    onError: (e) => print(e),
  );
  // Listen to scanning state.
  isScanningSubscription = FlutterBluePlus.isScanning.listen((scanning) {
    if (!mounted) return;
    setState(() {
      isScanning = scanning;
      if (!scanning) {
        statusMessage = "Scan complete";
      }
    });
  });
  // Start scanning.
  FlutterBluePlus.startScan(
    timeout: Duration(seconds: 15),
  ).catchError((e) {
    if (!mounted) return;
    setState(() {
      statusMessage = "Scan failed: $e";
      isScanning = false;
    });
  });
}
/// Stops the running scan, drops the scan-results listener and marks the
/// state as not scanning.
void stopScanning() {
  FlutterBluePlus.stopScan();
  scanResultsSubscription?.cancel();
  setState(() => isScanning = false);
}
/// Connects to [device], records it as the connected device and starts
/// service discovery. A failed connection is reported in the status line.
void connectToDevice(BluetoothDevice device) async {
  setState(() {
    statusMessage = "Connecting to ${device.remoteId}";
  });
  try {
    await device.connect();
  } catch (e) {
    // This method is async void, so nobody can catch the error for us;
    // without this the status would stay at "Connecting ..." forever.
    if (mounted) {
      setState(() {
        statusMessage = "Connection failed: $e";
      });
    }
    return;
  }
  // The widget may have been removed while connecting.
  if (!mounted) return;
  setState(() {
    connectedDevice = device;
    statusMessage = "Connected to ${device.remoteId}";
  });
  discoverServices();
}
/// Disconnects the current device (if any) and resets the connection-related
/// state.
void disconnectFromDevice() async {
  await connectedDevice?.disconnect();
  setState(() {
    statusMessage = "Disconnected";
    bluetoothServices.clear();
    connectedDevice = null;
  });
}
/// Discovers the services of the connected device and subscribes to every
/// characteristic that supports notifications, storing each received value
/// in [characteristicValues].
void discoverServices() async {
  // Capture the device once: the field can become null (disconnect) while
  // the awaits below are pending, which made `connectedDevice!` unsafe.
  final device = connectedDevice;
  if (device == null) return;
  final List<BluetoothService> services = await device.discoverServices();
  if (!mounted) return;
  setState(() {
    bluetoothServices = services;
  });
  for (final service in services) {
    for (final characteristic in service.characteristics) {
      if (!characteristic.properties.notify) continue;
      await characteristic.setNotifyValue(true);
      final subscription = characteristic.lastValueStream.listen((value) {
        if (mounted) {
          setState(() {
            characteristicValues[characteristic.uuid] = value;
          });
        }
        // allowMalformed: payloads are not guaranteed to be UTF-8 text;
        // a strict decode would throw FormatException inside the listener.
        String receivedData = utf8.decode(value, allowMalformed: true);
        print(
            'Notification received for ${characteristic.uuid}: $receivedData');
      });
      device.cancelWhenDisconnected(subscription);
      characteristicSubscription = subscription;
    }
  }
}
/// Reads [characteristic], stores the bytes in [characteristicValues] and
/// prints them as text.
Future<void> readCharacteristic(
    BluetoothCharacteristic characteristic) async {
  List<int> value = await characteristic.read();
  // The widget may have been removed while the read was pending.
  if (mounted) {
    setState(() {
      characteristicValues[characteristic.uuid] = value;
    });
  }
  // allowMalformed: the value may be binary rather than UTF-8 text; a strict
  // decode would throw FormatException.
  String receivedData = utf8.decode(value, allowMalformed: true);
  print('Read value: $receivedData');
}
// Earlier variant kept for reference: the Write button sent the text input
// as-is (UTF-8 encoded).
// Future<void> writeCharacteristic(
// BluetoothCharacteristic characteristic, String value) async {
// await characteristic.write(utf8.encode(value));
// print('Wrote value: $value');
// }
// Writes [data] to [characteristic] unchanged; the "Send Binary Data" button
// calls this with the single byte 0x01.
Future<void> sendBinaryData(
BluetoothCharacteristic characteristic, List<int> data) async {
await characteristic.write(data);
print('Sent binary data: $data');
}
/// Converts [hexString] (e.g. "01" or "0a ff") to bytes and writes them to
/// [characteristic]. Called when the Write button of the dialog is pressed.
///
/// Whitespace is ignored. Input with an odd number of hex digits or with
/// non-hex characters is rejected with a log message instead of throwing
/// RangeError / FormatException.
Future<void> sendHexString(
    BluetoothCharacteristic characteristic, String hexString) async {
  final cleaned = hexString.replaceAll(RegExp(r'\s+'), '');
  final isHex = RegExp(r'^[0-9a-fA-F]*$').hasMatch(cleaned);
  if (!isHex || cleaned.length.isOdd) {
    print('Invalid hex string, nothing sent: "$hexString"');
    return;
  }
  // Every two hex digits become one byte.
  final List<int> data = [
    for (int i = 0; i < cleaned.length; i += 2)
      int.parse(cleaned.substring(i, i + 2), radix: 16),
  ];
  await characteristic.write(data);
  print('Sent hex string as binary data: $data');
}
/// Builds the page: status line, scan toggle button, list of scan results
/// and, once connected, the services/characteristics of the device with
/// Read / Write / Send Binary Data actions.
@override
Widget build(BuildContext context) {
  return Scaffold(
    appBar: AppBar(
      title: Text('Flutter BLE Demo'),
    ),
    body: Center(
      child: Column(
        mainAxisAlignment: MainAxisAlignment.center,
        children: [
          Text(statusMessage),
          SizedBox(height: 20),
          ElevatedButton(
            onPressed: isScanning ? stopScanning : startScanning,
            child: Text(isScanning ? "Stop Scanning" : "Start Scanning"),
          ),
          SizedBox(height: 20),
          // Expanded stretches the wrapped widget over the remaining space.
          Expanded(
            child: ListView.builder(
              itemCount: scanResults.length,
              itemBuilder: (context, index) {
                final result = scanResults[index];
                return ListTile(
                  title: Text(result.device.remoteId.toString()),
                  subtitle:
                      Text(result.advertisementData.localName ?? 'Unknown'),
                  onTap: () => connectToDevice(result.device),
                );
              },
            ),
          ),
          if (connectedDevice != null) ...[
            Text('Connected to ${connectedDevice!.remoteId}'),
            ElevatedButton(
              onPressed: disconnectFromDevice,
              child: Text('Disconnect'),
            ),
            Expanded(
              child: ListView.builder(
                itemCount: bluetoothServices.length,
                itemBuilder: (context, index) {
                  final service = bluetoothServices[index];
                  return ExpansionTile(
                    title: Text('Service: ${service.uuid}'),
                    children: service.characteristics.map((c) {
                      final value = characteristicValues[c.uuid] ?? [];
                      return ListTile(
                        title: Text('Characteristic: ${c.uuid}'),
                        subtitle: Column(
                          crossAxisAlignment: CrossAxisAlignment.start,
                          children: [
                            Text(
                                'Properties: ${_buildPropertiesString(c.properties)}'),
                            if (value.isNotEmpty)
                              // allowMalformed: a binary value must not make
                              // build() throw FormatException.
                              Text(
                                  'Value: ${utf8.decode(value, allowMalformed: true)}'),
                            Row(
                              children: [
                                if (c.properties.read)
                                  ElevatedButton(
                                    onPressed: () => readCharacteristic(c),
                                    child: Text('Read'),
                                  ),
                                if (c.properties.write)
                                  ElevatedButton(
                                    onPressed: () async {
                                      await showDialog(
                                        context: context,
                                        builder: (dialogContext) {
                                          return AlertDialog(
                                            title:
                                                Text('Write Characteristic'),
                                            content: TextField(
                                              controller: writeController,
                                              decoration: InputDecoration(
                                                hintText:
                                                    'Enter value to write (e.g., 01)',
                                              ),
                                            ),
                                            actions: [
                                              TextButton(
                                                onPressed: () {
                                                  sendHexString(c,
                                                      writeController.text);
                                                  Navigator.of(dialogContext)
                                                      .pop();
                                                },
                                                child: Text('Write'),
                                              ),
                                            ],
                                          );
                                        },
                                      );
                                    },
                                    child: Text('Write'),
                                  ),
                                // Shortcut button: always sends the single byte 0x01.
                                if (c.properties.write)
                                  ElevatedButton(
                                    onPressed: () {
                                      sendBinaryData(c, [0x01]);
                                    },
                                    child: Text('Send Binary Data'),
                                  ),
                              ],
                            ),
                          ],
                        ),
                      );
                    }).toList(),
                  );
                },
              ),
            ),
          ],
        ],
      ),
    ),
  );
}
// String _buildPropertiesString(BluetoothCharacteristicProperties properties) {
/// Returns the supported operations of [properties] as a comma-separated
/// list in the fixed order read, write, notify, indicate.
String _buildPropertiesString(properties) {
  return <String>[
    if (properties.read) 'read',
    if (properties.write) 'write',
    if (properties.notify) 'notify',
    if (properties.indicate) 'indicate',
  ].join(', ');
}
}