Help us understand the problem. What is going on with this article?

帰宅時に人を認識してGoogle Homeに「おかえり」と言わせる(Sesami×AWS Rekognition)

はじめに

  • UL Systems Advent Calendar 2019 の12/25の記事になります。

    スマートホーム」が益々定着していますね。
    先日、Amazon、Apple、Googleがスマートホーム機器接続の統一規格に向けて提携し、更に注目を集めているようです。

    自宅でも、Google Homeを中心に電灯、テレビ、エアコン、空気清浄機、鍵等、色々操作するようにしています。但し、Google Homeの欠点として、能動的に動作しない(話しかけないと始まらない)という点があります。

    家に着いたら、存在を認識して、
     ・「テレビでこんな面白い番組やっていますよ、見ますか?」とか
     ・「お風呂を入れておきますね」とか
    勝手に対話や行動を始めてくれたらいいなぁ、と思いました。

    そこで、その入口となるスクリプトを紹介します。

ゴール

  • 今回は、以下の3点を実装し、「帰宅時に人を認識してGoogle Homeに「おかえり」と言わせる」ことをゴールとします。
    1. 家に帰ってきたことをSesami解錠にて感知する(画像処理量軽減も兼ねて)
    2. 家に入った人をAWS Rekognitionにて認証させる(セキュリティ対策も兼ねて)
    3. 認証結果が正しければ、Google Homeに話しかけさせる

前提・準備

実装:1. 家に帰ってきたことをSesami解錠にて感知する

1-1. CANDY HOUSEにアクセスして、API Keyを取得します。
image.png
1-2. 念の為、API Keyで正常に動くかを検証します。

curl -H "Authorization: [Your API Key]" https://api.candyhouse.co/public/sesames

1-3. 無事動くことを検証できたら、「解錠を検知するスレッドクラス」を作成します。

sesami.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os,json,logging,requests,threading,time
from datetime import datetime

logger = logging.getLogger()

# Sesami API情報
API_URL = 'https://api.candyhouse.co/public'
API_KEY = '[Your API Key]'

# 解錠を検知する周期(秒)
WAITTIME = 5
# 解錠を検知する時間(開始、終了)
ACTIVETIMES = [
    ('0000','0200'),
    ('1800','2400'), # 夕方18時から深夜2時まで検知
]

# Sesamiの解錠を検知するスレッドクラス
class SesamiEventHandler(threading.Thread) :
    devices = None
    # Sesami解錠状態を検知するクラスメソッド
    @classmethod
    def get_devices(cls) :
        # 認証しているSesami情報一覧を取得
        if cls.devices is None :
            res = requests.get(
                '%s/sesames' % (API_URL),
                headers={
                    'Authorization' : API_KEY
                }
            )
            logger.debug('got sesamis : %s' % (res.text))
            cls.devices = json.loads(res.text)
    # Sesami解錠状態を検知するクラスメソッド
    @classmethod
    def is_open(cls) :
        cls.get_devices()
        # 認証しているSesamiの中で解錠状態になっているものがあればTrueを返す
        # 一つもなければ、Falseを返す
        for v in cls.devices :
            device_id = v.get('device_id')
            res = requests.get(
                '%s/sesame/%s' % (API_URL,device_id),
                headers={
                    'Authorization' : API_KEY
                }
            )
            logger.debug('got sesami condition(%s) : %s' % (device_id,res.text))
            if json.loads(res.text).get('locked',True) == False : return True
        return False
    # Sesamiにコマンドを送信するクラスメソッド
    @classmethod
    def send_command(cls,command) :
        cls.get_devices()
        # 認証しているSesamiにコマンドを送信する
        for v in cls.devices :
            device_id = v.get('device_id')
            res = requests.post(
                '%s/sesame/%s' % (API_URL,device_id),
                json.dumps({
                    'command' : command
                }),
                headers={
                    'Authorization' : API_KEY,
                    'Content-Type'  : 'application/json'
                }
            )
            logger.debug('send sesami command (%s, %s) : %s' % (device_id,command,res.text))
    # Sesami施錠メソッド
    @classmethod
    def lock(cls) :
        cls.send_command('lock')
    # Sesami解錠メソッド
    @classmethod
    def unlock(cls) :
        cls.send_command('unlock')
    # Sesami状態の強制取得メソッド
    @classmethod
    def sync(cls) :
        cls.send_command('sync')
    # コンストラクタ
    def __init__(self,listener=None) :
        threading.Thread.__init__(self)
        self.enable = True
        self.listeners = []
        self.waittime = WAITTIME
        self.actives  = ACTIVETIMES
        self.add_listener(listener)
    # スレッドの停止メソッド
    def stop(self) :
        self.enable = False
    # 検知リスナーの追加メソッド
    def add_listener(self,listener) :
        if listener is not None :
            self.listeners.append(listener)
    # スレッド処理メソッド
    def run(self) :
        # 有効な間は繰り返し処理
        while self.enable :
            # 現在時間が有効時間内で、且つ解錠状態であれば
            # 検知リスナーを順次実行する
            logger.debug('checking sesami...')
            t = datetime.now().strftime('%H%M')
            for start,end in self.actives :
                if start <= t and t <= end and self.is_open() :
                    for lsn in self.listeners : lsn()
            time.sleep(self.waittime)

テストコードはこんな感じで。鍵を開けたときに「Opened!」と出ることを確認。

test_sesami.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import sys,time
from sesami import SesamiEventHandler

try :
    # Sesamiハンドラスレッドの生成、検知開始
    th = SesamiEventHandler(lambda : print('Opened!'))
    th.start()
    time.sleep(10)
    SesamiEventHandler.unlock()
    time.sleep(10)
    SesamiEventHandler.lock()
    time.sleep(10)
finally :
    # 終わったら、Sesamiハンドラスレッドの停止して終了
    th.stop()
    th.join()
    sys.exit(1)

実装:2. 家に入った人をAWS Rekognitionにて認証させる

2-1. AWS IAMにて、API利用のためのユーザーを用意します。
image.png

2-2. 用意したユーザーに、「AmazonRekognitionFullAccess」のアクセス権限を付与します。
image.png

2-3. 更にユーザーの、アクセスキーを作成します。
image.png

2-4. 用意したアクセスキーを使えるよう、ローカル側に認証ファイルを保存しておきます。

~/.aws/config
[default]
output = json
region = [Your Region]
~/.aws/credentials
[default]
aws_access_key_id = [AWS Access Key]
aws_secret_access_key = [AWS Secret Access Key]

2-5. 顔認証をする「AWS Rekognitionクライアント」を作成します。

rekognition.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os,json,codecs,logging,boto3,cv2

logger  = logging.getLogger()
WORKDIR = os.path.dirname(os.path.abspath(__file__))

# AWSセッティング
COLLECTION_ID  = 'MyFaces'          # 顔コレクションID
THRESHOLD      = 97.5               # 顔の一致具合(97.5%以下は対象としない)
MAX_FACES      = 3                  # 一度に分析する最大人数(3名まで)
FACEIDSDUMP    = os.path.join(WORKDIR ,'faceids.json')     # 顔認証IDの保管ファイル名

# AWS Rekognitionサービスクライアント
class AWSRekognitionClient(object) :
    camera = None
    # カメラ操作オブジェクトの生成
    @classmethod
    def open_camera(cls) :
        try : cls.camera = cv2.VideoCapture(0)
        except : pass
    # カメラ操作オブジェクトの開放
    @classmethod
    def close_camera(cls) :
        try : cls.camera.release()
        except : pass
    @classmethod
    def read_image(cls,path) :
        # 画像があれば、データを取得
        # 画像がなければ、カメラから撮影
        try :
            with open(path,'rb') as fp : buf = fp.read()
        except :
            _, buf = cls.camera.read()
            _, buf = cv2.imencode('.png',buf)
            buf = buf.tobytes()
        return buf
    # コンストラクタ
    def __init__(self) :
        self.open_camera()
        self.client = boto3.client('rekognition')
        self.faceids = []
        # 顔認証IDの保管ファイルが既にあれば、初回に読み込み
        if os.path.exists(FACEIDSDUMP) :
            try :
                with codecs.open(FACEIDSDUMP,'rb','utf-8') as fp :
                    self.faceids = json.load(fp)
            except : pass
    # コレクションIDの生成
    def create_colleciton(self) :
        self.client.create_collection(CollectionId=COLLECTION_ID)
    # コレクションIDの削除
    def delete_collection(self) :
        self.client.delete_collection(CollectionId=COLLECTION_ID)
        self.faceids = []
        if os.path.exists(FACEIDSDUMP) :
            os.remove(FACEIDSDUMP)
    # 画像から顔認証IDを生成し、保管する
    def regist_faceid(self,src_path=None) :
        logger.info('regist faceid : image = %s' % (src_path if src_path is not None else '(camera picture)'))
        # 画像をアップロードして、顔認証IDを取得
        buf = self.read_image(src_path)
        res = self.client.index_faces(
            CollectionId = COLLECTION_ID,
            Image = {
                'Bytes' : buf
            }
        )
        # 顔認証IDリストに追加(退避用ファイルにも保管)
        self.faceids.extend(res.get('FaceRecords',[]))
        logger.debug('registed : %s' % (json.dumps(self.faceids,indent=4,ensure_ascii=False)))
        with codecs.open(FACEIDSDUMP,'wb','utf-8') as fp :
            fp.write(json.dumps(self.faceids,indent=4,ensure_ascii=False))
        logger.info('faceids = %s' % (json.dumps(self.faceids)))
    # 画像から合致する顔認証IDを取得する
    def is_match_faceid(self,src_path=None) :
        logger.info('check  faceid : image = %s' % (src_path if src_path is not None else '(camera picture)'))
        # 画像をアップロードして、顔認証IDを検索
        buf = self.read_image(src_path)
        res = self.client.search_faces_by_image(
            CollectionId = COLLECTION_ID,
            Image = {
                'Bytes' : buf
            },
            FaceMatchThreshold = THRESHOLD,
            MaxFaces = MAX_FACES
        )
        logger.debug('got faceids : %s' % (json.dumps(res,indent=4,ensure_ascii=False)))
        # 念の為、FaceIDが一致しているか確認
        fids = [id.get('Face',{}).get('FaceId') for id in self.faceids]
        for r in res.get('FaceMatches',[]) :
            fid = r.get('Face',{}).get('FaceId')
            if fid in fids :
                return True
        return False

テストコードはこんな感じで。自分の顔写真を1度撮影・登録した上で、2度目の自分の顔写真は「True」で、他人の顔写真「otherface.jpg」は「False」で返ってくることを確認。

test_rekognition.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from rekognition import AWSRekognitionClient

# クライアントの生成
client = AWSRekognitionClient()

# 初回にコレクションIDを削除して、作り直し
try : client.delete_collection()
except : pass
client.create_colleciton()

# 自分の顔写真1枚撮影・登録
client.regist_faceid()

# 2度目の自分の顔写真を検証(True)
print(client.is_match_faceid())

# 他人の顔写真を検証(Falseが返ってくる)
print(client.is_match_faceid('otherface.jpg'))

実装:3. 認証結果が正しければ、Google Homeに話しかけさせる

3-1. 1、2で作成したスクリプトを組み合わせて、解錠を検知したら、顔認証して、Google Homeに「おかえり」と言わせるスクリプトを作成します。

homeservice.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import sys,os,json,logging,time,pychromecast
from bottle import route, run as brun, static_file
from sesami import SesamiEventHandler
from rekognition import AWSRekognitionClient

logger = logging.getLogger()

REKOGNITION_TIME = 60 # 鍵を開けてから顔認証する最大時間(一旦60秒)
VOICEDIR = 'voices' # 音声ファイル管理ディレクトリ
CHROMECAST_IP = '[Your Google Home IP Address]' # Google Home IPアドレス
BOTTLE_HOST   = '[Your Machine IP Address]' # ローカルPC IPアドレス(Bottle用)
BOTTLE_PORT   = '[Your Port]' # Bottleポート
WELCOME_VOICE = 'welcome_back.mp3' # 音声ファイル名

WORKDIR  = os.path.dirname(os.path.abspath(__file__))

# 音声ファイルが無ければ事前に作成
voicedir  = os.path.join(WORKDIR,VOICEDIR)
if not os.path.exists(voicedir) :
    os.makedirs(voicedir)
voicepath = os.path.join(WORKDIR,VOICEDIR,WELCOME_VOICE)
if not os.path.exists(voicepath) :
    from gtts import gTTS
    tts = gTTS(text='おかえりなさい',lang='ja')
    tts.save(voicepath)

# Bottleリクエスト処理(音声ファイルのレスポンス)
@route('/%s/<file_path:path>' % (VOICEDIR))
def get_talk_path(file_path) :
    return static_file(file_path, root=os.path.join(WORKDIR,VOICEDIR))

# 顔認証処理
is_rekognition = False
def rekognize_face() :
    # 二重起動防止の為の排他処理
    global is_rekognition
    try :
        if is_rekognition : return
        is_rekognition = True
        # Google Homeを取得
        cast = pychromecast.Chromecast(CHROMECAST_IP)
        cast.wait()
        # 顔認証クライアントの準備
        client = AWSRekognitionClient()
        cnt = 0
        while cnt < REKOGNITION_TIME :
            # 顔認証が一致したら、Google Homeに音声を流す
            # 終わったら、施錠をする
            if client.is_match_faceid() :
                url = 'http://%s:%s/%s/%s' % (BOTTLE_HOST,BOTTLE_PORT,VOICEDIR,WELCOME_VOICE)
                cast.media_controller.play_media(url,'audio/%s' % (WELCOME_VOICE.split('.')[-1]))
                cast.media_controller.block_until_active()
                SesamiEventHandler.lock()
                client.close_camera()
                time.sleep(30)
                break
            cnt += 1
            time.sleep(1)
    except Exception as e :
        exc_type, exc_obj, tb = sys.exc_info()
        lineno = tb.tb_lineno
        logger.error('happend error : %s,%s,%s' % (exc_type, exc_obj, lineno))
    finally :
        is_rekognition = False
        try : client.close_camera()
        except : pass

# メイン処理
def main() :
    th = None
    try :
        # Sesamiイベントハンドラを起動
        # イベントリスナーとして顔認証処理を追加
        th = SesamiEventHandler(rekognize_face)
        th.start()
        brun(host=BOTTLE_HOST,port=BOTTLE_PORT)
    finally :
        th.stop()
        th.join()
        sys.exit(1)

if __name__=='__main__' :
    import logging.config
    logging.config.fileConfig(os.path.join(WORKDIR,'logging.conf'))
    main()

実行の結果、帰宅して部屋に入ると、「おかえりなさい」と言ってくれました。よしよし。
帰宅時に人を認識してGoogle Homeに「おかえり」と言わせる(Sesami×AWS Rekognition)

おわりに

  • 上記を色々いじって、我が家に着くと、「おかえり」と言ってくれるだけでなく、電灯とテレビ、エアコン等を自動的に点けたりしてくれます。

    今後、規格統一化が図られると、もっと家電と結びつけやすくなると思います。そうなれば生活はより便利になるでしょうね。

    ではでは。
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした