0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Switchbotプラグミニによる電源OFF/ON制御

Posted at

はじめに

ひょんな事から、業務でラズパイのSDカードの試験を実施することになりました。
所謂、電源の瞬断によりSDカードの破壊が起こらないか、についての試験です。
ただ、つきっきりでコンセントの抜き差しをするわけにもいきません。
(数十回、数百回の回数で済む話ではないので。)

そんなところから、Switchbotプラグミニを使って試験をライトに済ませられないか、
といった内容になります。

Switchbotプラグミニの制御について

プラグミニをどう使うかという話ですが、BLEを使って制御したいと思います。
github上で公式にBLE-APIを公開してくれています。便利ですね。

余談ですが、スマートロックのAPI仕様は公開されていませんね。
そりゃ勝手に解錠されたら困るはずなので当たり前ですが。。。
(公式には公開していませんが、暗号化のもと使えるようです

プラグミニのBLE仕様

さて、早速ですがプラグミニのBLE仕様を見てみましょう。

アドバタイズについて

スクリーンショット 2024-11-16 12.10.19.png

これは、プラグミニのアドバタイジングデータの構造になります。
Manufacture Dataを使っているようです。

先頭の0x0969ですが、これはCompany IDを表します。
Bluetooth SIGに登録されているデータかは、公式より確認ができます。

スクリーンショット 2024-11-16 12.17.27.png

ありましたね。アドバタイズより、Switchbot製品であるかは、コレで見分けが付きそうです。

今回、pythonを使ってライトに検証してみます。
BLEの通信スタックには、bleakを採用しました。

scan.py
import os
import asyncio
from bleak import BleakScanner, BleakClient
import binascii
import random

TARGET_COMPANY_ID = "0969"  # SwitchBot製品のCompany ID

async def scan_switch_bot_devices():
    while True:
        devices = await BleakScanner.discover()

        for device in devices:

            # manufacturer_dataが存在するかチェックして16進数で表示
            manufacturer_data = device.metadata.get("manufacturer_data", {})
            if manufacturer_data:
                for key, value in manufacturer_data.items():
                    company_id = format(key, "04x")  # 16進数で表示
                    correct_company_id = (
                        company_id[:2] + company_id[2:]
                    )  # バイトオーダーの修正
                    hex_value = binascii.hexlify(value).decode(
                        "utf-8"
                    )  # bytesを16進数に変換

                    # Company IDがTARGET_COMPANY_IDと一致するか確認
                    if correct_company_id == TARGET_COMPANY_ID:
                        print(
                            f"Company ID {TARGET_COMPANY_ID} のデバイスを発見しました!"
                        )
                        return

        print(f"Company ID {TARGET_COMPANY_ID} のデバイスが見つかりませんでした。")
        await asyncio.sleep(5)


async def main():
    await scan_switch_bot_devices()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("プログラムを終了します")

今回は、他にSwitchbot製品がないことと、プラグミニも一台しか使わないので、
デバイスの検索はこれで良さそうです。
複数台のプラグミニを使う場合は、MACアドレスで個々を判別しましょう。

キャラクタリスティックについて

どうやってプラグミニをOFF/ONさせるのかという話ですが、
BLEのサービス上のキャラクタリスティックで仕組みが提供されています。
スクリーンショット 2024-11-16 12.28.03.png

それぞれですが、

cba20003-224d-11e6-9fb8-0002a5d5c51b

write(write without response)の特性で、プラグミニへコマンドを送信するのに使えるキャラクタリスティックです。

cba20002-224d-11e6-9fb8-0002a5d5c51b

notifyの特性で、プラグミニからのレスポンスが返ってくるキャラクタリスティックです。

検索〜接続、電源制御まで

アドバタイズよりデバイスの検索から、接続、キャラクタリスティックでのプラグミニの電源制御までを行うサンプルです。
コマンド仕様は、API仕様を確認ください。

power_test.py
import os
import asyncio
from bleak import BleakScanner, BleakClient
import binascii
import random


TARGET_COMPANY_ID = "0969"  # SwitchBotのCompany ID
NOTIFY_CHARACTERISTIC_UUID = "cba20003-224d-11e6-9fb8-0002a5d5c51b"
WRITE_CHARACTERISTIC_UUID = "cba20002-224d-11e6-9fb8-0002a5d5c51b"
TURN_ON = bytes.fromhex("570f50010180")
TURN_OFF = bytes.fromhex("570f50010100")
GET_STATE = bytes.fromhex("570f5101")
current_state = None  # 現在のステートを保持するグローバル変数
on_cnt = 0  # ON回数を保持するグローバル変数

notify_event = asyncio.Event()  # Notifyが届いたことを知らせるイベント


# Notifyで受信したデータを処理するコールバック関数
def notification_handler(sender: int, data: bytearray):
    global current_state
    global on_cnt
    print(f"Notify: {binascii.hexlify(data).decode('utf-8')}")

    if len(data) < 2:
        print("受信したNotifyデータの長さが不正です。")
        return

    if data[0] != 0x01:
        print("受信したNotifyデータのフォーマットが不正です。")
        return

    current_state = data[1]  # 0x00 or 0x80を取得

    if data[1] == 0x00:
        print("スイッチをOFFにしました")
    elif data[1] == 0x80:
        on_cnt += 1
        print(f"スイッチをONにしました(ON回数: {on_cnt}")
    else:
        pass

    notify_event.set()  # イベントをセット


# デバイス切断時に呼び出されるコールバック関数
def disconnected_handler(client):
    print("デバイスが切断されました!プログラムを終了します。")
    os._exit(1)


async def get_state(client):
    print("現在のステートを取得中")
    await client.write_gatt_char(WRITE_CHARACTERISTIC_UUID, GET_STATE, response=False)


async def turn_on(client):
    print("スイッチをONにします")
    await client.write_gatt_char(WRITE_CHARACTERISTIC_UUID, TURN_ON, response=False)


async def turn_off(client):
    print("スイッチをOFFにします")
    await client.write_gatt_char(WRITE_CHARACTERISTIC_UUID, TURN_OFF, response=False)


async def toggle_switch(client):
    global current_state
    global on_cnt

    await get_state(client)  # 現在のステートを取得

    # Notifyを受信して、現在のステートが取得できるまで待機。同期で待ちたい
    try:
        await asyncio.wait_for(notify_event.wait(), timeout=5.0)
        print("Notifyを正常に受信しました")
        notify_event.clear()
    except asyncio.TimeoutError:
        print("タイムアウトしました(Notifyを受信できませんでした)")
        return

    while True:
        notify_event.clear()

        if current_state == 0x00:
            await turn_on(client)
            sleep_time = random.randint(40, 60)
        else:
            await turn_off(client)
            sleep_time = 3

        # Notifyを受信して、スイッチの状態が変わることを待ちたい
        try:
            await asyncio.wait_for(notify_event.wait(), timeout=5.0)
            print("Notifyを正常に受信しました。スイッチの状態が変わりました")
            notify_event.clear()
            await asyncio.sleep(sleep_time)
        except asyncio.TimeoutError:
            print(
                "タイムアウトしました(Notifyを受信できませんでした)。スイッチの状態が変わらない可能性があります"
            )


async def connect_to_device(address):
    async with BleakClient(address) as client:
        if client.is_connected:
            client.set_disconnected_callback(disconnected_handler)
            services = await client.get_services()

            for service in services:
                print(f"Service: {service.uuid}")
                for characteristic in service.characteristics:
                    print(
                        f"  Characteristic: {characteristic.uuid}, プロパティ: {characteristic.properties}"
                    )

                await client.start_notify(
                    NOTIFY_CHARACTERISTIC_UUID, notification_handler
                )
                print("Notifyを開始しました")

                # スイッチをトグルする
                await toggle_switch(client)

                # Notifyを停止する
                # await client.stop_notify(NOTIFY_CHARACTERISTIC_UUID)


async def scan_switch_bot_devices():
    while True:
        devices = await BleakScanner.discover()

        for device in devices:

            # manufacturer_dataが存在するかチェックして16進数で表示
            manufacturer_data = device.metadata.get("manufacturer_data", {})
            if manufacturer_data:
                for key, value in manufacturer_data.items():
                    company_id = format(key, "04x")  # 16進数で表示
                    correct_company_id = (
                        company_id[:2] + company_id[2:]
                    )  # バイトオーダーの修正
                    hex_value = binascii.hexlify(value).decode(
                        "utf-8"
                    )  # bytesを16進数に変換

                    # Company IDがTARGET_COMPANY_IDと一致するか確認
                    if correct_company_id == TARGET_COMPANY_ID:
                        print(
                            f"Company ID {TARGET_COMPANY_ID} のデバイスを発見しました!"
                        )
                        print(f"Manufacturer Data: {correct_company_id}: {hex_value}")
                        await connect_to_device(device.address)
                        return
            else:
                print("Manufacturer Data: なし")

        print(f"Company ID {TARGET_COMPANY_ID} のデバイスが見つかりませんでした。")
        await asyncio.sleep(5)


async def main():
    await scan_switch_bot_devices()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("プログラムを終了します")

まとめ

  • switchbotミニはちょっとした電源OFF/ONの自動化テストに有用
    • 1万回ぐらいでしたらサクサク動きました
  • pythonでもbleakだったらHWプラットフォーム問わず使いやすそう
    • MACOS、ラズパイでそれぞれ動かすことができました。便利ですね
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?