はじめに
Amazon Echoを買ってみたら Alexaがなかなか良いので、上手く活用したいなと思いました。
「やっぱ家電とか操作したいよな。スマートホームってやつ」「ただ対応している家電をわざわざ買うのもなぁ」ということから辿り着いたのがこちらの記事です。
格安スマートリモコンの作り方
https://qiita.com/takjg/items/e6b8af53421be54b62c9
材料費400円の赤外線送受信器を、1,800円のラズパイZeroに載せて、Google Homeから操作する方法を、ゼロから丁寧に解説します。
赤外線を発信する回路
こちらのサイトに、とても丁寧に解説されています → 赤外線LEDドライブ回路の決定版
素晴らしい記事です。このスマートホームシステムを構築中、何回読んだか数え切れないぐらいです。
こちらの記事のはGoogle Homeですから、それをAlexaに置き換えたらいいだけ。何て有り難い。
そんなわけで Amazon Echo(Alexa) + Raspberry Piでスマートホームシステム作成していきましょう。
概要
本記事の技術的な内容は次の通りです。
- Raspberry Piから赤外線を送信して家電(ライト)を操作する機能を実装します。
- 人感センサーを使って人を検知したらライトをONにして、一定時間いなかったらライトをOFFにします
- 光センサーを利用して部屋が明るいか暗いか(ライトがついているかいないか)を検知して余計な赤外線送信をしないようにします。
- Alexa からの命令をnode-red-contrib-amazon-echoを使ってRaspberry Piが受け取るようにします。
- 命令を一旦 クラウド上のデータベースに保存してデバイス間通信をします。今回は Firebase Realtime Database を使用します。
- プログラム言語は Pythonを使用します。Python 3.9.2で動作確認しています。Python3.8以下だと動きませんが、それは主に型ヒント(listなど)で動かないだけなので(
from typing import List
など)適宜修正してもらえれば動くはずです
今回使用したラズパイは 開発用として "Raspberry Pi 4 Model B"、本番用として"Raspberry Pi Zero WH"を使用しました。OSはRaspberry Pi OS 2022-09-22リリースのを使用しています。
※1 ラズパイZeroでは VSCodeのRemote Developmentが動かないので開発しづらいのです。
※2 本記事執筆時点ではRaspberry Pi はどこも売り切れ、もしくはとってもお高い。2023年は回復するといいですね。参考:ラズパイの2023年はサプライチェーン回復の年
仕様
さて、一口にスマートホームシステムを作りたいといっても具体的にどういうのが良いでしょうか。
音声でのライトのON・OFFは必須。あといちいち声で命令するのも面倒だから、人感センサー付けて部屋に入ったら自動でONにしてほしいですね。あと一定時間、人がいなかったらOFFするのもいい。デバイスはリビング部屋用や台所用とか複数用意したいです。
そのため現状ざっくり仕様としてはこんな感じでしょうか。
- Amazon Echo(Alexa)からの命令をラズパイで受信し、赤外線を送ってライトのON、OFFを行なう
- 人感センサーを搭載して、人がいたら自動でライトON,一定期間居なかったらライトOFF
- 光センサーを搭載して、明るかったらライトONをしない、暗かったらライトOFFしないようにして余計な赤外線送信をしないようにする
- 複数デバイス想定
- 今後色んな機能を追加するかもしれないので拡張性ありを想定
ハードウェア構築
必要なのは赤外線の送信回路。そして人感センサーと光センサーの回路です。
赤外線の送信回路は冒頭でも紹介したこちらの記事を参照してください。
格安スマートリモコンの作り方
人感センサーこと焦電型赤外線センサーは"EKMC1601111"を使用します。
https://akizukidenshi.com/catalog/g/gM-09750/
光センサーは、光可変抵抗器 "MI527"(Cdsセル5mmタイプ)を使用します。
https://akizukidenshi.com/catalog/g/gI-00110/
本来であれば、このようなアナログ信号は、アナログ・デジタル変換(MCP3208とかMCP3202とか)して SPI(Serial Peripheral Interface)で情報をやり取りするそうです。
……正直面倒くさいなぁと思いました。配線が増えるし、ラズパイゼロのユニバーサル基板のスペースが足りるでしょうか。そこまで正確な値がほしいわけでもないし、もしかしたら今後SPIは別の用途に使うかもしれません。
そこで、gpiozeroを使うことにしました。
gpiozeroをインストールする必要がありますが便利です。
sudo pip install gpiozero
gpiozeroライブラリ入門⑲import MotionSensor
gpiozeroライブラリ入門⑳import LightSensor (LDR)
これなら人感センサーも直接配線するだけでOK。光センサーもコンデンサと組合わせるだけ。楽!
コンデンサはこちらを使用しました。
フィルムコンデンサー 0.1μ F50V
https://akizukidenshi.com/catalog/g/gP-05332/
ラズパイのセットアップとハードウェア操作
ラズパイの初期セットアップはこちらの記事が参考になると思います。
Raspberry Pi はじめての初期セットアップ(2022/04/04版)
https://www.ingenious.jp/articles/howto/raspberry-pi-howto/raspberry-pi-basic-setup/
赤外線送受信が出来ることを確認
格安スマートリモコンの作り方:ラズパイで回路を制御の項目を参考に設定してください。
上記記事と2023年1月現在との違いは。今回使用したOSのバージョンには pigpioが既にインストールされているので、こちらのインストール作業は必要無いことぐらいです。
sudo apt install pigpio
この時点で赤外線受信機能を使って、ライトの赤外線信号を記録したJSONファイルを作成しておきます。
ファイル名は light.json とします。
人感センサーと光センサーの値が取れることを確認
gpiozeroライブラリ入門⑲import MotionSensor のサンプルコードが動くことを確認します。
from gpiozero import MotionSensor
pir = MotionSensor(4)
pir.wait_for_motion()
print("Motion detected!")
gpiozeroライブラリ入門⑳import LightSensor (LDR) のサンプルコードが動くことを確認します。
from gpiozero import LightSensor
ldr = LightSensor(4)
ldr.wait_for_light()
print("Light detected!")
# 指でCdSをふさいで、実行します。指を離すと、Light detected!と表示が出て終了しました。
Amazon Echo(Alexa) と Raspberry Piの連携
Amazon Echo(Alexa)からの命令をRaspberry Piが受信する必要があります。
代表的なものに Alexa Home Skill Bridgeを使う方法とnode-red-contrib-amazon-echoを使う方法の2つがあるようですが、前者は上手くいったものの後者は上手くいきませんでした。そこで 上手くいったAlexa Home Skill Bridge を使って連携します。
Alexa Home Skill Bridgeのセットアップ
こちらの記事が参考になりました。
Amazon Echoとラズパイで、音声で照明をon/offする
手順としては次の通りです。
- Ben-Hardill-Node-RED を有効にする
https://www.amazon.co.jp/Ben-Hardill-Node-RED/dp/B01N0D97FZ - Node-RED Alexa Home Skill Bridge にてデバイス登録を行なう
https://alexa-node-red.bm.hardill.me.uk/devices - スマートホーム -> デバイスから、登録した新しいデバイス検出する
https://alexa.amazon.co.jp/spa/index.html#cards - ラズパイに Node-Red をインストール・アップデートする
https://nodered.jp/docs/getting-started/raspberrypi - node-red-contrib-alexa-home-skill を Node-Red にインストールする
Palette→Install タブを選択し、検索欄に node-red-contrib-alexa-home-skill を入力し install
Alexa Home Skill Bridgeにて、「部屋の電気」や「台所の電気」といったデバイスを設定します。ついでに外出先からパソコンの電源をONにもしたかった( Wake on LAN )ので、パソコンの電源というデバイスも追加しておきました。
それではまず部屋担当ラズパイの設定をしてみます。Node-Redの設定はこんな感じです。
EXEしているシェルはこちら。
#!/bin/sh
cd `dirname $0`
python3 ir_send.py normal_on
#!/bin/sh
cd `dirname $0`
python3 ir_send.py normal_on
Pythonコードは下記の通りです。
3行目にある from ir import IR
である IR は、こちらのコードを使いやすくクラス化したものです。長いので本記事の最後に添付しておきます。
import sys
import os
from ir import IR
current_path = os.path.dirname(os.path.abspath(__file__))
light_code_file_path = f"{current_path}/light.json"
value = str(sys.argv[1])
ir = IR()
ir.send(17, light_code_file_path, value)
これによって、Amazon Echoから「アレクサ、部屋の電気を消して」「アレクサ、部屋の電気をつけて」が可能になりました。あとは「アレクサ、台所の電気をつけて」も出来るように、台所担当ラズパイに、これと同じ設定をすればいいだけです。
(メモ)node-red-contrib-amazon-echoが上手くいかない現状について
できれば完全にローカルで完結する node-red-contrib-amazon-echo を使いたかったのですが、Amazon Echoがデバイスを検出してくれないので見送りです。
送信側であるAmazon Echoに問題があるのか、受け手側であるRaspberry Piに問題があるのかまだ分かっていません。受け手側は、たとえば下記記事にあるようにポートフォワードをする必要があったりと少々複雑です。
【RaspberryPi】Amazon Echoを使った音声認識でNode-REDの処理を行う
ただAmazon Echo側に問題ありそうだという印象を受けています。Amazon Echoは 自身のネットワークの80番ポートを検索してデバイスを検出する仕様とのことなので、試しに80番ポートを開けたデバイスを立ててパケットキャプチャしてみたのですが、何のパケットも来ませんでした。また受信側へのアクセスをしてみたところ80番ポートが空いているのが確認できてポートフォワード自体は上手くいっているみたいで受け手側の問題ではなさそうなのですよね。Amazon Echo側の通信ログが見たいところですが、あいにくと見方が分かりません……
上手くいってる方の解説記事。
問題発生による構成変更 と Firebase Realtime Databaseの導入
さて、同様に台所用ラズパイをセットアップしたところ問題が発生しました。Alexa Home Skill Bridge では同じアカウントを複数のデバイスで使うことが出来ないようです。設定すると、部屋担当と台所担当が お互いがそれぞれ connected → disconnect を繰り返す状態になります。デバイスごとにアカウントを作成すればできそうですが、それは煩雑なのでやりたくありません。
そこでAlexa Home Skill Bridgeと繋がる専用のラズパイ、マスターラズパイを作り、マスターが部屋用ラズパイと台所用ラズパイに命令を出すような構成にすることにしました。
この構成の方が、Amazon Echoがデバイスを直接操作するよりも、一つの命令で複数のデバイスを操作するといったことも可能になり拡張性が上がります。
今回はその通信用として、Firebase Realtime Databaseを使うことにしました。
- Amazon Echo → Alexa Home Skill Bridge → Masterラズパイ→Firebase Realtime Databaseに命令を書き込み
- 部屋用と台所用ラズパイはFirebase Realtime Databaseに命令が書き込まれたら実行
(注釈)クラウドを使うのではなく、マスターラズパイにMongoDBを構築するといったローカルで直接やり取るする方法も検討したのですが、構築が面倒だったことと Alexaのカスタムスキルを使って直接書き込みたいという構想もあったので止めました。
もっともクラウドでもFirebase採用だったのは「Firebaseは最近話題になるからどっかで使ってみたい」といったモチベーションの方が大きかったです。
現状、Realtime Databaseはアメリカ、ベルギー、シンガポールのリージョンのみで日本がありません。通信ラグが発生しそうです。そのためAWSとかAzureのデータベースを使ってもいいでしょう。特にAWSはAmazon Echoと相性も良さそうです。
Firebase Realtime Databaseのセットアップ
任意の名前のプロジェクトを作成して、そこに Realtime Database を追加するだけです。
こちらの記事が参考になります。Firebase Realtime Databaseを使用しデータベース作成しデータを登録する
ただし上記記事では「テストモードで開始する」にしていますが、ロックモードのままにしておいてください。
また記事中の『ウェブアプリにFirebaseを追加する』以降は実行する必要はありません。
詳しくはFirebaseRealtimeデータベースのドキュメントを確認してください。
https://firebase.google.com/docs/database?hl=ja
今回試験的に作成したデータベースのURLはこちらになります。
https://smarthomesystem-2c112-default-rtdb.firebaseio.com
作成したプロジェクトの設定 -> サービスアカウント にて、Firebase Admin SDKで使用する秘密鍵を生成します。
今回は Pythonを使っているので、Admin SDK 構成スニペットを確認しておきましょう。
ここで、「新しい秘密鍵を生成」をクリックしてキーを生成します。
こんなJSONファイルがダウンロードされます。ファイル名は長いので、firebase-adminsdk-key.json とかにしておきましょうか。
ラズパイからFirebase Realtime Databaseの読み書き
Pythonで使えるよう firebase-adminをインストールします。
pip install --user firebase-admin
32ビットOSの場合の対応
今回試してみた限り64ビットOSではfirebase-adminのインストールは問題ありませんでしたが、32ビットOSでは firebase-admin のインストールに失敗する事象を確認しています。
ラズパイゼロなど32ビットOSで実施する場合は、予め次のコマンドで足りないパッケージをインストールしておきましょう。
sudo apt-get install libssl-dev
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh
これが必要な理由は、エラーメッセージが can't find Rust compiler
というRustのコンパイラが必要だというものだったり、fatal error: openssl/opensslv.h: そのようなファイルやディレクトリはありません
というような open-ssl が足りないといったものだったからです。
(参考):Raspberry Pi に Rust をインストールする
Rustのインストール後は、メッセージに表示されるように一度ログアウトしておいた方がいいかもしれません。
To get started you may need to restart your current shell.
This would reload your PATH environment variable to include
Cargo's bin directory ($HOME/.cargo/bin).
To configure your current shell, run:
source "$HOME/.cargo/env"
firebase-adminインストール終了時に次の警告が出たらパスが通っていないということなので、メッセージの通りパスを加えてください。
WARNING: The scripts pyrsa-decrypt, pyrsa-encrypt, pyrsa-keygen, pyrsa-priv2pub, pyrsa-sign and pyrsa-verify are installed in '/home/user/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
WARNING: The script doesitcache is installed in '/home/user/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
環境変数にパスを追加:
export PATH=/home/ユーザー名/.local/bin:$PATH
またこれは主にラズパイゼロにインストールしていた時の話ですが、firebase-adminをインストールしている最中に強制終了してしまうことがありました。おそらくはメモリが足りない所為です。
Collecting protobuf!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.19.5
Using cached https://www.piwheels.org/simple/protobuf/protobuf-4.21.12-py2.py3-none-any.whl (165 kB)
Collecting grpcio<2.0dev,>=1.33.2
強制終了
その場合、不必要なサービスを一時停止して再度インストールを試してください。
firebase-adminをインストールすると pipが壊れる?
正確な事象再現までには至っていないため無関係かもしれませんが、firebase-admin をインストールをしたぐらいのタイミングで pip が正常に動かなくなってしまう事象を確認しています。
その場合、python3-pip を一旦アンインストールして手動でのインストールで解決しました。
sudo apt remove python3-pip
wget https://bootstrap.pypa.io/get-pip.py
sudo python3 get-pip.py
再起動してインストール。
pip3 install pyopenssl --upgrade
Pythonでデータベースに読み書き
firebase-adminのインストールが終了したらPythonで読み書きできるのか確認してみます。
第一引数に場所(Room or Kitchen)を、第二引数に内容(ON or OFF)を設定できるPythonコードが以下になります。
秘密鍵ファイルの「firebase-adminsdk-key.json」をPythonコードファイルと同じ場所に置いておきます。
プログラム中の databaseURLは、今回使用する https://smarthomesystem-2c112-default-rtdb.firebaseio.com
を設定していますが、ここはそれぞれのデータベースのURLを設定してください。
import os
import sys
import firebase_admin
from firebase_admin import credentials, db
current_path = os.path.dirname(os.path.abspath(__file__))
place = str(sys.argv[1])
value = str(sys.argv[2])
cred = credentials.Certificate(f"{current_path}/firebase-adminsdk-key.json")
default_app = firebase_admin.initialize_app(
cred, {"databaseURL": "https://smarthomesystem-2c112-default-rtdb.firebaseio.com"}
)
orders_ref = db.reference("/orders")
led_ref = orders_ref.child(place).child("LED")
led_ref.set(value)
これを実行すると、データベースに書き込まれるところが確認できます。
python3 set2firebase.py Kitchen OFF
読み取るコードはこちらです。ほとんど同じで.set(value)
の代わりに.get()
を使うだけで取得できます。
import os
import sys
import firebase_admin
from firebase_admin import credentials, db
current_path = os.path.dirname(os.path.abspath(__file__))
cred = credentials.Certificate(f"{current_path}/firebase-adminsdk-key.json")
default_app = firebase_admin.initialize_app(
cred, {"databaseURL": "https://smarthomesystem-2c112-default-rtdb.firebaseio.com"}
)
orders_ref = db.reference("/orders")
led_ref = orders_ref.child("Kitchen").child("LED")
value: str = led_ref.get()
print(value)
ハードとソフトの組立
ここまででようやく各部品が揃いましたので、ここでそれぞれの担当を整理しておきます。
- Amazon Echo:Node-REDをインストールして、登録してあるデバイスを認識する
https://www.amazon.co.jp/Ben-Hardill-Node-RED/dp/B01N0D97FZ - マスターラズパイ: Alexa Home Skill Bridgeで命令を受信。Firebase Realtime Databaseに書き込みを行なう
- 部屋ラズパイと台所ラズパイ:Firebase Realtime Databaseの命令を受信して、指定された赤外線送信を行なう。またセンサーの情報を取得して赤外線の送信を行なう。
まずは部屋ラズパイのハードウェアをブレッドボードを使って構築します。スマートリモコンの回路に人感センサーと光センサーを付け加えた図です。
それぞれ使用しているピンは次の通りです。テスト時にそのピンを使用したのをそのまま流用しただけなので深い意味はありません。適宜自身の環境にあったものを採用してください。
- MotionSensor = GPIO 21
- LightSensor = GPIO 4
- 赤外線送信 = GPIO 17
- 他 5Vや3.3V、GNDの配線
テストして上手くいったら台所ラズパイのハードも作ります。
マスターラズパイ側の設定
前述の set2firebase.py
をそのまま使用します。
Amazon Echoからのオーダーを受信する Node-Redの全体の設定は次のような形にします。
叩くシェルファイルはそれぞれ次の通りです。
#!/bin/sh
cd `dirname $0`
python3 set2firebase.py Room ON
#!/bin/sh
cd `dirname $0`
python3 set2firebase.py Room OFF
#!/bin/sh
cd `dirname $0`
python3 set2firebase.py Kitchen ON
#!/bin/sh
cd `dirname $0`
python3 set2firebase.py Kitchen OFF
部屋(台所)のラズパイ用のPythonコード
常にループさせてデータベースを監視するようにします。命令があったら赤外線を送信して、命令のデータを消去します。
このコード自体は部屋ラズパイ用のコードですが、台所ラズパイの場合、8行目の MY_PLACE = "Room"
を MY_PLACE = "Kitchen"
に置き換えます。ハードコーディングです。
log_folder も保存したいログのパスを設定してください。
import logging
import os
import firebase_admin
from firebase_admin import credentials, db
from ir import IR
MY_PLACE = "Room"
IR_GPIO_NUMBER = 17
log_folder = "/home/user/log"
current_path = os.path.dirname(os.path.abspath(__file__))
# ログの設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.FileHandler(f"{log_folder}/ir_firebase.log")
logger.addHandler(handler)
fmt = logging.Formatter("%(asctime)s %(message)s")
handler.setFormatter(fmt)
cred = credentials.Certificate(f"{current_path}/firebase-adminsdk-key.json")
database_url = "https://smarthomesystem-2c112-default-rtdb.firebaseio.com"
default_app = firebase_admin.initialize_app(cred, {"databaseURL": database_url})
light_code_file_path = f"{current_path}/light.json"
ir = IR()
try:
while True:
orders_ref = db.reference("/orders")
led_ref = orders_ref.child(MY_PLACE).child("LED")
led_order: str = led_ref.get()
if str(led_order) == "OFF":
ir.send(IR_GPIO_NUMBER, light_code_file_path, "off")
led_ref.set("")
logger.info("LED:OFF")
if str(led_order) == "ON":
ir.send(IR_GPIO_NUMBER, light_code_file_path, "normal_on")
led_ref.set("")
logger.info("LED:ON")
except KeyboardInterrupt: # CTRL+C
print("Bye.")
次はセンサーに反応があったらライトをONにするプログラムです。また15分間反応がなかったらOFFにします。
こちらの log_folder も保存したいログのパスを設定してください。
import logging
import os
import time
from datetime import datetime, timedelta
from gpiozero import LightSensor, MotionSensor
from ir import IR
log_folder = "/home/user/log"
IR_GPIO_NUMBER = 17
MOTION_GPIO_NUMBER = 21
LIGHT_GPIO_NUMBER = 4
wait_time_min = 15
current_path = os.path.dirname(os.path.abspath(__file__))
# ログの設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.FileHandler(f"{log_folder}/ir_sensor.log")
logger.addHandler(handler)
fmt = logging.Formatter("%(asctime)s %(message)s")
handler.setFormatter(fmt)
light_code_file_path = f"{current_path}/light.json"
motion_sensor = MotionSensor(MOTION_GPIO_NUMBER)
ldr = LightSensor(LIGHT_GPIO_NUMBER, threshold=0.1)
ir = IR()
lighton_datetime = datetime.now() + timedelta(minutes=wait_time_min)
try:
while True:
current_datetime = datetime.now()
if motion_sensor.motion_detected is True:
if ldr.light_detected is False:
ir.send(IR_GPIO_NUMBER, light_code_file_path, "normal_on")
logger.info("LED:ON")
lighton_datetime = datetime.now() + timedelta(minutes=wait_time_min)
if lighton_datetime < current_datetime:
if ldr.light_detected is True:
ir.send(IR_GPIO_NUMBER, light_code_file_path, "off")
logger.info("LED:OFF")
time.sleep(1) # 1秒間待つ
except KeyboardInterrupt: # CTRL+C
print("Bye.")
(注釈)ここでir_from_sensor.py
にてセンサーに反応があったらそのまま赤外線送信していますが、一旦データベースに赤外線送信の命令を格納して、赤外線の送信の役割をir_from_firebase.py
に一本化した方が良いかもしれません。複雑になってくると命令のバッティングが発生しそうです。
とりあえず現状はデータベース読み書きの時間が発生する時間が惜しいというスピード優先のこれでいきます。
Pythonコードの自動起動
作ったPythonコードを再起動しても自動起動するようにします。
こちらの記事が参考になります。
Raspberry Pi で systemd を使ってプログラムを自動実行する
https://qiita.com/molchiro/items/ee32a11b81fa1dc2fd8d
/opt/mybin
フォルダを作り、そこにPythonコードや秘密鍵のJSONファイル、赤外線信号JSONファイルを保存します。
- ir.py
- ir_from_firebase.py
- ir_from_sensor.py
- firebase-adminsdk-key.json
- light.json
サービス設定ファイルは次のようにしました。
私の環境のユーザーは user なので、User=user になっています。
ラズパイのデフォルトユーザーは pi なので、その場合は User=pi と設定してください。
[Unit]
Description=IR from firebase
After=network-online.target
Wants=network-online.target
[Service]
User=user
ExecStart=/usr/bin/python3 /opt/mybin/ir_from_firebase.py
Restart=always
Type=simple
RestartSec=60s
[Install]
WantedBy=multi-user.target
サービスファイルについての説明
このPythonコードは、クラウドのfirebaseを使用しているため、サービス起動はオンラインになるまで待つ必要があります。
設定の After と Wants をnetwork-online.target
に設定することでオンラインになってからサービスを起動します。
After=network-online.target
Wants=network-online.target
参考:(詳しい話)Systemdのネットワーク関連ユニット解説(After=network.targetあたり)
しかしながらこのAfterとWantsの設定だけでは解決しませんでした。どうしても次のようなエラーが発生してサービスが落ちてしまいます。
1月 27 01:20:13 raspberrypi python3[571]: request_succeeded, response_data, retryable_error = _perform_request()
1月 27 01:20:13 raspberrypi python3[571]: File "/home/user/.local/lib/python3.9/site-packages/google/oauth2/_client.py", line 180, in _perform_request
1月 27 01:20:13 raspberrypi python3[571]: response = request(
1月 27 01:20:13 raspberrypi python3[571]: File "/home/user/.local/lib/python3.9/site-packages/google/auth/transport/requests.py", line 199, in __call__
1月 27 01:20:13 raspberrypi python3[571]: six.raise_from(new_exc, caught_exc)
1月 27 01:20:13 raspberrypi python3[571]: File "<string>", line 3, in raise_from
1月 27 01:20:13 raspberrypi python3[571]: google.auth.exceptions.TransportError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url:
しばらく待ってrestartすれば上手く動くので、自動で再起動をさせようと Restart=always
の設定にしてみることにしました。しかしそれでは、start request repeated too quickly for ~
が出てしまいます。(デフォルトで5回再起動を繰り返すと出現)
そこで再起動の間隔を 60s に設定することで、強引に解決しています。(デフォルト値は100ms)
RestartSec=60s
(参考)オプション設定:RestartSec= systemd.service — Service unit configuration
センサーから赤外線送信する方のサービスは次の通りです。こちらは必ずしもオンラインである必要がないので、After=network.target としてあります。RestartSecの設定も省略です。
[Unit]
Description=IR from sensor
After=network.target
[Service]
User=user
ExecStart=/usr/bin/python3 /opt/mybin/ir_from_sensor.py
Restart=always
Type=simple
[Install]
WantedBy=multi-user.target
まとめ
Amazon Echo(Alexa) + Raspberry Pi を使ってスマートホームシステムの基本を構築してみました。
現状は「アレクサ、部屋(台所)の電気つけて(消して)」といった音声での命令と、ライトOFFの状態で置いてあるラズパイの近くにいくとONになることと、15分間ずっと人がいないとOFFになるという機能が実装できました。
(動きがないと人感センサーは反応しないので、ずっと座っているような部屋の方は人感センサーの機能は切った方がいいかも)
これをさらに応用することでエアコンのスイッチを冷房、暖房、ドライにしたり、扇風機のON・OFFも可能になりそうです。
またハード部分はブレッドボードの状態なので、これでしばらく運用してみて、問題なさそうならユニバーサル基板に半田付けしていきたいですね。
ぶっちゃけ電子工作は素人もいいところなので何かありましたらお気軽にコメント頂けると嬉しいです。
次回予定
音声で部屋と台所の電気をON、OFFできるようになりましたが、出掛ける時などは「Alexa いってきます」といった一つの命令で複数のデバイスを操作できるようにしたいです。
現状だと「アレクサ、部屋の電気を消して」→Alexa「はい、分かりました」(部屋の電気が消える)、「アレクサ、台所の電気を消して」といったように、順番に命令をしなければなりませんが、そんなの面倒です。手でやった方が早いです。
そこでAlexaのカスタムスキルを構築して、一つの命令でまとめてデバイスを操作することを実現したいと思います。
追記:記事を公開しました。Alexa のカスタムスキルから Firebase のデータベースに書き込む
他参考にした書籍とキット
電子工作の基本を学習するために使用したものです。
以下Amazonリンク
- Raspberry PiによるIoTシステム開発実習:センサネットワーク構築からwebサービス実装まで 単行本
- Raspberry Piで学ぶ電子工作 専用 実験キット 基本部品セット スターターパック (電子部品関連)
使いやすくクラス化した赤外線送受信プログラム
途中で出てきた ir.pyのコードを記載しておきます。
こちらのプログラムは使いづらかったので、自分用にクラス化したものです。
import json
import logging
import os
import time
from typing import Optional
import pigpio
logger = logging.getLogger(__name__)
class IR:
"""赤外線の送受信を担当するクラス
Python3.9 以上
元スクリプトを使いやすくクラス化したもの
http://abyz.me.uk/rpi/pigpio/examples.html#Python_irrp_py
Raises:
Exception: _description_
Exception: _description_
Returns:
_type_: _description_
"""
GLITCH = 100
SHORT = 10
FREQ: float = 38.0
def __init__(self) -> None:
self.last_tick = 0
self.in_code = False
self.code: list[float] = []
self.fetching_code = False
PRE_MS = 200
self.PRE_US = PRE_MS * 1000
self.POST_MS = 15
self.POST_US = self.POST_MS * 1000
TOLERANCE = 15
self.TOLER_MIN = (100 - TOLERANCE) / 100.0
self.TOLER_MAX = (100 + TOLERANCE) / 100.0
GAP_MS: int = 100
self.GAP_S = GAP_MS / 1000.0
def _backup(self, f: str):
"""f -> f.bak -> f.bak1 -> f.bak2
Args:
f (str): [description]
"""
file_path = os.path.realpath(f)
try:
os.rename(f"{file_path}.bak1", f"{file_path}.bak2")
except Exception:
pass
try:
os.rename(f"{file_path}.bak", f"{file_path}.bak1")
except Exception:
pass
try:
os.rename(file_path, f"{file_path}.bak")
except Exception:
pass
def _normalise(self, c: list[float]):
"""
Typically a code will be made up of two or three distinct
marks (carrier) and spaces (no carrier) of different lengths.
Because of transmission and reception errors those pulses
which should all be x micros long will have a variance around x.
This function identifies the distinct pulses and takes the
average of the lengths making up each distinct pulse. Marks
and spaces are processed separately.
This makes the eventual generation of waves much more efficient.
Input
M S M S M S M S M S M
9000 4500 600 540 620 560 590 1660 620 1690 615
Distinct marks
9000 average 9000
600 620 590 620 615 average 609
Distinct spaces
4500 average 4500
540 560 average 550
1660 1690 average 1675
Output
M S M S M S M S M S M
9000 4500 609 550 609 550 609 1675 609 1675 609
"""
logger.debug("before normalise", c)
entries = len(c)
p = [0] * entries # Set all entries not processed.
for i in range(entries):
if not p[i]: # Not processed?
v = c[i]
tot = v
similar = 1.0
# Find all pulses with similar lengths to the start pulse.
for j in range(i + 2, entries, 2):
if not p[j]: # Unprocessed.
# Similar.
if (c[j] * self.TOLER_MIN) < v < (c[j] * self.TOLER_MAX):
tot = tot + c[j]
similar += 1.0
# Calculate the average pulse length.
newv = round(tot / similar, 2)
c[i] = newv
# Set all similar pulses to the average value.
for j in range(i + 2, entries, 2):
if not p[j]: # Unprocessed.
# Similar.
if (c[j] * self.TOLER_MIN) < v < (c[j] * self.TOLER_MAX):
c[j] = newv
p[j] = 1
logger.debug("after normalise", c)
def _compare(self, p1: list[float], p2: list[float]) -> Optional[list[float]]:
"""Check that both recodings correspond in pulse length to within
TOLERANCE%. If they do average the two recordings pulse lengths.
Input
M S M S M S M S M S M
1: 9000 4500 600 560 600 560 600 1700 600 1700 600
2: 9020 4570 590 550 590 550 590 1640 590 1640 590
Output
A: 9010 4535 595 555 595 555 595 1670 595 1670 595
Args:
p1 (List[float]): [description]
p2 (List[float]): [description]
Returns:
Optional[List[int]]: [description]
"""
if len(p1) != len(p2):
return None
for i in range(len(p1)):
v = p1[i] / p2[i]
if (v < self.TOLER_MIN) or (v > self.TOLER_MAX):
return None
ave_p: list[float] = []
for i in range(len(p1)):
ave_p.append(int(round((p1[i] + p2[i]) / 2.0)))
logger.debug("after compare", p1)
return ave_p
def _tidy_mark_space(self, records: dict[str, list[float]], base: int):
ms: dict[float, int] = {}
# Find all the unique marks (base=0) or spaces (base=1)
# and count the number of times they appear,
for rec in records:
rl = len(records[rec])
for i in range(base, rl, 2):
if records[rec][i] in ms:
ms[records[rec][i]] += 1
else:
ms[records[rec][i]] = 1
logger.debug("t_m_s A", ms)
e: list[float] = []
v = 0
tot = 0
similar = 0
for plen in sorted(ms):
# Now go through in order, shortest first, and collapse
# pulses which are the same within a tolerance to the
# same value. The value is the weighted average of the
# occurences.
#
# E.g. 500x20 550x30 600x30 1000x10 1100x10 1700x5 1750x5
#
# becomes 556(x80) 1050(x20) 1725(x10)
#
if len(e) == 0:
e = [plen]
v = plen
tot = plen * ms[plen]
similar = ms[plen]
elif plen < (v * self.TOLER_MAX):
e.append(plen)
tot += plen * ms[plen]
similar += ms[plen]
else:
v = int(round(tot / float(similar)))
# set all previous to v
for i in e:
ms[i] = v
e = [plen]
v = plen
tot = plen * ms[plen]
similar = ms[plen]
v = int(round(tot / float(similar)))
# set all previous to v
for i in e:
ms[i] = v
logger.debug("t_m_s B", ms)
for rec in records:
rl = len(records[rec])
for i in range(base, rl, 2):
records[rec][i] = ms[records[rec][i]]
def _tidy(self, records: dict[str, list[float]]):
self._tidy_mark_space(records, 0) # Marks.
self._tidy_mark_space(records, 1) # Spaces.
def _end_of_code(self):
if len(self.code) > self.SHORT:
self._normalise(self.code)
self.fetching_code = False
else:
self.code = []
print("Short code, probably a repeat, try again")
def _cbf(self, gpio: int, level: int, tick: int):
if level != pigpio.TIMEOUT:
edge = pigpio.tickDiff(self.last_tick, tick)
self.last_tick = tick
if self.fetching_code:
# Start of a code.
if (edge > self.PRE_US) and (not self.in_code):
self.in_code = True
# Start watchdog.
self.pi.set_watchdog(self.input_gpio, self.POST_MS)
elif (edge > self.POST_US) and self.in_code: # End of a code.
self.in_code = False
# Cancel watchdog.
self.pi.set_watchdog(self.input_gpio, 0)
self._end_of_code()
elif self.in_code:
self.code.append(edge)
else:
# Cancel watchdog.
self.pi.set_watchdog(self.input_gpio, 0)
if self.in_code:
self.in_code = False
self._end_of_code()
def record(self, gpio: int, file_name: str, code_name: str, confirm: bool = False):
"""赤外線を記録する
Args:
gpio (int): 使用するGPIO番号
file_name (str): 記録するファイル名
code_name (str): 記録するコード名
CONFIRM (bool, optional): 確認するか否か. Defaults to False.
"""
self.input_gpio = gpio
self.pi = pigpio.pi() # Connect to Pi.
records: dict[str, list[float]] = {}
if not self.pi.connected:
exit(0)
try:
f = open(file_name, "r")
records = json.load(f)
f.close()
except Exception:
pass
# IR RX connected to this GPIO.
self.pi.set_mode(self.input_gpio, pigpio.INPUT)
# Ignore glitches.
self.pi.set_glitch_filter(self.input_gpio, self.GLITCH)
self.pi.callback(self.input_gpio, pigpio.EITHER_EDGE, self._cbf)
# Process each id
logger.info("Recording")
print("Recording")
print(f"Press key for '{code_name}'")
self.code = []
self.fetching_code = True
while self.fetching_code:
time.sleep(0.1)
print("Okay")
time.sleep(0.5)
if confirm:
press_1: list[float] = self.code[:]
done = False
tries = 0
while not done:
print(f"Press key for '{code_name}' to confirm")
self.code = []
self.fetching_code = True
while self.fetching_code:
time.sleep(0.1)
press_2: list[float] = self.code[:]
ave_p = self._compare(press_1, press_2)
if ave_p:
done = True
records[code_name] = ave_p[:]
print("Okay")
time.sleep(0.5)
else:
tries += 1
if tries <= 3:
print("No match")
else:
print(f"Giving up on key '{code_name}'")
done = True
time.sleep(0.5)
else: # No confirm.
records[code_name] = self.code[:]
# Cancel glitch filter.
self.pi.set_glitch_filter(self.input_gpio, 0)
# Cancel watchdog.
self.pi.set_watchdog(self.input_gpio, 0)
self._tidy(records)
self._backup(file_name)
f = open(file_name, "w")
f.write(json.dumps(records, sort_keys=True).replace("],", "],\n") + "\n")
f.close()
self.pi.stop() # Disconnect from Pi.
def _carrier(self, gpio: int, frequency: float, micros: int):
"""Generate carrier square wave.
Args:
gpio (int): [description]
frequency (float): [description]
micros (int): [description]
Returns:
[type]: [description]
"""
wf: list[pigpio.pulse] = []
cycle = 1000.0 / frequency
cycles = int(round(micros / cycle))
on = int(round(cycle / 2.0))
sofar = 0
for c in range(cycles):
target = int(round((c + 1) * cycle))
sofar += on
off = target - sofar
sofar += off
wf.append(pigpio.pulse(1 << gpio, 0, on))
wf.append(pigpio.pulse(0, 1 << gpio, off))
return wf
def send(self, gpio: int, file_name: str, code_name: str):
"""赤外線を送信
Args:
gpio (int):使用するGPIO番号
file_name (str): 使用するファイル名
code_name (str): 使用するコード名
Raises:
Exception: _description_
Exception: _description_
"""
pi = pigpio.pi() # Connect to Pi.
if not pi.connected:
logger.error("pi not connected")
raise Exception("")
with open(file_name, "r") as f:
records = json.load(f)
# IR TX connected to this GPIO.
pi.set_mode(gpio, pigpio.OUTPUT)
pi.wave_add_new()
emit_time = time.time()
logger.debug("Playing")
if code_name not in records:
logger.error(f"Id {code_name} not found")
raise Exception("")
code: list[int] = records[code_name]
# Create wave
marks_wid: dict[int, int] = {}
spaces_wid: dict[int, int] = {}
wave = [0] * len(code)
for i in range(0, len(code)):
ci = code[i]
if i & 1: # Space
if ci not in spaces_wid:
pi.wave_add_generic([pigpio.pulse(0, 0, ci)])
spaces_wid[ci] = pi.wave_create() # type: ignore
wave[i] = spaces_wid[ci]
else: # Mark
if ci not in marks_wid:
wf = self._carrier(gpio, self.FREQ, ci)
pi.wave_add_generic(wf)
marks_wid[ci] = pi.wave_create() # type: ignore
wave[i] = marks_wid[ci]
delay = emit_time - time.time()
if delay > 0.0:
time.sleep(delay)
pi.wave_chain(wave)
logger.debug(f"key {code_name}")
while pi.wave_tx_busy():
time.sleep(0.002)
emit_time = time.time() + self.GAP_S
for i in marks_wid:
pi.wave_delete(marks_wid[i])
marks_wid = {}
for i in spaces_wid:
pi.wave_delete(spaces_wid[i])
spaces_wid = {}
pi.stop() # Disconnect from Pi.