8
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

UL Systems (ウルシステムズ)Advent Calendar 2019

Day 25

帰宅時に人を認識してGoogle Homeに「おかえり」と言わせる(Sesami×AWS Rekognition)

Last updated at Posted at 2019-12-24

はじめに

  • UL Systems Advent Calendar 2019 の12/25の記事になります。

    スマートホーム」が益々定着していますね。
    先日、**Amazon、Apple、Googleがスマートホーム機器接続の統一規格に向けて提携**し、更に注目を集めているようです。

    自宅でも、Google Homeを中心に電灯、テレビ、エアコン、空気清浄機、鍵等、色々操作するようにしています。但し、Google Homeの欠点として、能動的に動作しない(話しかけないと始まらない)という点があります。

    家に着いたら、存在を認識して、
     ・「テレビでこんな面白い番組やっていますよ、見ますか?」とか
     ・「お風呂を入れておきますね」とか
    勝手に対話や行動を始めてくれたらいいなぁ、と思いました。

    そこで、その入口となるスクリプトを紹介します。

ゴール

  • 今回は、以下の3点を実装し、「帰宅時に人を認識してGoogle Homeに「おかえり」と言わせる」ことをゴールとします。
    1. 家に帰ってきたことをSesami解錠にて感知する(画像処理量軽減も兼ねて)
    2. 家に入った人をAWS Rekognitionにて認証させる(セキュリティ対策も兼ねて)
    3. 認証結果が正しければ、Google Homeに話しかけさせる

前提・準備

実装:1. 家に帰ってきたことをSesami解錠にて感知する

1-1. CANDY HOUSEにアクセスして、API Keyを取得します。
image.png
1-2. 念の為、API Keyで正常に動くかを検証します。

curl -H "Authorization: [Your API Key]" https://api.candyhouse.co/public/sesames

1-3. 無事動くことを検証できたら、「解錠を検知するスレッドクラス」を作成します。

sesami.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os,json,logging,requests,threading,time
from datetime import datetime

logger = logging.getLogger()

# Sesami API情報
API_URL = 'https://api.candyhouse.co/public'
API_KEY = '[Your API Key]'

# 解錠を検知する周期(秒)
WAITTIME = 5
# 解錠を検知する時間(開始、終了)
ACTIVETIMES = [
    ('0000','0200'),
    ('1800','2400'), # 夕方18時から深夜2時まで検知
]

# Sesamiの解錠を検知するスレッドクラス
class SesamiEventHandler(threading.Thread) :
    devices = None
    # Sesami解錠状態を検知するクラスメソッド
    @classmethod
    def get_devices(cls) :
        # 認証しているSesami情報一覧を取得
        if cls.devices is None :
            res = requests.get(
                '%s/sesames' % (API_URL),
                headers={
                    'Authorization' : API_KEY
                }
            )
            logger.debug('got sesamis : %s' % (res.text))
            cls.devices = json.loads(res.text)
    # Sesami解錠状態を検知するクラスメソッド
    @classmethod
    def is_open(cls) :
        cls.get_devices()
        # 認証しているSesamiの中で解錠状態になっているものがあればTrueを返す
        # 一つもなければ、Falseを返す
        for v in cls.devices :
            device_id = v.get('device_id')
            res = requests.get(
                '%s/sesame/%s' % (API_URL,device_id),
                headers={
                    'Authorization' : API_KEY
                }
            )
            logger.debug('got sesami condition(%s) : %s' % (device_id,res.text))
            if json.loads(res.text).get('locked',True) == False : return True
        return False
    # Sesamiにコマンドを送信するクラスメソッド
    @classmethod
    def send_command(cls,command) :
        cls.get_devices()
        # 認証しているSesamiにコマンドを送信する
        for v in cls.devices :
            device_id = v.get('device_id')
            res = requests.post(
                '%s/sesame/%s' % (API_URL,device_id),
                json.dumps({
                    'command' : command
                }),
                headers={
                    'Authorization' : API_KEY,
                    'Content-Type'  : 'application/json'
                }
            )
            logger.debug('send sesami command (%s, %s) : %s' % (device_id,command,res.text))
    # Sesami施錠メソッド
    @classmethod
    def lock(cls) :
        cls.send_command('lock')
    # Sesami解錠メソッド
    @classmethod
    def unlock(cls) :
        cls.send_command('unlock')
    # Sesami状態の強制取得メソッド
    @classmethod
    def sync(cls) :
        cls.send_command('sync')
    # コンストラクタ
    def __init__(self,listener=None) :
        threading.Thread.__init__(self)
        self.enable = True
        self.listeners = []
        self.waittime = WAITTIME
        self.actives  = ACTIVETIMES
        self.add_listener(listener)
    # スレッドの停止メソッド
    def stop(self) :
        self.enable = False
    # 検知リスナーの追加メソッド
    def add_listener(self,listener) :
        if listener is not None :
            self.listeners.append(listener)
    # スレッド処理メソッド
    def run(self) :
        # 有効な間は繰り返し処理
        while self.enable :
            # 現在時間が有効時間内で、且つ解錠状態であれば
            # 検知リスナーを順次実行する
            logger.debug('checking sesami...')
            t = datetime.now().strftime('%H%M')
            for start,end in self.actives :
                if start <= t and t <= end and self.is_open() :
                    for lsn in self.listeners : lsn()
            time.sleep(self.waittime)
```
テストコードはこんな感じで鍵を開けたときにOpened!」と出ることを確認

````python:test_sesami.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys,time
from sesami import SesamiEventHandler

try :
    # Sesamiハンドラスレッドの生成、検知開始
    th = SesamiEventHandler(lambda : print('Opened!'))
    th.start()
    time.sleep(10)
    SesamiEventHandler.unlock()
    time.sleep(10)
    SesamiEventHandler.lock()
    time.sleep(10)
finally :
    # 終わったら、Sesamiハンドラスレッドの停止して終了
    th.stop()
    th.join()
    sys.exit(1)
```


# 実装:2. 家に入った人をAWS Rekognitionにて認証させる
2-1. [AWS IAM](https://console.aws.amazon.com/iam/home)にてAPI利用のためのユーザーを用意します
![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/122711/461fd117-0432-3719-1fcf-f90204f4c4b9.png)

2-2. 用意したユーザーに、「AmazonRekognitionFullAccessのアクセス権限を付与します
![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/122711/ed5e3fae-4c13-16d7-3d53-b4e697cce7e1.png)

2-3. 更にユーザーのアクセスキーを作成します
![image.png](https://qiita-image-store.s3.ap-northeast-1.amazonaws.com/0/122711/465cbbf0-af76-2345-4615-428b6fa3d134.png)

2-4. 用意したアクセスキーを使えるようローカル側に認証ファイルを保存しておきます


````ini:~/.aws/config
[default]
output = json
region = [Your Region]
```

```ini:~/.aws/credentials
[default]
aws_access_key_id = [AWS Access Key]
aws_secret_access_key = [AWS Secret Access Key]
```

2-5. 顔認証をするAWS Rekognitionクライアントを作成します

```python:rekognition.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os,json,codecs,logging,boto3,cv2

logger  = logging.getLogger()
WORKDIR = os.path.dirname(os.path.abspath(__file__))

# AWSセッティング
COLLECTION_ID  = 'MyFaces'          # 顔コレクションID
THRESHOLD      = 97.5               # 顔の一致具合(97.5%以下は対象としない)
MAX_FACES      = 3                  # 一度に分析する最大人数(3名まで)
FACEIDSDUMP    = os.path.join(WORKDIR ,'faceids.json')     # 顔認証IDの保管ファイル名

# AWS Rekognitionサービスクライアント
class AWSRekognitionClient(object) :
    camera = None
    # カメラ操作オブジェクトの生成
    @classmethod
    def open_camera(cls) :
        try : cls.camera = cv2.VideoCapture(0)
        except : pass
    # カメラ操作オブジェクトの開放
    @classmethod
    def close_camera(cls) :
        try : cls.camera.release()
        except : pass
    @classmethod
    def read_image(cls,path) :
        # 画像があれば、データを取得
        # 画像がなければ、カメラから撮影
        try :
            with open(path,'rb') as fp : buf = fp.read()
        except :
            _, buf = cls.camera.read()
            _, buf = cv2.imencode('.png',buf)
            buf = buf.tobytes()
        return buf
    # コンストラクタ
    def __init__(self) :
        self.open_camera()
        self.client = boto3.client('rekognition')
        self.faceids = []
        # 顔認証IDの保管ファイルが既にあれば、初回に読み込み
        if os.path.exists(FACEIDSDUMP) :
            try :
                with codecs.open(FACEIDSDUMP,'rb','utf-8') as fp :
                    self.faceids = json.load(fp)
            except : pass
    # コレクションIDの生成
    def create_colleciton(self) :
        self.client.create_collection(CollectionId=COLLECTION_ID)
    # コレクションIDの削除
    def delete_collection(self) :
        self.client.delete_collection(CollectionId=COLLECTION_ID)
        self.faceids = []
        if os.path.exists(FACEIDSDUMP) :
            os.remove(FACEIDSDUMP)
    # 画像から顔認証IDを生成し、保管する
    def regist_faceid(self,src_path=None) :
        logger.info('regist faceid : image = %s' % (src_path if src_path is not None else '(camera picture)'))
        # 画像をアップロードして、顔認証IDを取得
        buf = self.read_image(src_path)
        res = self.client.index_faces(
            CollectionId = COLLECTION_ID,
            Image = {
                'Bytes' : buf
            }
        )
        # 顔認証IDリストに追加(退避用ファイルにも保管)
        self.faceids.extend(res.get('FaceRecords',[]))
        logger.debug('registed : %s' % (json.dumps(self.faceids,indent=4,ensure_ascii=False)))
        with codecs.open(FACEIDSDUMP,'wb','utf-8') as fp :
            fp.write(json.dumps(self.faceids,indent=4,ensure_ascii=False))
        logger.info('faceids = %s' % (json.dumps(self.faceids)))
    # 画像から合致する顔認証IDを取得する
    def is_match_faceid(self,src_path=None) :
        logger.info('check  faceid : image = %s' % (src_path if src_path is not None else '(camera picture)'))
        # 画像をアップロードして、顔認証IDを検索
        buf = self.read_image(src_path)
        res = self.client.search_faces_by_image(
            CollectionId = COLLECTION_ID,
            Image = {
                'Bytes' : buf
            },
            FaceMatchThreshold = THRESHOLD,
            MaxFaces = MAX_FACES
        )
        logger.debug('got faceids : %s' % (json.dumps(res,indent=4,ensure_ascii=False)))
        # 念の為、FaceIDが一致しているか確認
        fids = [id.get('Face',{}).get('FaceId') for id in self.faceids]
        for r in res.get('FaceMatches',[]) :
            fid = r.get('Face',{}).get('FaceId')
            if fid in fids :
                return True
        return False
```

テストコードはこんな感じで自分の顔写真を1度撮影登録した上で2度目の自分の顔写真はTrue他人の顔写真otherface.jpgFalseで返ってくることを確認

```python:test_rekognition.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from rekognition import AWSRekognitionClient

# クライアントの生成
client = AWSRekognitionClient()

# 初回にコレクションIDを削除して、作り直し
try : client.delete_collection()
except : pass
client.create_colleciton()

# 自分の顔写真1枚撮影・登録
client.regist_faceid()

# 2度目の自分の顔写真を検証(True)
print(client.is_match_faceid())

# 他人の顔写真を検証(Falseが返ってくる)
print(client.is_match_faceid('otherface.jpg'))
```

# 実装:3. 認証結果が正しければ、Google Homeに話しかけさせる
3-1. 12で作成したスクリプトを組み合わせて解錠を検知したら顔認証してGoogle Homeにおかえりと言わせるスクリプトを作成します

```python:homeservice.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys,os,json,logging,time,pychromecast
from bottle import route, run as brun, static_file
from sesami import SesamiEventHandler
from rekognition import AWSRekognitionClient

logger = logging.getLogger()

REKOGNITION_TIME = 60 # 鍵を開けてから顔認証する最大時間(一旦60秒)
VOICEDIR = 'voices' # 音声ファイル管理ディレクトリ
CHROMECAST_IP = '[Your Google Home IP Address]' # Google Home IPアドレス
BOTTLE_HOST   = '[Your Machine IP Address]' # ローカルPC IPアドレス(Bottle用)
BOTTLE_PORT   = '[Your Port]' # Bottleポート
WELCOME_VOICE = 'welcome_back.mp3' # 音声ファイル名

WORKDIR  = os.path.dirname(os.path.abspath(__file__))

# 音声ファイルが無ければ事前に作成
voicedir  = os.path.join(WORKDIR,VOICEDIR)
if not os.path.exists(voicedir) :
    os.makedirs(voicedir)
voicepath = os.path.join(WORKDIR,VOICEDIR,WELCOME_VOICE)
if not os.path.exists(voicepath) :
    from gtts import gTTS
    tts = gTTS(text='おかえりなさい',lang='ja')
    tts.save(voicepath)

# Bottleリクエスト処理(音声ファイルのレスポンス)
@route('/%s/<file_path:path>' % (VOICEDIR))
def get_talk_path(file_path) :
    return static_file(file_path, root=os.path.join(WORKDIR,VOICEDIR))

# 顔認証処理
is_rekognition = False
def rekognize_face() :
    # 二重起動防止の為の排他処理
    global is_rekognition
    try :
        if is_rekognition : return
        is_rekognition = True
        # Google Homeを取得
        cast = pychromecast.Chromecast(CHROMECAST_IP)
        cast.wait()
        # 顔認証クライアントの準備
        client = AWSRekognitionClient()
        cnt = 0
        while cnt < REKOGNITION_TIME :
            # 顔認証が一致したら、Google Homeに音声を流す
            # 終わったら、施錠をする
            if client.is_match_faceid() :
                url = 'http://%s:%s/%s/%s' % (BOTTLE_HOST,BOTTLE_PORT,VOICEDIR,WELCOME_VOICE)
                cast.media_controller.play_media(url,'audio/%s' % (WELCOME_VOICE.split('.')[-1]))
                cast.media_controller.block_until_active()
                SesamiEventHandler.lock()
                client.close_camera()
                time.sleep(30)
                break
            cnt += 1
            time.sleep(1)
    except Exception as e :
        exc_type, exc_obj, tb = sys.exc_info()
        lineno = tb.tb_lineno
        logger.error('happend error : %s,%s,%s' % (exc_type, exc_obj, lineno))
    finally :
        is_rekognition = False
        try : client.close_camera()
        except : pass

# メイン処理
def main() :
    th = None
    try :
        # Sesamiイベントハンドラを起動
        # イベントリスナーとして顔認証処理を追加
        th = SesamiEventHandler(rekognize_face)
        th.start()
        brun(host=BOTTLE_HOST,port=BOTTLE_PORT)
    finally :
        th.stop()
        th.join()
        sys.exit(1)

if __name__=='__main__' :
    import logging.config
    logging.config.fileConfig(os.path.join(WORKDIR,'logging.conf'))
    main()
```
実行の結果帰宅して部屋に入ると、「おかえりなさいと言ってくれましたよしよし
[![帰宅時に人を認識してGoogle Homeにおかえりと言わせるSesami×AWS Rekognition](http://img.youtube.com/vi/k9ZwyBsT7Uw/0.jpg)](http://www.youtube.com/watch?v=k9ZwyBsT7Uw)

# おわりに
- 上記を色々いじって我が家に着くと、「おかえりと言ってくれるだけでなく電灯とテレビエアコン等を自動的に点けたりしてくれます<br><br>今後規格統一化が図られるともっと家電と結びつけやすくなると思いますそうなれば生活はより便利になるでしょうね<br><br>ではでは
8
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?