はじめに
ひょんな事から、業務でラズパイのSDカードの試験を実施することになりました。
所謂、電源の瞬断によりSDカードの破壊が起こらないか、についての試験です。
ただ、つきっきりでコンセントの抜き差しをするわけにもいきません。
(数十回、数百回の回数で済む話ではないので。)
そんなところから、Switchbotプラグミニを使って試験をライトに済ませられないか、
といった内容になります。
Switchbotプラグミニの制御について
プラグミニをどう使うかという話ですが、BLEを使って制御したいと思います。
github上で公式にBLE-APIを公開してくれています。便利ですね。
余談ですが、スマートロックのAPI仕様は公開されていませんね。
そりゃ勝手に解錠されたら困るはずなので当たり前ですが。。。
(公式には公開していませんが、暗号化のもと使えるようです)
プラグミニのBLE仕様
さて、早速ですがプラグミニのBLE仕様を見てみましょう。
アドバタイズについて
これは、プラグミニのアドバタイジングデータの構造になります。
Manufacture Dataを使っているようです。
先頭の0x0969ですが、これはCompany IDを表します。
Bluetooth SIGに登録されているデータかは、公式より確認ができます。
ありましたね。アドバタイズより、Switchbot製品であるかは、コレで見分けが付きそうです。
今回、pythonを使ってライトに検証してみます。
BLEの通信スタックには、bleakを採用しました。
import os
import asyncio
from bleak import BleakScanner, BleakClient
import binascii
import random
TARGET_COMPANY_ID = "0969" # SwitchBot製品のCompany ID
async def scan_switch_bot_devices():
while True:
devices = await BleakScanner.discover()
for device in devices:
# manufacturer_dataが存在するかチェックして16進数で表示
manufacturer_data = device.metadata.get("manufacturer_data", {})
if manufacturer_data:
for key, value in manufacturer_data.items():
company_id = format(key, "04x") # 16進数で表示
correct_company_id = (
company_id[:2] + company_id[2:]
) # バイトオーダーの修正
hex_value = binascii.hexlify(value).decode(
"utf-8"
) # bytesを16進数に変換
# Company IDがTARGET_COMPANY_IDと一致するか確認
if correct_company_id == TARGET_COMPANY_ID:
print(
f"Company ID {TARGET_COMPANY_ID} のデバイスを発見しました!"
)
return
print(f"Company ID {TARGET_COMPANY_ID} のデバイスが見つかりませんでした。")
await asyncio.sleep(5)
async def main():
await scan_switch_bot_devices()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("プログラムを終了します")
今回は、他にSwitchbot製品がないことと、プラグミニも一台しか使わないので、
デバイスの検索はこれで良さそうです。
複数台のプラグミニを使う場合は、MACアドレスで個々を判別しましょう。
キャラクタリスティックについて
どうやってプラグミニをOFF/ONさせるのかという話ですが、
BLEのサービス上のキャラクタリスティックで仕組みが提供されています。
それぞれですが、
cba20003-224d-11e6-9fb8-0002a5d5c51b
write(write without response)の特性で、プラグミニへコマンドを送信するのに使えるキャラクタリスティックです。
cba20002-224d-11e6-9fb8-0002a5d5c51b
notifyの特性で、プラグミニからのレスポンスが返ってくるキャラクタリスティックです。
検索〜接続、電源制御まで
アドバタイズよりデバイスの検索から、接続、キャラクタリスティックでのプラグミニの電源制御までを行うサンプルです。
コマンド仕様は、API仕様を確認ください。
import os
import asyncio
from bleak import BleakScanner, BleakClient
import binascii
import random
TARGET_COMPANY_ID = "0969" # SwitchBotのCompany ID
NOTIFY_CHARACTERISTIC_UUID = "cba20003-224d-11e6-9fb8-0002a5d5c51b"
WRITE_CHARACTERISTIC_UUID = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
TURN_ON = bytes.fromhex("570f50010180")
TURN_OFF = bytes.fromhex("570f50010100")
GET_STATE = bytes.fromhex("570f5101")
current_state = None # 現在のステートを保持するグローバル変数
on_cnt = 0 # ON回数を保持するグローバル変数
notify_event = asyncio.Event() # Notifyが届いたことを知らせるイベント
# Notifyで受信したデータを処理するコールバック関数
def notification_handler(sender: int, data: bytearray):
global current_state
global on_cnt
print(f"Notify: {binascii.hexlify(data).decode('utf-8')}")
if len(data) < 2:
print("受信したNotifyデータの長さが不正です。")
return
if data[0] != 0x01:
print("受信したNotifyデータのフォーマットが不正です。")
return
current_state = data[1] # 0x00 or 0x80を取得
if data[1] == 0x00:
print("スイッチをOFFにしました")
elif data[1] == 0x80:
on_cnt += 1
print(f"スイッチをONにしました(ON回数: {on_cnt})")
else:
pass
notify_event.set() # イベントをセット
# デバイス切断時に呼び出されるコールバック関数
def disconnected_handler(client):
print("デバイスが切断されました!プログラムを終了します。")
os._exit(1)
async def get_state(client):
print("現在のステートを取得中")
await client.write_gatt_char(WRITE_CHARACTERISTIC_UUID, GET_STATE, response=False)
async def turn_on(client):
print("スイッチをONにします")
await client.write_gatt_char(WRITE_CHARACTERISTIC_UUID, TURN_ON, response=False)
async def turn_off(client):
print("スイッチをOFFにします")
await client.write_gatt_char(WRITE_CHARACTERISTIC_UUID, TURN_OFF, response=False)
async def toggle_switch(client):
global current_state
global on_cnt
await get_state(client) # 現在のステートを取得
# Notifyを受信して、現在のステートが取得できるまで待機。同期で待ちたい
try:
await asyncio.wait_for(notify_event.wait(), timeout=5.0)
print("Notifyを正常に受信しました")
notify_event.clear()
except asyncio.TimeoutError:
print("タイムアウトしました(Notifyを受信できませんでした)")
return
while True:
notify_event.clear()
if current_state == 0x00:
await turn_on(client)
sleep_time = random.randint(40, 60)
else:
await turn_off(client)
sleep_time = 3
# Notifyを受信して、スイッチの状態が変わることを待ちたい
try:
await asyncio.wait_for(notify_event.wait(), timeout=5.0)
print("Notifyを正常に受信しました。スイッチの状態が変わりました")
notify_event.clear()
await asyncio.sleep(sleep_time)
except asyncio.TimeoutError:
print(
"タイムアウトしました(Notifyを受信できませんでした)。スイッチの状態が変わらない可能性があります"
)
async def connect_to_device(address):
async with BleakClient(address) as client:
if client.is_connected:
client.set_disconnected_callback(disconnected_handler)
services = await client.get_services()
for service in services:
print(f"Service: {service.uuid}")
for characteristic in service.characteristics:
print(
f" Characteristic: {characteristic.uuid}, プロパティ: {characteristic.properties}"
)
await client.start_notify(
NOTIFY_CHARACTERISTIC_UUID, notification_handler
)
print("Notifyを開始しました")
# スイッチをトグルする
await toggle_switch(client)
# Notifyを停止する
# await client.stop_notify(NOTIFY_CHARACTERISTIC_UUID)
async def scan_switch_bot_devices():
while True:
devices = await BleakScanner.discover()
for device in devices:
# manufacturer_dataが存在するかチェックして16進数で表示
manufacturer_data = device.metadata.get("manufacturer_data", {})
if manufacturer_data:
for key, value in manufacturer_data.items():
company_id = format(key, "04x") # 16進数で表示
correct_company_id = (
company_id[:2] + company_id[2:]
) # バイトオーダーの修正
hex_value = binascii.hexlify(value).decode(
"utf-8"
) # bytesを16進数に変換
# Company IDがTARGET_COMPANY_IDと一致するか確認
if correct_company_id == TARGET_COMPANY_ID:
print(
f"Company ID {TARGET_COMPANY_ID} のデバイスを発見しました!"
)
print(f"Manufacturer Data: {correct_company_id}: {hex_value}")
await connect_to_device(device.address)
return
else:
print("Manufacturer Data: なし")
print(f"Company ID {TARGET_COMPANY_ID} のデバイスが見つかりませんでした。")
await asyncio.sleep(5)
async def main():
await scan_switch_bot_devices()
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("プログラムを終了します")
まとめ
- switchbotミニはちょっとした電源OFF/ONの自動化テストに有用
- 1万回ぐらいでしたらサクサク動きました
- pythonでもbleakだったらHWプラットフォーム問わず使いやすそう
- MACOS、ラズパイでそれぞれ動かすことができました。便利ですね