はじめに
酔っ払っている時にリモコンって見つからないです。気分良くベッドで寝ようとしてもテレビが付いていたら台無し。せっかくGoogleHomeがあるんだから音声でテレビを操作すれば解決です。そこで調べてみるとこんな記事がありました - GoogleHomeからテレビ(LG製WebOS)をコントロールしてみる。ちょうど家のテレビもLGのWebOS搭載なのでなんだか行けそうです。
でも、このためだけにRaspberryPiを使うのももったいないし、FirebaseのトークンとかもめんどくさいのでESP32で作りましょう。また、firebaseの代わりにMQTTが使えるadafruit.ioを採用します。
用意するもの
- GoogleHome
- IFTTT
- Adafruit.io
- ESP32-DevKitC
コマンド
一部省略してます。
- ok google テレビつけて→テレビの電源ON
- ok google テレビ消して→テレビの電源OFF
- ok google テレビの音量下げて→音量が1段階さがる
- ok google テレビの音量上げて→音量が1段階上がる
やってみよう
では、実際に作っていきましょう。
Adafruit.io
まずAdafruit.ioのアカウントを作成して、ログインしてください。無料プランでも1秒に一回づつ送信できるので問題ないでしょう。スキーマも@nk-tamagoさんの記事にだいたい従います。
- フィード
- lgtv-action
- 値
- 電源ON
- {"type": "wol"}
- 電源OFF
- {"type": "off"}
- 音量上げる
- {"type": "volume-up"}
- 音量下げる
- {"type": "volume-down"}
- 電源ON
IFTTT
続いてIFTTTの設定です。Adafruit.ioはIFTTT対応してるので、そのまま設定するだけで終わりです。
ESP32
micropythonの書き込みに関してはmakerzineの記事を参考にしてください。
すぐはじめられるようになったESP32のMicroPython
Wake On Lan
電源ONはWake On Lanなので、マジックパケットを出力しましょう。
import usocket as socket
def send(mac, ipaddr="255.255.255.255", port=9):
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
p = b'\xff' * 6 + mac * 16
s.sendto(p, (ipaddr, port))
s.close()
LGテレビのAPI
lgtv2のソースを読んでみると、通信はWebSocketでJSONを流し込んでるだけみたいです。WebSocketを張りっぱなしにしておくとカーソル位置やボリューム変更等のイベントをリアルタイムに受信できるみたいですが、今回は無視します。
メッセージは基本的にid、type、payloadの組み合わせがあれば問題なさそう。idは48bitで、メッセージの識別に使ってますが固定値でも問題な下げなので固定値にします。さらに、各種操作はuriで指定します。
{"id":"aa35c1d80000", "type:":"hogehoge","payload":"fugafuga","uri":"ssap://????"}
WebSocketのコネクションを張ったらまずRegisterをします。typeをregisterにして、payloadをlgtv2のpairing.jsonにして送信すると、テレビ上に承認画面が出てくるので承認するとtypeがregisteredのメッセージが帰ってきます。
{"id":"aa35c1d80000","type":"register","payload":{"forcePa..."}}
{"type":"registered","id":"b88deb910000","payload":{"client-key":"a121e...7539dbc"}}
次回以降は、このclient-keyをregisterメッセージに含めれば承認なしで利用できるようになります。
電源OFFのリクエストはこんな感じです。
{"type":"request","id":"b88deb910000","uri":"ssap://system/turnOff"}
この処理のコードはこんな感じ。
import uwebsockets.client
import ujson as json
import uos as os
def get_cid():
return "aa35c1d80000"
def command(uri, payload=None):
with uwebsockets.client.connect('ws://your_tv_ip_address:3000') as websocket:
register(websocket)
json_payload = {"id":get_cid(), "type":"request"}
if uri != None:
json_payload["uri"] = uri
if payload != None:
json_payload["payload"] = payload
json_str = json.dumps(json_payload)
print(json_str)
websocket.send(json_str)
resp = websocket.recv()
websocket.close()
return resp
def register(websocket):
f = open("pairing.json")
pairing_payload = json.loads(f.read())
f.close()
f = open("client.key")
client_key = f.read()
f.close()
if client_key != "":
pairing_payload["client-key"] = client_key
json_payload = {"id":get_cid(), "type":"register", "payload": pairing_payload}
resp = None
websocket.send(json.dumps(json_payload))
while True:
fin, opcode, data = websocket.read_frame()
resp = json.loads(data)
if(resp["type"] == "registered"):
break
if client_key == "":
f = open("client.key", 'w')
f.write(resp["payload"]["client-key"])
f.close()
あとはMQTTとか接続の処理とかを書いたboot.pyを作っておしまい。
import time
import os
import ujson as json
import machine
from umqtt.simple import MQTTClient
import wol
import lgtv
def do_connect():
import network
sta_if = network.WLAN(network.STA_IF)
if not sta_if.isconnected():
print('connecting to network...')
sta_if.active(True)
sta_if.connect('your_wifi_ssid', 'your_wifi_pass')
while not sta_if.isconnected():
pass
print('network config:', sta_if.ifconfig())
def sub_cb(topic, msg):
print((topic, msg))
action = json.loads(msg)
if action['type'] == 'wol':
for i in range(5):
# change to your tv's mac address
wol.send(b'\xAA\xBB\xCC\xDD\xEE\xFF')
time.sleep(1)
elif action['type'] == 'off':
lgtv.command("ssap://system/turnOff")
elif action['type'] == 'volume-up':
lgtv.command("ssap://audio/volumeUp")
elif action['type'] == 'volume-down':
lgtv.command("ssap://audio/volumeDown")
def main():
c = MQTTClient("your_aio_name", "io.adafruit.com", user="your_aio_name", password="your_aio_active_key", keepalive=5000, ssl=True)
c.set_callback(sub_cb)
c.connect()
c.subscribe(b"your_aio_name/feeds/lgtv-action")
while True:
c.ping()
c.check_msg()
time.sleep_ms(500)
if __name__ == "__main__":
do_connect()
main()
なぜかMQTTライブラリが放置しておくとENOENTを出して止まってしまうので、その時点で強制的に再起動します。
純粋にkeepaliveタイマーの問題でした。MQTTClient
のコンストラクタでkeepaliveを渡し、メインループで定期的にpingすることで解消しました。
#まとめ
便利です。これで気持ちよく眠れます。
参考にさせていただいた@nk-tamagoさんありがとうございます。
ソースコード:https://github.com/ainehanta/esp32-lgtv-adafruitio-py
#あとがき
音声認識だとバルスがおはこですが、新年ですし滅びの呪文ではなく、蘇りの呪文を使いたいです。
リーテラトバリタウルスアリアロスバルネトリール
Echoだと横文字弱いので「テイラーと借りたです有安あるやる昴ニトリ言ってる」と頓珍漢な認識結果に。
GoogleHomeだと認識します。IFTTTでこれとテレビの電源ONをリンクすればなんか蘇った感有るのではないでしょうか。
(パソコン点けても良いかも)