18
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

ラズパイをハブとしてアレクサからSwitchBotを操作する

Last updated at Posted at 2022-12-11

こんにちは。

家の電気は点けたり消したりをアレクサでやっているのですが、WifiもBluetoothも赤外線も無いダウンライトがあり、全てを制御できていません。

そこで今回ついに物理的なスイッチのON/OFFを自動化するSwitchBotを買いました!

image.png
出典:Amazon.co.jp

これでアレクサとサクッと連携して……

image.png

ハ、ハブ……?

そうです。このSwitchBotはBluetoothの信号を受けて動くため、アレクサやGoogleHomeなどとはWifiで連携するので、そのハブとなるSwitchBotハブが必要なのです!!!

だ、だまされた……(騙されてない)

このままおめおめとハブを買うのも癪なので考えました。

 
ラズパイをハブとしてSwitchBotに信号を送れるのでは?
 

ということでこんな感じのシステムを作ってみます。AWS IoT Coreを使ったラズパイとの連携は以前も実装したことがあるので、今回の肝はラズパイとSwitchBot間のBluetooth(BLE)による連携です。

architecture.png

作成したPublisher(アレクサスキル)とSubscriber(ラズパイ)のプログラムはGitHubに上げています。

環境

Publisher(アレクサスキル)

  • AWS Lambda
  • NodeJS v18

Subscriber

  • RaspberryPi Model4
  • Raspberry Pi OS 10
  • Python3.7
  • SwitchBot ボット

Broker

  • AWS IoT Core

Bluetoothの信号をラズパイからSwitchBotに送る

SwitchBotはBLE(Bluetooth Low Energy)という規格に則り、GATT(Generic Attribute Profile)によって通信します。

Raspbianにはデフォルトでgatttoolが入っていて、これを使って手軽に通信することができます。

が、さらにSwitchBot公式でgattoolを使ったPythonスクリプトをGitHubに公開しています。楽勝では?

公式のPythonスクリプトをダウンロード

python2とpython3それぞれでスクリプトが提供されています。

SiwtchBotのMACアドレスを調べる

このスクリプトを使うにはSwitchBotのMACアドレスが必要です。

MACアドレスはSwitchBot公式アプリで確認できます。

うご……かない

あとはこのMACアドレスをパラメータとしてスクリプトを実行するだけです!

python3 switchbot_py2topy3.py [MACアドレス] Bot Press
Preparing to connect.
Connection successful.
Complete

……

………

…………

CompleteとなっていますがSwitchBotが動きません。

同じリポジトリにあるswitchbot.pyswitchbot_py3.pyも試しますが同じです。

ググってもググっても、みんなうまくいってそうで途方に暮れました。

公式アプリのBLE信号をキャプチャする

ひたすらググる中で 「Bluetoothの信号をキャプチャする」 というワードを目にしました。

これだ!!

Switchbot公式のアプリは動いているわけなので、Pythonスクリプトとアプリとの信号の差異を見れば何が悪いのかが分かるはずです。

ラズパイをキャプチャする

まずはラズパイから発信される信号をキャプチャしてみます。

最終的にWiresharkで確認するためにtsharkを使ってキャプチャを試みます。

sudo apt install tshark

まずラズパイ上のインターフェース一覧を出力してBluetoothインターフェースを確認します。

sudo tshark -D

1. wlan0
2. any
3. lo (Loopback)
4. eth0
5. docker0
6. bluetooth0
7. nflog
8. nfqueue
9. usbmon1
10. usbmon2
11. ciscodump (Cisco remote capture)
12. randpkt (Random packet generator)
13. sshdump (SSH remote capture)
14. udpdump (UDP Listener remote capture)

Bluetoothインターフェースのインデックスは6ですね。

インターフェースのインデックスとログ出力を指定してtsharkを起動します。この時、起動ユーザーはrootである必要があります。(なぜかsudoで動かないかつ先に空のファイル作っていないとダメでした)

sudo -i
touch capture.log
# -i でインターフェースを指定、-x でHexの出力、-Vで詳細出力、-w でキャプチャをファイル保存
tshark -i 6 -x -V -w capture.log

この状態で先ほどのPythonスクリプトを実行するとcapture.logに記録されます。

取得したログをWiresharkで見てみます。フィルタにbluetooth.addr==[MACアドレス]と指定することで、SwitchBot宛の通信を探します。

capture.log 2022_12_08 1_18_52.png

一応想定した値は送れているようです。

GATTの構造に関する説明は省きますが0x10がハンドル(書き込み位置)、0x57010が命令、というような意味合いになります。

SwitchBotに何を送ればどのような命令になるかはGitHubリポジトリに説明があります。

アプリの信号をキャプチャする

続いてSwitchBot公式アプリの信号をキャプチャしてみます。

私のスマートフォンはAndroidなので下記の記事を参考にBluetoothのスヌープログ取得を行いました。

ただし使っている端末がXiaomiのRedmi9TでカスタムOSのため「デバッグレポートの送信」というオプションは見当たりませんでした。公式ドキュメントによればスヌープログはbtsnoop_hci.logという名前で出力されるようなので、この名前を頼りにストレージからログを取り出しました。

取り出したログはWiresharkで解析できます。

btsnoop_hci(1).log 2022_12_09 1_08_30.png

スクリプトで送信しているものと同じ信号がNo.966と思われますが、よくよく見るとその前のNo.961でハンドル0x130x01という信号を送っています。

これはGATTのCCCD (Client Characteristic Configuration Descriptor)という属性で、0x01という値はNotifyを許可する意味があるようです。Notifyとは信号を受け取ったデバイス(サーバー)が、自身の属性値を変更したことを周り(クライアント)に通知するための通信方式です。

キャプチャを見るとNo.961でNotifyが有効になったため、No.966の送信に対してNo.972で値の変更を通知しているのが分かります。

先ほどのラズパイでは送ったきり何の反応もありませんでしたね。

……という違いは分かったのですが、正直なぜPythonスクリプトで動かないのか(Notifyを有効にしなければならないのか)は良くわかっていません。

とにかくこの部分が明確に異なるので、Pythonスクリプトを修正してみます。

うご……いた!

とりあえずPress命令送信前に何も考えずにハンドル0x13に値0x01を書き込みます。

switchbot_py2to3.py
    cmd_handle = con.before.decode('utf-8').split('\n')[-1].split()[2].strip(',')
+   con.sendline('char-write-req 13 0100')
+   con.expect('\[LE\]>')
python3 switchbot_py2topy3.py <MACアドレス> Bot Press
Preparing to connect.
Connection successful.
Complete

コンソールの出力は先ほどと同じですが、SwitchBotが動きました!!

Bluetoothをキャプチャしてみるとハンドル0x13への書き込みも行われていることが分かります。
capture3.log 2022_12_11 2_37_54.png

PubSubの構築

ここまでくればこっちのものです。

PubSubモデルとしてAWS IoT Coreを介したメッセージングアプリケーションを構築します。

AWS IoT Coreのセットアップ

詳細な手順は割愛しますが、AWS IoT Coreを使えばとても簡単にPubSubアプリケーションを構築することができます。AWS IoT Coreのエンドポイントがブローカーの役割を果たし、クライアント証明書によるエッジデバイスの認証やトピック単位の読み書き権限制御などが行えます。

設定方法や使い方は過去の記事で紹介しています。

アレクサスキルでパブリッシュ

パブリッシャーとなるのはアレクサスキルです。

今回は 「オリジナルボット」 という名前でスキルを作成しましたが、公開しない開発用スキルかつ、呼び出しは定型アクションにまとめるので名前は何でもよいです。

またインテントは「電気を点ける」「電気を消す」で命令が分けられるように2つのインテントを用意します。

image.png
image.png

プログラムはベタにAWS Lambda上に実装します。

こちらはスキル呼び出しのhandlerです。

index.ts
// スキルのインテントはパブリッシュ用の処理にJSON形式でメッセージを渡します。
//{location: スイッチの設置場所,switch_name: スイッチの識別名, state: スイッチのON or OFF}
// 電気を点けるインテント
const SwitchOffIntentHandler = {
    canHandle(handlerInput: HandlerInput) {
        return getRequestType(handlerInput.requestEnvelope) === 'IntentRequest' &&
            getIntentName(handlerInput.requestEnvelope) === 'SwitchOnIntent';
    },
    async handle(handlerInput: HandlerInput) {
        const publisher = MqttPublisher.build();
        await publisher.publish(TOPIC_ID,{location: LOCATION, switch_name: SWITCH_NAME, state: true});
        return handlerInput.responseBuilder.speak("").getResponse();
    }
}
// 電気を消すインテント
const SwitchOnIntentHandler = {
    canHandle(handlerInput: HandlerInput) {
        return getRequestType(handlerInput.requestEnvelope) === 'IntentRequest' &&
            getIntentName(handlerInput.requestEnvelope) === 'SwitchOffIntent';
    },
    async handle(handlerInput: HandlerInput) {
        const publisher = MqttPublisher.build();
        await publisher.publish(TOPIC_ID,{location: LOCATION, switch_name: SWITCH_NAME, state: false});
        return handlerInput.responseBuilder.speak("").getResponse();
    }
}

handlerからメッセージ配信処理を起動します。クライアント証明書を使ってエンドポイント(AWS IoT Core)に接続してメッセージを配信します。

mqtt_publisher.ts
class MqttPublisherImpl implements MqttPublisher {
    private mqttClientConnection: mqtt.MqttClientConnection
    constructor() {
        const client = new mqtt.MqttClient();
        const builder = iot.AwsIotMqttConnectionConfigBuilder.new_mtls_builder_from_path("./cert/certificate.pem.crt","./cert/private.pem.key");
        builder.with_client_id("switchbot-publisher");
        builder.with_certificate_authority_from_path(undefined,"./cert/AmazonRootCA1.pem");
        builder.with_endpoint("xxxxxxxxxxxxxxx.iot.ap-northeast-1.amazonaws.com");
        this.mqttClientConnection = client.new_connection(builder.build());
    }
    async publish(topic: string, body: SwitchBotTopicBody) {
        try {
            console.log("tls negotiation...");
            await this.mqttClientConnection.connect();
            console.log("connected");
            console.log(`publish message -> topic: ${topic}, message: ${JSON.stringify(body)}`);
            await this.mqttClientConnection.publish(topic, JSON.stringify(body),mqtt.QoS.AtLeastOnce);
            console.log("publish complete");
            await this.mqttClientConnection.disconnect();
            console.log("connection closed");
        } catch(error: any) {
            console.error(error);
        }
    }
}

ラズパイでサブスクライブ

メッセージをラズパイ上でサブスクライブし、先ほど作ったBluetooth送信のスクリプトを呼び出します。

処理の起点となるサブスクライバーはAWS-SDKのサンプルほぼそのままです。

controller.py
# state_repositoryは電気の状態を保持する簡易Repository
def on_message_received(topic, payload, **kwargs):
    print("Received message from topic '{}': {}".format(topic, payload))

    service.exec_switchbot(payload.decode("utf-8"),state_repository)

if __name == "main":
...
    mqtt_connection = mqtt_connection_builder.mtls_from_path(
        endpoint=args.endpoint,
        cert_filepath=args.cert,
        pri_key_filepath=args.key,
        client_bootstrap=client_bootstrap,
        ca_filepath=args.root_ca,
        on_connection_interrupted=on_connection_interrupted,
        on_connection_resumed=on_connection_resumed,
        client_id=args.client_id,
        clean_session=False,
        keep_alive_secs=15)

...
    connect_future = mqtt_connection.connect()
...
    print("Subscribing to topic '{}'...".format(args.topic))
    # メッセージを受信したらon_message_receivedをコールバック
    subscribe_future, packet_id = mqtt_connection.subscribe(
        topic=args.topic,
        qos=mqtt.QoS.AT_MOST_ONCE,
        callback=on_message_received)

サブスクライバーのメッセージ受信で実行される処理です。

注意点として今回のようなシンプルなSwitchBotは「点ける」も「消す」も関係なく同じ命令(Press)なので、例えば電気が消えた状態で「電気消して」というと電気を点けてしまいます。

そこで今回簡易的に メモリ上に電気のON/OFFのステータスを保持し、受信したメッセージの内容を見てSwitchBotに信号を送るかを判断するようにしてみました。

ここには載せていませんが値の変更はrepository.pyの中で行います。一応JSONフォーマットでファイルに書き出して永続化しています。

service.py
def exec_switchbot(payload: str, repository: IRepository)->None:
...
    received_data = json.loads(payload)
...
    target_switch_name = received_data['switch_name']
    required_state = received_data['state']
    # 受信メッセージの電気ON/OFと現在の電気の状態を比較してON->OFF,OFF->ONの場合はSwitchBotに信号を送る
    if current_state["state"] != required_state:
        if switchbot.execute(mac=current_state["mac"], dev_type="Bot", cmd="Press"):
            try:
                # SwitchBotへ信号が送信出来たらメモリ上のステータスを更新
                repository.update_state_by_switch_name(target_switch_name, required_state)
            except UpdateStateError as e:
                print("Update state failed.")
                return

完成!!

最後にアレクサの定型アクションをセットアップして完成です。

現在は 「アレクサ、電気消して」 と言うとアレクサ標準のスマートホーム制御のアクションを実行する定型アクションを設定しています。ここに今回作成したスキル呼び出しを追加します。

  1. 既存の定型アクションに 「カスタムスキル」 を追加
  2. 呼び出しフレーズとして 「オリジナルボットで電気を消して」 を設定。

これでもともとあったスマートホームの制御アクション(スクリーンショットの「一階 電源:オフ」)に続いて、オリジナルボットスキルの呼び出しが実行されます

最後にcontroller.pyを起動してラズパイでメッセージを待ち受けます。

python3 controller.py --endpoint <AWS_IOTのエンドポイント> --cert <AWS_IOTクライアント証明書の証明書ファイルのパス> --key <AWS_IOTクライアント証明書プライベートキーのパス> --root-ca <AWS_IOTクライアント証明書のルート証明書のパス> --topic switchbot

ということで、こんな感じに動きます。
demo.gif

ちょっと時間差が生じるのが気になりますが、SwitchBotハブ無しでアレクサからSwitchBotを動かすことができました!

18
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
18
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?