はじめに
- UL Systems Advent Calendar 2019 の12/25の記事になります。
「スマートホーム」が益々定着していますね。
先日、**Amazon、Apple、Googleがスマートホーム機器接続の統一規格に向けて提携**し、更に注目を集めているようです。
自宅でも、Google Homeを中心に電灯、テレビ、エアコン、空気清浄機、鍵等、色々操作するようにしています。但し、Google Homeの欠点として、能動的に動作しない(話しかけないと始まらない)という点があります。
家に着いたら、存在を認識して、
・「テレビでこんな面白い番組やっていますよ、見ますか?」とか
・「お風呂を入れておきますね」とか
勝手に対話や行動を始めてくれたらいいなぁ、と思いました。
そこで、その入口となるスクリプトを紹介します。
ゴール
- 今回は、以下の3点を実装し、「帰宅時に人を認識してGoogle Homeに「おかえり」と言わせる」ことをゴールとします。
- 家に帰ってきたことをSesami解錠にて感知する(画像処理量軽減も兼ねて)
- 家に入った人をAWS Rekognitionにて認証させる(セキュリティ対策も兼ねて)
- 認証結果が正しければ、Google Homeに話しかけさせる
前提・準備
-
物理的なものとして、以下が無いと始められません。
- Sesami
- Google Home
- カメラ付きのPC
※将来的なことを考えて、私はRaspberryPiとそれに繋げられるカメラ(「5MP OV5647」という製品を使用)を用いて環境を整えました。
-
Pythonで実装します。
- AWS、Google Home、カメラ、画像処理、音声処理のライブラリが充実しているので採用しています。
-
各種APIを使いたいため、以下のアカウントを準備してください。
実装:1. 家に帰ってきたことをSesami解錠にて感知する
1-1. CANDY HOUSEにアクセスして、API Keyを取得します。
1-2. 念の為、API Keyで正常に動くかを検証します。
curl -H "Authorization: [Your API Key]" https://api.candyhouse.co/public/sesames
1-3. 無事動くことを検証できたら、「解錠を検知するスレッドクラス」を作成します。
sesami.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,json,logging,requests,threading,time
from datetime import datetime
logger = logging.getLogger()
# Sesami API情報
API_URL = 'https://api.candyhouse.co/public'
API_KEY = '[Your API Key]'
# 解錠を検知する周期(秒)
WAITTIME = 5
# 解錠を検知する時間(開始、終了)
ACTIVETIMES = [
('0000','0200'),
('1800','2400'), # 夕方18時から深夜2時まで検知
]
# Sesamiの解錠を検知するスレッドクラス
class SesamiEventHandler(threading.Thread) :
devices = None
# Sesami解錠状態を検知するクラスメソッド
@classmethod
def get_devices(cls) :
# 認証しているSesami情報一覧を取得
if cls.devices is None :
res = requests.get(
'%s/sesames' % (API_URL),
headers={
'Authorization' : API_KEY
}
)
logger.debug('got sesamis : %s' % (res.text))
cls.devices = json.loads(res.text)
# Sesami解錠状態を検知するクラスメソッド
@classmethod
def is_open(cls) :
cls.get_devices()
# 認証しているSesamiの中で解錠状態になっているものがあればTrueを返す
# 一つもなければ、Falseを返す
for v in cls.devices :
device_id = v.get('device_id')
res = requests.get(
'%s/sesame/%s' % (API_URL,device_id),
headers={
'Authorization' : API_KEY
}
)
logger.debug('got sesami condition(%s) : %s' % (device_id,res.text))
if json.loads(res.text).get('locked',True) == False : return True
return False
# Sesamiにコマンドを送信するクラスメソッド
@classmethod
def send_command(cls,command) :
cls.get_devices()
# 認証しているSesamiにコマンドを送信する
for v in cls.devices :
device_id = v.get('device_id')
res = requests.post(
'%s/sesame/%s' % (API_URL,device_id),
json.dumps({
'command' : command
}),
headers={
'Authorization' : API_KEY,
'Content-Type' : 'application/json'
}
)
logger.debug('send sesami command (%s, %s) : %s' % (device_id,command,res.text))
# Sesami施錠メソッド
@classmethod
def lock(cls) :
cls.send_command('lock')
# Sesami解錠メソッド
@classmethod
def unlock(cls) :
cls.send_command('unlock')
# Sesami状態の強制取得メソッド
@classmethod
def sync(cls) :
cls.send_command('sync')
# コンストラクタ
def __init__(self,listener=None) :
threading.Thread.__init__(self)
self.enable = True
self.listeners = []
self.waittime = WAITTIME
self.actives = ACTIVETIMES
self.add_listener(listener)
# スレッドの停止メソッド
def stop(self) :
self.enable = False
# 検知リスナーの追加メソッド
def add_listener(self,listener) :
if listener is not None :
self.listeners.append(listener)
# スレッド処理メソッド
def run(self) :
# 有効な間は繰り返し処理
while self.enable :
# 現在時間が有効時間内で、且つ解錠状態であれば
# 検知リスナーを順次実行する
logger.debug('checking sesami...')
t = datetime.now().strftime('%H%M')
for start,end in self.actives :
if start <= t and t <= end and self.is_open() :
for lsn in self.listeners : lsn()
time.sleep(self.waittime)
```
テストコードはこんな感じで。鍵を開けたときに「Opened!」と出ることを確認。
````python:test_sesami.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys,time
from sesami import SesamiEventHandler
try :
# Sesamiハンドラスレッドの生成、検知開始
th = SesamiEventHandler(lambda : print('Opened!'))
th.start()
time.sleep(10)
SesamiEventHandler.unlock()
time.sleep(10)
SesamiEventHandler.lock()
time.sleep(10)
finally :
# 終わったら、Sesamiハンドラスレッドの停止して終了
th.stop()
th.join()
sys.exit(1)
```
# 実装:2. 家に入った人をAWS Rekognitionにて認証させる
2-1. [AWS IAM](https://console.aws.amazon.com/iam/home)にて、API利用のためのユーザーを用意します。
![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/122711/461fd117-0432-3719-1fcf-f90204f4c4b9.png)
2-2. 用意したユーザーに、「AmazonRekognitionFullAccess」のアクセス権限を付与します。
![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/122711/ed5e3fae-4c13-16d7-3d53-b4e697cce7e1.png)
2-3. 更にユーザーの、アクセスキーを作成します。
![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/122711/465cbbf0-af76-2345-4615-428b6fa3d134.png)
2-4. 用意したアクセスキーを使えるよう、ローカル側に認証ファイルを保存しておきます。
````ini:~/.aws/config
[default]
output = json
region = [Your Region]
```
```ini:~/.aws/credentials
[default]
aws_access_key_id = [AWS Access Key]
aws_secret_access_key = [AWS Secret Access Key]
```
2-5. 顔認証をする「AWS Rekognitionクライアント」を作成します。
```python:rekognition.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os,json,codecs,logging,boto3,cv2
logger = logging.getLogger()
WORKDIR = os.path.dirname(os.path.abspath(__file__))
# AWSセッティング
COLLECTION_ID = 'MyFaces' # 顔コレクションID
THRESHOLD = 97.5 # 顔の一致具合(97.5%以下は対象としない)
MAX_FACES = 3 # 一度に分析する最大人数(3名まで)
FACEIDSDUMP = os.path.join(WORKDIR ,'faceids.json') # 顔認証IDの保管ファイル名
# AWS Rekognitionサービスクライアント
class AWSRekognitionClient(object) :
camera = None
# カメラ操作オブジェクトの生成
@classmethod
def open_camera(cls) :
try : cls.camera = cv2.VideoCapture(0)
except : pass
# カメラ操作オブジェクトの開放
@classmethod
def close_camera(cls) :
try : cls.camera.release()
except : pass
@classmethod
def read_image(cls,path) :
# 画像があれば、データを取得
# 画像がなければ、カメラから撮影
try :
with open(path,'rb') as fp : buf = fp.read()
except :
_, buf = cls.camera.read()
_, buf = cv2.imencode('.png',buf)
buf = buf.tobytes()
return buf
# コンストラクタ
def __init__(self) :
self.open_camera()
self.client = boto3.client('rekognition')
self.faceids = []
# 顔認証IDの保管ファイルが既にあれば、初回に読み込み
if os.path.exists(FACEIDSDUMP) :
try :
with codecs.open(FACEIDSDUMP,'rb','utf-8') as fp :
self.faceids = json.load(fp)
except : pass
# コレクションIDの生成
def create_colleciton(self) :
self.client.create_collection(CollectionId=COLLECTION_ID)
# コレクションIDの削除
def delete_collection(self) :
self.client.delete_collection(CollectionId=COLLECTION_ID)
self.faceids = []
if os.path.exists(FACEIDSDUMP) :
os.remove(FACEIDSDUMP)
# 画像から顔認証IDを生成し、保管する
def regist_faceid(self,src_path=None) :
logger.info('regist faceid : image = %s' % (src_path if src_path is not None else '(camera picture)'))
# 画像をアップロードして、顔認証IDを取得
buf = self.read_image(src_path)
res = self.client.index_faces(
CollectionId = COLLECTION_ID,
Image = {
'Bytes' : buf
}
)
# 顔認証IDリストに追加(退避用ファイルにも保管)
self.faceids.extend(res.get('FaceRecords',[]))
logger.debug('registed : %s' % (json.dumps(self.faceids,indent=4,ensure_ascii=False)))
with codecs.open(FACEIDSDUMP,'wb','utf-8') as fp :
fp.write(json.dumps(self.faceids,indent=4,ensure_ascii=False))
logger.info('faceids = %s' % (json.dumps(self.faceids)))
# 画像から合致する顔認証IDを取得する
def is_match_faceid(self,src_path=None) :
logger.info('check faceid : image = %s' % (src_path if src_path is not None else '(camera picture)'))
# 画像をアップロードして、顔認証IDを検索
buf = self.read_image(src_path)
res = self.client.search_faces_by_image(
CollectionId = COLLECTION_ID,
Image = {
'Bytes' : buf
},
FaceMatchThreshold = THRESHOLD,
MaxFaces = MAX_FACES
)
logger.debug('got faceids : %s' % (json.dumps(res,indent=4,ensure_ascii=False)))
# 念の為、FaceIDが一致しているか確認
fids = [id.get('Face',{}).get('FaceId') for id in self.faceids]
for r in res.get('FaceMatches',[]) :
fid = r.get('Face',{}).get('FaceId')
if fid in fids :
return True
return False
```
テストコードはこんな感じで。自分の顔写真を1度撮影・登録した上で、2度目の自分の顔写真は「True」で、他人の顔写真「otherface.jpg」は「False」で返ってくることを確認。
```python:test_rekognition.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from rekognition import AWSRekognitionClient
# クライアントの生成
client = AWSRekognitionClient()
# 初回にコレクションIDを削除して、作り直し
try : client.delete_collection()
except : pass
client.create_colleciton()
# 自分の顔写真1枚撮影・登録
client.regist_faceid()
# 2度目の自分の顔写真を検証(True)
print(client.is_match_faceid())
# 他人の顔写真を検証(Falseが返ってくる)
print(client.is_match_faceid('otherface.jpg'))
```
# 実装:3. 認証結果が正しければ、Google Homeに話しかけさせる
3-1. 1、2で作成したスクリプトを組み合わせて、解錠を検知したら、顔認証して、Google Homeに「おかえり」と言わせるスクリプトを作成します。
```python:homeservice.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys,os,json,logging,time,pychromecast
from bottle import route, run as brun, static_file
from sesami import SesamiEventHandler
from rekognition import AWSRekognitionClient
logger = logging.getLogger()
REKOGNITION_TIME = 60 # 鍵を開けてから顔認証する最大時間(一旦60秒)
VOICEDIR = 'voices' # 音声ファイル管理ディレクトリ
CHROMECAST_IP = '[Your Google Home IP Address]' # Google Home IPアドレス
BOTTLE_HOST = '[Your Machine IP Address]' # ローカルPC IPアドレス(Bottle用)
BOTTLE_PORT = '[Your Port]' # Bottleポート
WELCOME_VOICE = 'welcome_back.mp3' # 音声ファイル名
WORKDIR = os.path.dirname(os.path.abspath(__file__))
# 音声ファイルが無ければ事前に作成
voicedir = os.path.join(WORKDIR,VOICEDIR)
if not os.path.exists(voicedir) :
os.makedirs(voicedir)
voicepath = os.path.join(WORKDIR,VOICEDIR,WELCOME_VOICE)
if not os.path.exists(voicepath) :
from gtts import gTTS
tts = gTTS(text='おかえりなさい',lang='ja')
tts.save(voicepath)
# Bottleリクエスト処理(音声ファイルのレスポンス)
@route('/%s/<file_path:path>' % (VOICEDIR))
def get_talk_path(file_path) :
return static_file(file_path, root=os.path.join(WORKDIR,VOICEDIR))
# 顔認証処理
is_rekognition = False
def rekognize_face() :
# 二重起動防止の為の排他処理
global is_rekognition
try :
if is_rekognition : return
is_rekognition = True
# Google Homeを取得
cast = pychromecast.Chromecast(CHROMECAST_IP)
cast.wait()
# 顔認証クライアントの準備
client = AWSRekognitionClient()
cnt = 0
while cnt < REKOGNITION_TIME :
# 顔認証が一致したら、Google Homeに音声を流す
# 終わったら、施錠をする
if client.is_match_faceid() :
url = 'http://%s:%s/%s/%s' % (BOTTLE_HOST,BOTTLE_PORT,VOICEDIR,WELCOME_VOICE)
cast.media_controller.play_media(url,'audio/%s' % (WELCOME_VOICE.split('.')[-1]))
cast.media_controller.block_until_active()
SesamiEventHandler.lock()
client.close_camera()
time.sleep(30)
break
cnt += 1
time.sleep(1)
except Exception as e :
exc_type, exc_obj, tb = sys.exc_info()
lineno = tb.tb_lineno
logger.error('happend error : %s,%s,%s' % (exc_type, exc_obj, lineno))
finally :
is_rekognition = False
try : client.close_camera()
except : pass
# メイン処理
def main() :
th = None
try :
# Sesamiイベントハンドラを起動
# イベントリスナーとして顔認証処理を追加
th = SesamiEventHandler(rekognize_face)
th.start()
brun(host=BOTTLE_HOST,port=BOTTLE_PORT)
finally :
th.stop()
th.join()
sys.exit(1)
if __name__=='__main__' :
import logging.config
logging.config.fileConfig(os.path.join(WORKDIR,'logging.conf'))
main()
```
実行の結果、帰宅して部屋に入ると、「おかえりなさい」と言ってくれました。よしよし。
[![帰宅時に人を認識してGoogle Homeに「おかえり」と言わせる(Sesami×AWS Rekognition)](http://img.youtube.com/vi/k9ZwyBsT7Uw/0.jpg)](http://www.youtube.com/watch?v=k9ZwyBsT7Uw)
# おわりに
- 上記を色々いじって、我が家に着くと、「おかえり」と言ってくれるだけでなく、電灯とテレビ、エアコン等を自動的に点けたりしてくれます。<br><br>今後、規格統一化が図られると、もっと家電と結びつけやすくなると思います。そうなれば生活はより便利になるでしょうね。<br><br>ではでは。