はじめに
意味もなく買ってみたポータブル電源、Anker 757は携帯のアプリから本体情報や充電情報を取得出来ます。
ところが常時ログを取るにも、ファイルとしてのログを取るにも携帯アプリからは困難です。
そこでどこの誤家庭にもあるLinuxサーバでログを取ってみました。
本記事では、PythonのBLEライブラリを使用し取得データを解析したのち、ファイル出力する方法を紹介します。
また、受信データの解析内容も併せて記載します。
他のBLE機器にも応用出来るはずです。
結論
- Pyhonのライブラリ
bluepy
で実装可能 - コードは特に難しくない
- BLEは該当するサービスを見つけるまでが難しい
前提条件
ポータブル電源:Anker 757 Portable Power Station
Linux: Ubuntu 24.02 (x64)
Bluetooth MOD: 知らん
Python: 3.10
BLEが使えるLinuxであれば何でもいい気がします。
手順
- MACアドレスの特定
- サービス解析
- 通信内容解析
- コードの実装
MACアドレス
BLEに接続するためにはMACアドレスが必要です。Ethernetとは異なり、何かしらの画面上には出ないので何とか調べます。
今回は詳細を紹介しませんが、bluepy
に付属のツールや、macOS系であればBLE Scanner 4.0など任意のツールを使います。
明らかにソレっぽい名前があるのですぐにわかるはずです。
サービス解析
偶然にもAnker757はサービスが2個しかありません。
中の通信も書き込み、通知の2系統しかありません。
通知の方のデータを読み取ればよいのは自明です。
Writeの方が恐らく照明やAC電源の操作用です。
通信内容解析
解析といっても動作条件を変えて受信データの変化を見ればいいだけです。
受信データを標準出力するコードは下記です。通知内容解析以外はドキュメント通りの実装です。
from bluepy import btle
from datetime import datetime, timezone
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
class KusoDelegate(btle.DefaultDelegate):
def __init__(self, client):
btle.DefaultDelegate.__init__(self)
self.client = client
def handleNotification(self, cHandle, data):
print("rcv:", data)
client = InfluxDBClient(url=url, token=token, org=org)
# デバイスに接続
device = btle.Peripheral("XX:XX:XX:XX:XX:XX")
# デリゲートを設定
device.setDelegate(KusoDelegate(client))
# サービスとキャラクタリスティックを取得
service_uuid = "0159f5da-0000-1000-8000-00805f9b34fb"
service = device.getServiceByUUID(service_uuid)
char = service.getCharacteristics("8888")[0]
# 通知を有効にする
device.writeCharacteristic(char.valHandle + 1, b"\x01\x00", withResponse=True)
print("データ待機")
while True:
if device.waitForNotifications(1.0):
continue
bluepy
はroot権限でしか動きません。Buletoothデバイスの所有者がrootのためです。
仮想環境をセットしてbluepy
をpip
でインストールしておきます。
環境変数とかよく知りませんが、rootで実行する場合は仮想環境のpython
をフルパスで指定するみたいです。
$ source /仮想環境のフルパス/bin/activate
$ sudo /仮想環境のフルパス/bin/python anker757.py
充電してみたり、USBポートを使用してみたりしてデータの変化を確認します。
1バイトを1セルとして、Excelに書き込んでみます。
Excelの条件着色を使えば変化がよく分かるでしょう。こういう時だけはExcelが便利です。
ということで判明したのがこちら。
インデックス | 内容 |
---|---|
0 | ヘッダ |
1 | ヘッダ |
2-16 | ? |
17 | 残時間(時) |
18 | 残時間(日) |
19 | AC充電電力下位バイト |
20 | AC充電電力上位バイト |
21 | AC出力電力下位バイト |
22 | AC出力電力上位バイト |
23 | C1電力 |
24 | 予備? |
25 | C2電力 |
26 | 予備? |
27 | A1電力 |
28 | 予備? |
29 | A2電力 |
30 | 予備? |
31 | A3電力 |
32 | 予備? |
33 | A4電力 |
34 | 予備? |
35-36 | ? |
37 | DC充電電力Lo |
38 | DC充電電力Hi |
39 | 充電電力合計Lo |
40 | 充電電力合計Hi |
41 | 出力電力合計Lo |
42 | 出力電力合計Hi |
43-58 | ? |
59 | AC出力中フラグ |
60 | ? |
61 | 充電中フラグ |
62 | 本体温度? |
63 | ? |
64 | バッテリ残量 |
65 | 予備? |
66 | C1状態 |
67 | C2状態 |
68 | A1状態 |
69 | A2状態 |
70 | A3状態 |
71 | A4状態 |
72-92 | ? |
93 | チェックサム? |
データ内容は単にバイナリ整数でした。そのまま10進数に変換するだけです。
使っていると時々中途半端なデータが来ることに気づいたので、次のヘッダを検知するまでバッファするようにしました。
さらにCSVファイルとして出力するように改造したコードはこちら。
from bluepy import btle
from datetime import datetime
import os
class KusoDelegate(btle.DefaultDelegate):
def __init__(self):
btle.DefaultDelegate.__init__(self)
self.buffer = b""
def handleNotification(self, cHandle, data):
self.buffer += data
while b'\x09\xFF' in self.buffer:
# ヘッダ位置を見つける
index = self.buffer.find(b'\x09\xFF')
# ヘッダまでのデータを取り出す
chunk = self.buffer[:index]
# バッファを更新
self.buffer = self.buffer[index + 2:]
if len(chunk) >= 63:
if chunk[4] == 0x49 and chunk[5] == 0x5e:
input_power = int(chunk[35])
temperature = int(chunk[60])
remaining = int(chunk[62])
ac_out = int.from_bytes(chunk[19:21], byteorder='little')
usbc_out = int(chunk[21]) + int(chunk[23])
usba_out = int(chunk[25]) + int(chunk[27]) + int(chunk[29]) + int(chunk[31])
current_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
output = "{},{},{},{},{},{},{}\n".format(current_time, input_power, temperature, remaining, ac_out, usbc_out, usba_out)
# 出力先設定はテキトー
date_str = datetime.now().strftime("%Y-%m-%d")
directory = "csv"
if not os.path.exists(directory):
os.makedirs(directory)
with open(os.path.join(directory, f"{date_str}.csv"), "a") as f:
f.write(output)
# 残りは上記のメインルーチンと同じなので後略
実は
redditの先駆者たちがAnker 767のデータを解析していました。
大抵757と同じです。
reddit: Anker 767 Bluetooth LE
あとがき
この後、取得データを時系列DBに直接保存するよう改造した。コードは自分で考えてネ。