12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Amazon Echo(Alexa) + Raspberry Pi を使ってスマートホームシステムを作ろう

Last updated at Posted at 2023-01-28

はじめに

 Amazon Echoを買ってみたら Alexaがなかなか良いので、上手く活用したいなと思いました。
「やっぱ家電とか操作したいよな。スマートホームってやつ」「ただ対応している家電をわざわざ買うのもなぁ」ということから辿り着いたのがこちらの記事です。

格安スマートリモコンの作り方
https://qiita.com/takjg/items/e6b8af53421be54b62c9

材料費400円の赤外線送受信器を、1,800円のラズパイZeroに載せて、Google Homeから操作する方法を、ゼロから丁寧に解説します。

赤外線を発信する回路
こちらのサイトに、とても丁寧に解説されています → 赤外線LEDドライブ回路の決定版

 素晴らしい記事です。このスマートホームシステムを構築中、何回読んだか数え切れないぐらいです。
こちらの記事のはGoogle Homeですから、それをAlexaに置き換えたらいいだけ。何て有り難い。

 そんなわけで Amazon Echo(Alexa) + Raspberry Piでスマートホームシステム作成していきましょう。

概要

本記事の技術的な内容は次の通りです。

  • Raspberry Piから赤外線を送信して家電(ライト)を操作する機能を実装します。
  • 人感センサーを使って人を検知したらライトをONにして、一定時間いなかったらライトをOFFにします
  • 光センサーを利用して部屋が明るいか暗いか(ライトがついているかいないか)を検知して余計な赤外線送信をしないようにします。
  • Alexa からの命令をnode-red-contrib-amazon-echoを使ってRaspberry Piが受け取るようにします。
  • 命令を一旦 クラウド上のデータベースに保存してデバイス間通信をします。今回は Firebase Realtime Database を使用します。
  • プログラム言語は Pythonを使用します。Python 3.9.2で動作確認しています。Python3.8以下だと動きませんが、それは主に型ヒント(listなど)で動かないだけなので(from typing import Listなど)適宜修正してもらえれば動くはずです

 今回使用したラズパイは 開発用として "Raspberry Pi 4 Model B"、本番用として"Raspberry Pi Zero WH"を使用しました。OSはRaspberry Pi OS 2022-09-22リリースのを使用しています。
 ※1 ラズパイZeroでは VSCodeのRemote Developmentが動かないので開発しづらいのです。
 ※2 本記事執筆時点ではRaspberry Pi はどこも売り切れ、もしくはとってもお高い。2023年は回復するといいですね。参考:ラズパイの2023年はサプライチェーン回復の年

仕様

 さて、一口にスマートホームシステムを作りたいといっても具体的にどういうのが良いでしょうか。
 音声でのライトのON・OFFは必須。あといちいち声で命令するのも面倒だから、人感センサー付けて部屋に入ったら自動でONにしてほしいですね。あと一定時間、人がいなかったらOFFするのもいい。デバイスはリビング部屋用や台所用とか複数用意したいです。

 そのため現状ざっくり仕様としてはこんな感じでしょうか。

  • Amazon Echo(Alexa)からの命令をラズパイで受信し、赤外線を送ってライトのON、OFFを行なう
  • 人感センサーを搭載して、人がいたら自動でライトON,一定期間居なかったらライトOFF
  • 光センサーを搭載して、明るかったらライトONをしない、暗かったらライトOFFしないようにして余計な赤外線送信をしないようにする
  • 複数デバイス想定
  • 今後色んな機能を追加するかもしれないので拡張性ありを想定

ハードウェア構築

必要なのは赤外線の送信回路。そして人感センサーと光センサーの回路です。

赤外線の送信回路は冒頭でも紹介したこちらの記事を参照してください。
格安スマートリモコンの作り方

人感センサーこと焦電型赤外線センサーは"EKMC1601111"を使用します。
https://akizukidenshi.com/catalog/g/gM-09750/

光センサーは、光可変抵抗器 "MI527"(Cdsセル5mmタイプ)を使用します。
https://akizukidenshi.com/catalog/g/gI-00110/

本来であれば、このようなアナログ信号は、アナログ・デジタル変換(MCP3208とかMCP3202とか)して SPI(Serial Peripheral Interface)で情報をやり取りするそうです。

 ……正直面倒くさいなぁと思いました。配線が増えるし、ラズパイゼロのユニバーサル基板のスペースが足りるでしょうか。そこまで正確な値がほしいわけでもないし、もしかしたら今後SPIは別の用途に使うかもしれません。
そこで、gpiozeroを使うことにしました。

 gpiozeroをインストールする必要がありますが便利です。

sudo pip install gpiozero

gpiozeroライブラリ入門⑲import MotionSensor
gpiozeroライブラリ入門⑳import LightSensor (LDR)

 これなら人感センサーも直接配線するだけでOK。光センサーもコンデンサと組合わせるだけ。楽!
コンデンサはこちらを使用しました。

フィルムコンデンサー 0.1μ F50V
https://akizukidenshi.com/catalog/g/gP-05332/

ラズパイのセットアップとハードウェア操作

ラズパイの初期セットアップはこちらの記事が参考になると思います。

Raspberry Pi はじめての初期セットアップ(2022/04/04版)
https://www.ingenious.jp/articles/howto/raspberry-pi-howto/raspberry-pi-basic-setup/

赤外線送受信が出来ることを確認

格安スマートリモコンの作り方:ラズパイで回路を制御の項目を参考に設定してください。

格安スマートリモコンの作り方:ラズパイで回路を制御

上記記事と2023年1月現在との違いは。今回使用したOSのバージョンには pigpioが既にインストールされているので、こちらのインストール作業は必要無いことぐらいです。

sudo apt install pigpio

 この時点で赤外線受信機能を使って、ライトの赤外線信号を記録したJSONファイルを作成しておきます。
ファイル名は light.json とします。
image.png

人感センサーと光センサーの値が取れることを確認

gpiozeroライブラリ入門⑲import MotionSensor のサンプルコードが動くことを確認します。

from gpiozero import MotionSensor

pir = MotionSensor(4)
pir.wait_for_motion()
print("Motion detected!")

gpiozeroライブラリ入門⑳import LightSensor (LDR) のサンプルコードが動くことを確認します。

from gpiozero import LightSensor

ldr = LightSensor(4)
ldr.wait_for_light()
print("Light detected!")
# 指でCdSをふさいで、実行します。指を離すと、Light detected!と表示が出て終了しました。

Amazon Echo(Alexa) と Raspberry Piの連携

 Amazon Echo(Alexa)からの命令をRaspberry Piが受信する必要があります。
 代表的なものに Alexa Home Skill Bridgeを使う方法とnode-red-contrib-amazon-echoを使う方法の2つがあるようですが、前者は上手くいったものの後者は上手くいきませんでした。そこで 上手くいったAlexa Home Skill Bridge を使って連携します。

Alexa Home Skill Bridgeのセットアップ

こちらの記事が参考になりました。
Amazon Echoとラズパイで、音声で照明をon/offする

手順としては次の通りです。

  1. Ben-Hardill-Node-RED を有効にする
    https://www.amazon.co.jp/Ben-Hardill-Node-RED/dp/B01N0D97FZ
  2. Node-RED Alexa Home Skill Bridge にてデバイス登録を行なう
    https://alexa-node-red.bm.hardill.me.uk/devices
  3. スマートホーム -> デバイスから、登録した新しいデバイス検出する
    https://alexa.amazon.co.jp/spa/index.html#cards
  4. ラズパイに Node-Red をインストール・アップデートする
    https://nodered.jp/docs/getting-started/raspberrypi
  5. node-red-contrib-alexa-home-skill を Node-Red にインストールする
    Palette→Install タブを選択し、検索欄に node-red-contrib-alexa-home-skill を入力し install

 Alexa Home Skill Bridgeにて、「部屋の電気」や「台所の電気」といったデバイスを設定します。ついでに外出先からパソコンの電源をONにもしたかった( Wake on LAN )ので、パソコンの電源というデバイスも追加しておきました。
image.png

それではまず部屋担当ラズパイの設定をしてみます。Node-Redの設定はこんな感じです。

image.png

EXEしているシェルはこちら。

on.sh
#!/bin/sh
cd `dirname $0`
python3 ir_send.py normal_on
off.sh
#!/bin/sh
cd `dirname $0`
python3 ir_send.py normal_on

Pythonコードは下記の通りです。
3行目にある from ir import IR である IR は、こちらのコードを使いやすくクラス化したものです。長いので本記事の最後に添付しておきます。

ir_send.py
import sys
import os
from ir import IR

current_path = os.path.dirname(os.path.abspath(__file__))
light_code_file_path = f"{current_path}/light.json"

value = str(sys.argv[1])

ir = IR()
ir.send(17, light_code_file_path, value)

 これによって、Amazon Echoから「アレクサ、部屋の電気を消して」「アレクサ、部屋の電気をつけて」が可能になりました。あとは「アレクサ、台所の電気をつけて」も出来るように、台所担当ラズパイに、これと同じ設定をすればいいだけです。

(メモ)node-red-contrib-amazon-echoが上手くいかない現状について

 できれば完全にローカルで完結する node-red-contrib-amazon-echo を使いたかったのですが、Amazon Echoがデバイスを検出してくれないので見送りです。
 送信側であるAmazon Echoに問題があるのか、受け手側であるRaspberry Piに問題があるのかまだ分かっていません。受け手側は、たとえば下記記事にあるようにポートフォワードをする必要があったりと少々複雑です。

【RaspberryPi】Amazon Echoを使った音声認識でNode-REDの処理を行う

 ただAmazon Echo側に問題ありそうだという印象を受けています。Amazon Echoは 自身のネットワークの80番ポートを検索してデバイスを検出する仕様とのことなので、試しに80番ポートを開けたデバイスを立ててパケットキャプチャしてみたのですが、何のパケットも来ませんでした。また受信側へのアクセスをしてみたところ80番ポートが空いているのが確認できてポートフォワード自体は上手くいっているみたいで受け手側の問題ではなさそうなのですよね。Amazon Echo側の通信ログが見たいところですが、あいにくと見方が分かりません……

上手くいってる方の解説記事。

問題発生による構成変更 と Firebase Realtime Databaseの導入

 さて、同様に台所用ラズパイをセットアップしたところ問題が発生しました。Alexa Home Skill Bridge では同じアカウントを複数のデバイスで使うことが出来ないようです。設定すると、部屋担当と台所担当が お互いがそれぞれ connected → disconnect を繰り返す状態になります。デバイスごとにアカウントを作成すればできそうですが、それは煩雑なのでやりたくありません。

 そこでAlexa Home Skill Bridgeと繋がる専用のラズパイ、マスターラズパイを作り、マスターが部屋用ラズパイと台所用ラズパイに命令を出すような構成にすることにしました。
 この構成の方が、Amazon Echoがデバイスを直接操作するよりも、一つの命令で複数のデバイスを操作するといったことも可能になり拡張性が上がります。
 今回はその通信用として、Firebase Realtime Databaseを使うことにしました。

  • Amazon Echo → Alexa Home Skill Bridge → Masterラズパイ→Firebase Realtime Databaseに命令を書き込み
  • 部屋用と台所用ラズパイはFirebase Realtime Databaseに命令が書き込まれたら実行

(注釈)クラウドを使うのではなく、マスターラズパイにMongoDBを構築するといったローカルで直接やり取るする方法も検討したのですが、構築が面倒だったことと Alexaのカスタムスキルを使って直接書き込みたいという構想もあったので止めました。
 もっともクラウドでもFirebase採用だったのは「Firebaseは最近話題になるからどっかで使ってみたい」といったモチベーションの方が大きかったです。
 現状、Realtime Databaseはアメリカ、ベルギー、シンガポールのリージョンのみで日本がありません。通信ラグが発生しそうです。そのためAWSとかAzureのデータベースを使ってもいいでしょう。特にAWSはAmazon Echoと相性も良さそうです。

Firebase Realtime Databaseのセットアップ

 任意の名前のプロジェクトを作成して、そこに Realtime Database を追加するだけです。

 こちらの記事が参考になります。Firebase Realtime Databaseを使用しデータベース作成しデータを登録する

 ただし上記記事では「テストモードで開始する」にしていますが、ロックモードのままにしておいてください。
また記事中の『ウェブアプリにFirebaseを追加する』以降は実行する必要はありません。
image.png

詳しくはFirebaseRealtimeデータベースのドキュメントを確認してください。
https://firebase.google.com/docs/database?hl=ja

今回試験的に作成したデータベースのURLはこちらになります。
https://smarthomesystem-2c112-default-rtdb.firebaseio.com

作成したプロジェクトの設定 -> サービスアカウント にて、Firebase Admin SDKで使用する秘密鍵を生成します。
今回は Pythonを使っているので、Admin SDK 構成スニペットを確認しておきましょう。
ここで、「新しい秘密鍵を生成」をクリックしてキーを生成します。

image.png

新しい秘密鍵を生成します。
image.png

 こんなJSONファイルがダウンロードされます。ファイル名は長いので、firebase-adminsdk-key.json とかにしておきましょうか。
image.png

ラズパイからFirebase Realtime Databaseの読み書き

 Pythonで使えるよう firebase-adminをインストールします。

pip install --user firebase-admin

32ビットOSの場合の対応

 今回試してみた限り64ビットOSではfirebase-adminのインストールは問題ありませんでしたが、32ビットOSでは firebase-admin のインストールに失敗する事象を確認しています。
 ラズパイゼロなど32ビットOSで実施する場合は、予め次のコマンドで足りないパッケージをインストールしておきましょう。

sudo apt-get install libssl-dev
curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

 これが必要な理由は、エラーメッセージが can't find Rust compiler というRustのコンパイラが必要だというものだったり、fatal error: openssl/opensslv.h: そのようなファイルやディレクトリはありません というような open-ssl が足りないといったものだったからです。

(参考):Raspberry Pi に Rust をインストールする

 Rustのインストール後は、メッセージに表示されるように一度ログアウトしておいた方がいいかもしれません。

To get started you may need to restart your current shell.
This would reload your PATH environment variable to include
Cargo's bin directory ($HOME/.cargo/bin).

To configure your current shell, run:
source "$HOME/.cargo/env"

 firebase-adminインストール終了時に次の警告が出たらパスが通っていないということなので、メッセージの通りパスを加えてください。

WARNING: The scripts pyrsa-decrypt, pyrsa-encrypt, pyrsa-keygen, pyrsa-priv2pub, pyrsa-sign and pyrsa-verify are installed in '/home/user/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.
WARNING: The script doesitcache is installed in '/home/user/.local/bin' which is not on PATH.
Consider adding this directory to PATH or, if you prefer to suppress this warning, use --no-warn-script-location.

環境変数にパスを追加:

export PATH=/home/ユーザー名/.local/bin:$PATH

 またこれは主にラズパイゼロにインストールしていた時の話ですが、firebase-adminをインストールしている最中に強制終了してしまうことがありました。おそらくはメモリが足りない所為です。

Collecting protobuf!=3.20.0,!=3.20.1,!=4.21.0,!=4.21.1,!=4.21.2,!=4.21.3,!=4.21.4,!=4.21.5,<5.0.0dev,>=3.19.5
  Using cached https://www.piwheels.org/simple/protobuf/protobuf-4.21.12-py2.py3-none-any.whl (165 kB)
Collecting grpcio<2.0dev,>=1.33.2
強制終了

その場合、不必要なサービスを一時停止して再度インストールを試してください。

firebase-adminをインストールすると pipが壊れる?

 正確な事象再現までには至っていないため無関係かもしれませんが、firebase-admin をインストールをしたぐらいのタイミングで pip が正常に動かなくなってしまう事象を確認しています。
 その場合、python3-pip を一旦アンインストールして手動でのインストールで解決しました。

sudo apt remove python3-pip
wget https://bootstrap.pypa.io/get-pip.py
sudo python3 get-pip.py

再起動してインストール。

pip3 install pyopenssl --upgrade

参考情報: https://stackoverflow.com/questions/73830524/attributeerror-module-lib-has-no-attribute-x509-v-flag-cb-issuer-check

Pythonでデータベースに読み書き

 firebase-adminのインストールが終了したらPythonで読み書きできるのか確認してみます。
 第一引数に場所(Room or Kitchen)を、第二引数に内容(ON or OFF)を設定できるPythonコードが以下になります。

 秘密鍵ファイルの「firebase-adminsdk-key.json」をPythonコードファイルと同じ場所に置いておきます。
 プログラム中の databaseURLは、今回使用する https://smarthomesystem-2c112-default-rtdb.firebaseio.com を設定していますが、ここはそれぞれのデータベースのURLを設定してください。

set2firebase.py
import os
import sys
import firebase_admin
from firebase_admin import credentials, db

current_path = os.path.dirname(os.path.abspath(__file__))
place = str(sys.argv[1])
value = str(sys.argv[2])
cred = credentials.Certificate(f"{current_path}/firebase-adminsdk-key.json")
default_app = firebase_admin.initialize_app(
    cred, {"databaseURL": "https://smarthomesystem-2c112-default-rtdb.firebaseio.com"}
)
orders_ref = db.reference("/orders")
led_ref = orders_ref.child(place).child("LED")
led_ref.set(value)

 これを実行すると、データベースに書き込まれるところが確認できます。

python3 set2firebase.py Kitchen OFF

image.png

読み取るコードはこちらです。ほとんど同じで.set(value)の代わりに.get()を使うだけで取得できます。

readfirebase.py
import os
import sys
import firebase_admin
from firebase_admin import credentials, db

current_path = os.path.dirname(os.path.abspath(__file__))
cred = credentials.Certificate(f"{current_path}/firebase-adminsdk-key.json")
default_app = firebase_admin.initialize_app(
    cred, {"databaseURL": "https://smarthomesystem-2c112-default-rtdb.firebaseio.com"}
)
orders_ref = db.reference("/orders")
led_ref = orders_ref.child("Kitchen").child("LED")
value: str = led_ref.get()
print(value)

ハードとソフトの組立

 ここまででようやく各部品が揃いましたので、ここでそれぞれの担当を整理しておきます。

  • Amazon Echo:Node-REDをインストールして、登録してあるデバイスを認識する
    https://www.amazon.co.jp/Ben-Hardill-Node-RED/dp/B01N0D97FZ
  • マスターラズパイ: Alexa Home Skill Bridgeで命令を受信。Firebase Realtime Databaseに書き込みを行なう
  • 部屋ラズパイと台所ラズパイ:Firebase Realtime Databaseの命令を受信して、指定された赤外線送信を行なう。またセンサーの情報を取得して赤外線の送信を行なう。

 まずは部屋ラズパイのハードウェアをブレッドボードを使って構築します。スマートリモコンの回路に人感センサーと光センサーを付け加えた図です。
 それぞれ使用しているピンは次の通りです。テスト時にそのピンを使用したのをそのまま流用しただけなので深い意味はありません。適宜自身の環境にあったものを採用してください。

  • MotionSensor = GPIO 21
  • LightSensor = GPIO 4
  • 赤外線送信 = GPIO 17
  • 他 5Vや3.3V、GNDの配線

image.png

 テストして上手くいったら台所ラズパイのハードも作ります。

マスターラズパイ側の設定

前述の set2firebase.py をそのまま使用します。

Amazon Echoからのオーダーを受信する Node-Redの全体の設定は次のような形にします。

image.png

叩くシェルファイルはそれぞれ次の通りです。

room_on.sh
#!/bin/sh
cd `dirname $0`
python3 set2firebase.py Room ON
room_off.sh
#!/bin/sh
cd `dirname $0`
python3 set2firebase.py Room OFF
kitchen_on.sh
#!/bin/sh
cd `dirname $0`
python3 set2firebase.py Kitchen ON
kitchen_off.sh
#!/bin/sh
cd `dirname $0`
python3 set2firebase.py Kitchen OFF

部屋(台所)のラズパイ用のPythonコード

常にループさせてデータベースを監視するようにします。命令があったら赤外線を送信して、命令のデータを消去します。
このコード自体は部屋ラズパイ用のコードですが、台所ラズパイの場合、8行目の MY_PLACE = "Room"MY_PLACE = "Kitchen"に置き換えます。ハードコーディングです。
log_folder も保存したいログのパスを設定してください。

ir_from_firebase.py
import logging
import os

import firebase_admin
from firebase_admin import credentials, db
from ir import IR

MY_PLACE = "Room"
IR_GPIO_NUMBER = 17
log_folder = "/home/user/log"

current_path = os.path.dirname(os.path.abspath(__file__))

# ログの設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.FileHandler(f"{log_folder}/ir_firebase.log")
logger.addHandler(handler)
fmt = logging.Formatter("%(asctime)s %(message)s")
handler.setFormatter(fmt)

cred = credentials.Certificate(f"{current_path}/firebase-adminsdk-key.json")
database_url = "https://smarthomesystem-2c112-default-rtdb.firebaseio.com"
default_app = firebase_admin.initialize_app(cred, {"databaseURL": database_url})
light_code_file_path = f"{current_path}/light.json"

ir = IR()

try:
    while True:
        orders_ref = db.reference("/orders")
        led_ref = orders_ref.child(MY_PLACE).child("LED")
        led_order: str = led_ref.get()
        if str(led_order) == "OFF":
            ir.send(IR_GPIO_NUMBER, light_code_file_path, "off")
            led_ref.set("")
            logger.info("LED:OFF")
        if str(led_order) == "ON":
            ir.send(IR_GPIO_NUMBER, light_code_file_path, "normal_on")
            led_ref.set("")
            logger.info("LED:ON")

except KeyboardInterrupt:  # CTRL+C
    print("Bye.")

次はセンサーに反応があったらライトをONにするプログラムです。また15分間反応がなかったらOFFにします。
こちらの log_folder も保存したいログのパスを設定してください。

ir_from_sensor.py
import logging
import os
import time
from datetime import datetime, timedelta
from gpiozero import LightSensor, MotionSensor
from ir import IR

log_folder = "/home/user/log"
IR_GPIO_NUMBER = 17
MOTION_GPIO_NUMBER = 21
LIGHT_GPIO_NUMBER = 4
wait_time_min = 15

current_path = os.path.dirname(os.path.abspath(__file__))

# ログの設定
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
handler = logging.FileHandler(f"{log_folder}/ir_sensor.log")
logger.addHandler(handler)
fmt = logging.Formatter("%(asctime)s %(message)s")
handler.setFormatter(fmt)

light_code_file_path = f"{current_path}/light.json"

motion_sensor = MotionSensor(MOTION_GPIO_NUMBER)
ldr = LightSensor(LIGHT_GPIO_NUMBER, threshold=0.1)

ir = IR()
lighton_datetime = datetime.now() + timedelta(minutes=wait_time_min)

try:
    while True:
        current_datetime = datetime.now()
        if motion_sensor.motion_detected is True:
            if ldr.light_detected is False:
                ir.send(IR_GPIO_NUMBER, light_code_file_path, "normal_on")
                logger.info("LED:ON")
                lighton_datetime = datetime.now() + timedelta(minutes=wait_time_min)
        if lighton_datetime < current_datetime:
            if ldr.light_detected is True:
                ir.send(IR_GPIO_NUMBER, light_code_file_path, "off")
                logger.info("LED:OFF")
        time.sleep(1)  # 1秒間待つ
except KeyboardInterrupt:  # CTRL+C
    print("Bye.")

(注釈)ここでir_from_sensor.pyにてセンサーに反応があったらそのまま赤外線送信していますが、一旦データベースに赤外線送信の命令を格納して、赤外線の送信の役割をir_from_firebase.pyに一本化した方が良いかもしれません。複雑になってくると命令のバッティングが発生しそうです。
とりあえず現状はデータベース読み書きの時間が発生する時間が惜しいというスピード優先のこれでいきます。

Pythonコードの自動起動

 作ったPythonコードを再起動しても自動起動するようにします。
 こちらの記事が参考になります。

Raspberry Pi で systemd を使ってプログラムを自動実行する
https://qiita.com/molchiro/items/ee32a11b81fa1dc2fd8d

/opt/mybin フォルダを作り、そこにPythonコードや秘密鍵のJSONファイル、赤外線信号JSONファイルを保存します。

  • ir.py
  • ir_from_firebase.py
  • ir_from_sensor.py
  • firebase-adminsdk-key.json
  • light.json

サービス設定ファイルは次のようにしました。
私の環境のユーザーは user なので、User=user になっています。
ラズパイのデフォルトユーザーは pi なので、その場合は User=pi と設定してください。

ir_from_firebase.service
[Unit]
Description=IR from firebase
After=network-online.target
Wants=network-online.target

[Service]
User=user
ExecStart=/usr/bin/python3 /opt/mybin/ir_from_firebase.py
Restart=always
Type=simple
RestartSec=60s

[Install]
WantedBy=multi-user.target

サービスファイルについての説明

 このPythonコードは、クラウドのfirebaseを使用しているため、サービス起動はオンラインになるまで待つ必要があります。
 設定の After と Wants をnetwork-online.targetに設定することでオンラインになってからサービスを起動します。

After=network-online.target
Wants=network-online.target

参考:(詳しい話)Systemdのネットワーク関連ユニット解説(After=network.targetあたり)

 しかしながらこのAfterとWantsの設定だけでは解決しませんでした。どうしても次のようなエラーが発生してサービスが落ちてしまいます。

 1月 27 01:20:13 raspberrypi python3[571]:     request_succeeded, response_data, retryable_error = _perform_request()
 1月 27 01:20:13 raspberrypi python3[571]:   File "/home/user/.local/lib/python3.9/site-packages/google/oauth2/_client.py", line 180, in _perform_request
 1月 27 01:20:13 raspberrypi python3[571]:     response = request(
 1月 27 01:20:13 raspberrypi python3[571]:   File "/home/user/.local/lib/python3.9/site-packages/google/auth/transport/requests.py", line 199, in __call__
 1月 27 01:20:13 raspberrypi python3[571]:     six.raise_from(new_exc, caught_exc)
 1月 27 01:20:13 raspberrypi python3[571]:   File "<string>", line 3, in raise_from
 1月 27 01:20:13 raspberrypi python3[571]: google.auth.exceptions.TransportError: HTTPSConnectionPool(host='oauth2.googleapis.com', port=443): Max retries exceeded with url: 

 しばらく待ってrestartすれば上手く動くので、自動で再起動をさせようと Restart=alwaysの設定にしてみることにしました。しかしそれでは、start request repeated too quickly for ~が出てしまいます。(デフォルトで5回再起動を繰り返すと出現)
 そこで再起動の間隔を 60s に設定することで、強引に解決しています。(デフォルト値は100ms)

RestartSec=60s

(参考)オプション設定:RestartSec= systemd.service — Service unit configuration

 センサーから赤外線送信する方のサービスは次の通りです。こちらは必ずしもオンラインである必要がないので、After=network.target としてあります。RestartSecの設定も省略です。

[Unit]
Description=IR from sensor
After=network.target

[Service]
User=user
ExecStart=/usr/bin/python3 /opt/mybin/ir_from_sensor.py
Restart=always
Type=simple

[Install]
WantedBy=multi-user.target

まとめ

 Amazon Echo(Alexa) + Raspberry Pi を使ってスマートホームシステムの基本を構築してみました。
 現状は「アレクサ、部屋(台所)の電気つけて(消して)」といった音声での命令と、ライトOFFの状態で置いてあるラズパイの近くにいくとONになることと、15分間ずっと人がいないとOFFになるという機能が実装できました。
 (動きがないと人感センサーは反応しないので、ずっと座っているような部屋の方は人感センサーの機能は切った方がいいかも)
 これをさらに応用することでエアコンのスイッチを冷房、暖房、ドライにしたり、扇風機のON・OFFも可能になりそうです。
 またハード部分はブレッドボードの状態なので、これでしばらく運用してみて、問題なさそうならユニバーサル基板に半田付けしていきたいですね。

 ぶっちゃけ電子工作は素人もいいところなので何かありましたらお気軽にコメント頂けると嬉しいです。

次回予定

 音声で部屋と台所の電気をON、OFFできるようになりましたが、出掛ける時などは「Alexa いってきます」といった一つの命令で複数のデバイスを操作できるようにしたいです。
 現状だと「アレクサ、部屋の電気を消して」→Alexa「はい、分かりました」(部屋の電気が消える)、「アレクサ、台所の電気を消して」といったように、順番に命令をしなければなりませんが、そんなの面倒です。手でやった方が早いです。
 そこでAlexaのカスタムスキルを構築して、一つの命令でまとめてデバイスを操作することを実現したいと思います。

追記:記事を公開しました。Alexa のカスタムスキルから Firebase のデータベースに書き込む

他参考にした書籍とキット

 電子工作の基本を学習するために使用したものです。

以下Amazonリンク

使いやすくクラス化した赤外線送受信プログラム

途中で出てきた ir.pyのコードを記載しておきます。

こちらのプログラムは使いづらかったので、自分用にクラス化したものです。

ir.py
import json
import logging
import os
import time
from typing import Optional

import pigpio

logger = logging.getLogger(__name__)


class IR:
    """赤外線の送受信を担当するクラス
    Python3.9 以上
    元スクリプトを使いやすくクラス化したもの
    http://abyz.me.uk/rpi/pigpio/examples.html#Python_irrp_py

    Raises:
        Exception: _description_
        Exception: _description_

    Returns:
        _type_: _description_
    """

    GLITCH = 100
    SHORT = 10
    FREQ: float = 38.0

    def __init__(self) -> None:
        self.last_tick = 0
        self.in_code = False
        self.code: list[float] = []
        self.fetching_code = False

        PRE_MS = 200
        self.PRE_US = PRE_MS * 1000

        self.POST_MS = 15
        self.POST_US = self.POST_MS * 1000

        TOLERANCE = 15
        self.TOLER_MIN = (100 - TOLERANCE) / 100.0
        self.TOLER_MAX = (100 + TOLERANCE) / 100.0

        GAP_MS: int = 100
        self.GAP_S = GAP_MS / 1000.0

    def _backup(self, f: str):
        """f -> f.bak -> f.bak1 -> f.bak2

        Args:
            f (str): [description]
        """
        file_path = os.path.realpath(f)
        try:
            os.rename(f"{file_path}.bak1", f"{file_path}.bak2")
        except Exception:
            pass

        try:
            os.rename(f"{file_path}.bak", f"{file_path}.bak1")
        except Exception:
            pass

        try:
            os.rename(file_path, f"{file_path}.bak")
        except Exception:
            pass

    def _normalise(self, c: list[float]):
        """
        Typically a code will be made up of two or three distinct
        marks (carrier) and spaces (no carrier) of different lengths.

        Because of transmission and reception errors those pulses
        which should all be x micros long will have a variance around x.

        This function identifies the distinct pulses and takes the
        average of the lengths making up each distinct pulse.  Marks
        and spaces are processed separately.

        This makes the eventual generation of waves much more efficient.

        Input

        M    S   M   S   M   S   M    S   M    S   M
        9000 4500 600 540 620 560 590 1660 620 1690 615

        Distinct marks

        9000                average 9000
        600 620 590 620 615 average  609

        Distinct spaces

        4500                average 4500
        540 560             average  550
        1660 1690           average 1675

        Output

        M    S   M   S   M   S   M    S   M    S   M
        9000 4500 609 550 609 550 609 1675 609 1675 609
        """

        logger.debug("before normalise", c)

        entries = len(c)
        p = [0] * entries  # Set all entries not processed.
        for i in range(entries):
            if not p[i]:  # Not processed?
                v = c[i]
                tot = v
                similar = 1.0

                # Find all pulses with similar lengths to the start pulse.
                for j in range(i + 2, entries, 2):
                    if not p[j]:  # Unprocessed.
                        # Similar.
                        if (c[j] * self.TOLER_MIN) < v < (c[j] * self.TOLER_MAX):
                            tot = tot + c[j]
                            similar += 1.0

                # Calculate the average pulse length.
                newv = round(tot / similar, 2)
                c[i] = newv

                # Set all similar pulses to the average value.
                for j in range(i + 2, entries, 2):
                    if not p[j]:  # Unprocessed.
                        # Similar.
                        if (c[j] * self.TOLER_MIN) < v < (c[j] * self.TOLER_MAX):
                            c[j] = newv
                            p[j] = 1

        logger.debug("after normalise", c)

    def _compare(self, p1: list[float], p2: list[float]) -> Optional[list[float]]:
        """Check that both recodings correspond in pulse length to within
            TOLERANCE%.  If they do average the two recordings pulse lengths.
            Input

                M    S   M   S   M   S   M    S   M    S   M
            1: 9000 4500 600 560 600 560 600 1700 600 1700 600
            2: 9020 4570 590 550 590 550 590 1640 590 1640 590

            Output

            A: 9010 4535 595 555 595 555 595 1670 595 1670 595
        Args:
            p1 (List[float]): [description]
            p2 (List[float]): [description]

        Returns:
            Optional[List[int]]: [description]
        """
        if len(p1) != len(p2):
            return None

        for i in range(len(p1)):
            v = p1[i] / p2[i]
            if (v < self.TOLER_MIN) or (v > self.TOLER_MAX):
                return None

        ave_p: list[float] = []
        for i in range(len(p1)):
            ave_p.append(int(round((p1[i] + p2[i]) / 2.0)))

        logger.debug("after compare", p1)

        return ave_p

    def _tidy_mark_space(self, records: dict[str, list[float]], base: int):

        ms: dict[float, int] = {}

        # Find all the unique marks (base=0) or spaces (base=1)
        # and count the number of times they appear,

        for rec in records:
            rl = len(records[rec])
            for i in range(base, rl, 2):
                if records[rec][i] in ms:
                    ms[records[rec][i]] += 1
                else:
                    ms[records[rec][i]] = 1

        logger.debug("t_m_s A", ms)

        e: list[float] = []
        v = 0
        tot = 0
        similar = 0
        for plen in sorted(ms):

            # Now go through in order, shortest first, and collapse
            # pulses which are the same within a tolerance to the
            # same value.  The value is the weighted average of the
            # occurences.
            #
            # E.g. 500x20 550x30 600x30  1000x10 1100x10  1700x5 1750x5
            #
            # becomes 556(x80) 1050(x20) 1725(x10)
            #
            if len(e) == 0:
                e = [plen]
                v = plen
                tot = plen * ms[plen]
                similar = ms[plen]

            elif plen < (v * self.TOLER_MAX):
                e.append(plen)
                tot += plen * ms[plen]
                similar += ms[plen]

            else:
                v = int(round(tot / float(similar)))
                # set all previous to v
                for i in e:
                    ms[i] = v
                e = [plen]
                v = plen
                tot = plen * ms[plen]
                similar = ms[plen]

        v = int(round(tot / float(similar)))
        # set all previous to v
        for i in e:
            ms[i] = v

        logger.debug("t_m_s B", ms)

        for rec in records:
            rl = len(records[rec])
            for i in range(base, rl, 2):
                records[rec][i] = ms[records[rec][i]]

    def _tidy(self, records: dict[str, list[float]]):

        self._tidy_mark_space(records, 0)  # Marks.

        self._tidy_mark_space(records, 1)  # Spaces.

    def _end_of_code(self):
        if len(self.code) > self.SHORT:
            self._normalise(self.code)
            self.fetching_code = False
        else:
            self.code = []
            print("Short code, probably a repeat, try again")

    def _cbf(self, gpio: int, level: int, tick: int):

        if level != pigpio.TIMEOUT:

            edge = pigpio.tickDiff(self.last_tick, tick)
            self.last_tick = tick

            if self.fetching_code:

                # Start of a code.
                if (edge > self.PRE_US) and (not self.in_code):
                    self.in_code = True

                    # Start watchdog.
                    self.pi.set_watchdog(self.input_gpio, self.POST_MS)

                elif (edge > self.POST_US) and self.in_code:  # End of a code.
                    self.in_code = False

                    # Cancel watchdog.
                    self.pi.set_watchdog(self.input_gpio, 0)
                    self._end_of_code()

                elif self.in_code:
                    self.code.append(edge)

        else:
            # Cancel watchdog.
            self.pi.set_watchdog(self.input_gpio, 0)
            if self.in_code:
                self.in_code = False
                self._end_of_code()

    def record(self, gpio: int, file_name: str, code_name: str, confirm: bool = False):
        """赤外線を記録する

        Args:
            gpio (int): 使用するGPIO番号
            file_name (str): 記録するファイル名
            code_name (str): 記録するコード名
            CONFIRM (bool, optional): 確認するか否か. Defaults to False.
        """
        self.input_gpio = gpio

        self.pi = pigpio.pi()  # Connect to Pi.
        records: dict[str, list[float]] = {}
        if not self.pi.connected:
            exit(0)

        try:
            f = open(file_name, "r")
            records = json.load(f)
            f.close()
        except Exception:
            pass

        # IR RX connected to this GPIO.
        self.pi.set_mode(self.input_gpio, pigpio.INPUT)

        # Ignore glitches.
        self.pi.set_glitch_filter(self.input_gpio, self.GLITCH)

        self.pi.callback(self.input_gpio, pigpio.EITHER_EDGE, self._cbf)

        # Process each id
        logger.info("Recording")
        print("Recording")

        print(f"Press key for '{code_name}'")
        self.code = []
        self.fetching_code = True
        while self.fetching_code:
            time.sleep(0.1)
        print("Okay")
        time.sleep(0.5)
        if confirm:
            press_1: list[float] = self.code[:]
            done = False

            tries = 0
            while not done:
                print(f"Press key for '{code_name}' to confirm")
                self.code = []
                self.fetching_code = True
                while self.fetching_code:
                    time.sleep(0.1)
                press_2: list[float] = self.code[:]
                ave_p = self._compare(press_1, press_2)
                if ave_p:
                    done = True
                    records[code_name] = ave_p[:]
                    print("Okay")
                    time.sleep(0.5)
                else:
                    tries += 1
                    if tries <= 3:
                        print("No match")
                    else:
                        print(f"Giving up on key '{code_name}'")
                        done = True
                    time.sleep(0.5)
        else:  # No confirm.
            records[code_name] = self.code[:]

        # Cancel glitch filter.
        self.pi.set_glitch_filter(self.input_gpio, 0)

        # Cancel watchdog.
        self.pi.set_watchdog(self.input_gpio, 0)

        self._tidy(records)

        self._backup(file_name)

        f = open(file_name, "w")
        f.write(json.dumps(records, sort_keys=True).replace("],", "],\n") + "\n")
        f.close()

        self.pi.stop()  # Disconnect from Pi.

    def _carrier(self, gpio: int, frequency: float, micros: int):
        """Generate carrier square wave.

        Args:
            gpio (int): [description]
            frequency (float): [description]
            micros (int): [description]

        Returns:
            [type]: [description]
        """
        wf: list[pigpio.pulse] = []
        cycle = 1000.0 / frequency
        cycles = int(round(micros / cycle))
        on = int(round(cycle / 2.0))
        sofar = 0
        for c in range(cycles):
            target = int(round((c + 1) * cycle))
            sofar += on
            off = target - sofar
            sofar += off
            wf.append(pigpio.pulse(1 << gpio, 0, on))
            wf.append(pigpio.pulse(0, 1 << gpio, off))
        return wf

    def send(self, gpio: int, file_name: str, code_name: str):
        """赤外線を送信

        Args:
            gpio (int):使用するGPIO番号
            file_name (str): 使用するファイル名
            code_name (str): 使用するコード名

        Raises:
            Exception: _description_
            Exception: _description_
        """
        pi = pigpio.pi()  # Connect to Pi.

        if not pi.connected:
            logger.error("pi not connected")
            raise Exception("")

        with open(file_name, "r") as f:
            records = json.load(f)

        # IR TX connected to this GPIO.
        pi.set_mode(gpio, pigpio.OUTPUT)

        pi.wave_add_new()

        emit_time = time.time()

        logger.debug("Playing")

        if code_name not in records:
            logger.error(f"Id {code_name} not found")
            raise Exception("")

        code: list[int] = records[code_name]

        # Create wave
        marks_wid: dict[int, int] = {}
        spaces_wid: dict[int, int] = {}

        wave = [0] * len(code)

        for i in range(0, len(code)):
            ci = code[i]
            if i & 1:  # Space
                if ci not in spaces_wid:
                    pi.wave_add_generic([pigpio.pulse(0, 0, ci)])
                    spaces_wid[ci] = pi.wave_create()  # type: ignore
                wave[i] = spaces_wid[ci]
            else:  # Mark
                if ci not in marks_wid:
                    wf = self._carrier(gpio, self.FREQ, ci)
                    pi.wave_add_generic(wf)
                    marks_wid[ci] = pi.wave_create()  # type: ignore
                wave[i] = marks_wid[ci]

        delay = emit_time - time.time()

        if delay > 0.0:
            time.sleep(delay)

        pi.wave_chain(wave)

        logger.debug(f"key {code_name}")

        while pi.wave_tx_busy():
            time.sleep(0.002)

        emit_time = time.time() + self.GAP_S

        for i in marks_wid:
            pi.wave_delete(marks_wid[i])

        marks_wid = {}

        for i in spaces_wid:
            pi.wave_delete(spaces_wid[i])

        spaces_wid = {}

        pi.stop()  # Disconnect from Pi.
12
7
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
7

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?