LoginSignup
1
0

ネットスーパー配達員が出発したことをずんだもんにGoogle Home経由で教えてもらう

Last updated at Posted at 2024-05-02

はじめに

ネットスーパー配達員 と 猫

食べ物にこだわりがないと、スーパーで買う商品は同じものになりがち。ということでネットスーパー利用してみたらめちゃくちゃ便利!配達指定も2時間単位で出来る!
ところで、やることない休みの日なんか猫と遊ぶに決まってる。お腹の上に乗って遊んじゃったりする。そして鳴り響くチャイム音と、お腹に爪を立てられて苦悶の声を発する私。チャイム音にビックリしたら爪も立てちゃうよね。
来訪直前まで遊んでたいんだけどなー、が動機

やりたくなったこと その①

そういえば配達員さんがお店を出発したタイミングでこんなメール届いてた。
このメールが来てから7分くらいでチャイムが鳴る。このメールをトリガーに何か出来ない?
image.png

やりたくなったこと その②

最近Amazonの配達でヤマトさんでも佐川さんでもない業者がたまに配達してくる。この場合なぜか在宅でも玄関の外に置き配されてしまう。ワタシ家いますよ・・?
(再配達問題は知ってるしほぼ100%受け取るようにしてます!配送業の方には感謝の日々)
夜遅くまで気付かずに、玄関の外に荷物が置きっぱなしなんてことが何度かあった。
でも置き配が完了するとメールが届いてる。このメールをトリガーに何か出来ない?
image.png

この記事で実現したこと

supermarket@xxxx.jp から「配達開始のご連絡」が含まれる件名のメールが来たら、
ずんだもんに「ネットスーパー 担当者さんが出発しました!」とGoogle Nest Miniから教えてもらう。

同様にorder-update@amazon.co.jp から 「ご注文商品の配達が完了しました」が含まれる件名のメールが来たら、「Amazonの置き配が完了しました!」と教えてもらう。

実行は9時~20時の5分間隔

前提

・ヤマハのルーターRTX1220を使う。インターネットとはPPPoE接続する。Web GUIを使う。
・wifiはAPモード
・AWS EC2インスタンス Amazon Linux 2023
・Pythonのバージョン3.9
・プログラムは/home/ec2-user/project/speak_google_home ディレクトリに入れる

構成図

Google Home(=Google Nest Mini)に音声ファイルの再生要求を行うためには、VPN対応ルーターがあれば出来そう
image.png

AWSとルーターの設定

画面を張り付けていく

仮想プライベートゲートウェイ

VPCとはVirtual Private Cloudの略。
VPCの仮想プライベートゲートウェイから作成を選ぶ
image.png

名前は適当にデフォルト値でも入力して、仮想プライベートゲートウェイを作成
image.png

正常に作成できたら、アクションの「VPCへアタッチ」を選ぶ
image.png

さっき作ったVPCを選んでアタッチ
image.png

Site-to-Site VPN接続

Site-to-Site VPN接続を選んで、VPN接続を作成する
image.png

名前は適当に入力、仮想プライベートゲートウェイはさっきのを選択、カスタマーゲートウェイは新規にして、IPアドレスはプロバイダから割り振られたグローバルIPアドレスを入力する。以下のように確認して、VPN接続を作成する。
image.png

正常にVPN接続が作成できたことを確認する。
この赤枠内のVPN IDは後で使う。
image.png

IAM

IAMとはidentity and Access Managementの略。
ルーター側のVPN接続で必要になるアクセスキーとシークレットアクセスキーを取得するための内容。
IAMのユーザーの作成を選択
image.png

ユーザー名は適当に決める。
image.png

グループを作成
image.png

グループ名も適当に決める。許可ポリシータイプにAmazonVPCReadOnlyAccessを選んでグループを作成する。ポリシーはたくさんあるので、検索機能を使うと楽。
image.png

グループ名を選択して!次へ
image.png

確認してユーザーの作成
image.png

正常にユーザーが追加されたことを確認する。追加したユーザーをクリック
image.png

「アクセスキーを作成」を選択
image.png

AWSの外部で実行されるアプリケーションを選択して次へ
image.png

自分で管理しやすい説明を入れてから、アクセスキーを作成
image.png

これでアクセスキーが発行された。この画面は2度と表示されないので、忘れずにcsvファイルをダウンロードする。この中にアクセスキーとシークレットアクセスキーが入っている。完了押して、AWS上の作業はいったん終わり。
image.png

ヤマハルーター(RTX1220)からVPN接続

ブラウザからルーターにログインする。デフォルトURLはhttp://192.168.100.1
かんたん設定->VPN->クラウド接続を選び、新規をクリック
image.png

次へ
image.png

設定名は管理しやすい名前を入力し、AWS上で設定した内容を入力して次へ
・アクセスキーID
 IAMのユーザー追加時に取得したアクセスキー
・シークレットアクセスキー
 IAMのユーザー追加時に取得したシークレットアクセスキー
・VPN ID
 Site-to-Site VPNで作ったVPN ID

image.png

確認してから「設定の確定」
image.png

うまくいけば以下画面が表示される。おめでとう!
image.png

こんな画面が表示される。そうなんだ~って感じでルーター側の作業おわり
image.png

Site-to-Site VPN のトンネル状態確認

Site-to-Site VPN 接続->作成したVPNを選択->画面真ん中のトンネルの詳細 を選択
トンネルのステータスが2つともアップになっていることを確認する。
1,2分かかるみたいなので、何も設定は変えずに最低でも3分は待とう
image.png

ルートテーブル

ルートテーブル->アクション->ルート伝播の編集 を選ぶ
image.png

有効化にチェックを入れて保存
image.png

続けてルートタブを選び、ルートを編集 を選択
image.png

ローカルIPネットワークを追加する。ターゲットは仮想プライベートゲートウェイにして、作成したゲートウェイを選択して変更を保存
image.png

VPNの設定はこれでおわり

EC2インスタンスからルーターやgoogle nest miniに対してpingを打って確認してみよう。うまく接続できていれば通るはず。pingの確認については、EC2セキュリティグループにICMPを許可する必要はない。

プログラム

Pythonを書いてく

Google Nest Mini のIPを調べる

スマホにGoogle Homeのアプリを入れて、
デバイス->部屋の名前->設定ボタン->デバイス情報
とタップしていくとIPアドレスが確認できる。ヤマハのルーターに繋がっているので192.168.100.0 のネットワークのはず。今回は192.168.100.10だった。
image.png

Google Home に指定音声ファイルを読み上げさせる

pychromecastをインストールして、ホスト名指定で公開URLの音声が再生されることを確認する。
pychromecastのバージョンは14.0.1

google_home.py
import pychromecast

def play_sound(url_path):

    # ホスト名指定でデバイス取得する。UUIDとかChromecastはなんでも良い模様。
    cast = pychromecast.get_chromecast_from_host(host=['192.168.100.10', 8009, 'UUID', 'Chromecast', 'リビングルーム'], tries = 1)

    # どう判断すればいいか分からなかったのでメーカー名でチェック!(笑)
    if cast.cast_info.manufacturer != 'Google Inc.':
        return

    # 指定URLの音声ファイルを再生
    cast.wait()
    cast.media_controller.play_media(url_path, 'audio/mp3')
    cast.media_controller.block_until_active()

def main():
    play_sound('https://storage.googleapis.com/xxxxx/google_home.wave')

if __name__ == "__main__":
    main()

VOICEVOX

音声ファイルを作成する環境とプログラム(voicevox.py)については前の記事に書いた

Google Cloud Storage へ保存と削除

Google Cloud Storageへファイル保存するプログラムを単体で動作確認しておく。
けっこう前なので覚えてないけど、ChatGPTに手伝ってもらったのかな。
今回の記事で削除機能は使わない。

google_storage.py
from google.cloud import storage
import re

NINSYO_JSON = 'vast-xxxxxx.json'
BUCKET_NAME = 'xxxxxx'

# google cloud storageにファイルを保存する
def savefile(filepath, upfile):

    # 認証情報を設定
    client = storage.Client.from_service_account_json(NINSYO_JSON)

    # バケットを指定
    bucket = client.bucket(BUCKET_NAME)

    # アップロードするファイル名と保存先のパスを指定
    blob = bucket.blob(upfile)

    # 画像ファイルをアップロード
    with open(filepath, 'rb') as f:
        blob.upload_from_file(f)

    # 公開URLを返却
    return 'https://storage.googleapis.com/' + BUCKET_NAME + '/' + upfile

# 正規表現で削除
def re_delfile(file_name):

    # 削除するファイルにマッチする正規表現パターンを指定します
    file_pattern = re.compile(f'^{file_name}')

    # 認証情報を設定
    client = storage.Client.from_service_account_json(NINSYO_JSON)

    # バケットオブジェクトを取得します
    bucket = client.get_bucket(BUCKET_NAME)

    # バケット内のすべてのファイルを取得します
    all_files = bucket.list_blobs()

    # ファイルを削除します
    for file in all_files:
        if file_pattern.match(file.name):
            file.delete()

# メイン関数
def main():
    savefile('bbb.mp3', 'bbb.mp3')

if __name__ == "__main__":
    main()

Gmail Api

現在から過去N分前までのgmail情報を取得するプログラムを作成する。ごちゃごちゃはしてる。
一応メール本文も取得するようにしてるけど、この記事では使わないし、どの程度役立つか知らないし、合ってるかどうかも知らない。もうほんと何も知らない。

google_gmail.py
from googleapiclient.discovery import build
from httplib2 import Http
from oauth2client import file
import base64
from datetime import datetime, timedelta
from pytz import timezone

# 現在時間からの取得対象時間を分で指定する
# gmailのAPIは最新日付からしか取得できないっぽいので、fromtoとか汎用的なものは作らないよ
def get_mail_info(n_minute):

    # 日本時間の現在時刻を取得
    utc_now = datetime.now(timezone('UTC'))
    jst_now = utc_now.astimezone(timezone('Asia/Tokyo'))

    # 現在日時を分単位で取得し、指定分を引く
    n_minutes_ago = jst_now.replace(second=0, microsecond=0) - timedelta(minutes=n_minute)
    n_minutes_epoc = int(n_minutes_ago.timestamp())

    store = file.Storage('token.json')
    creds = store.get()
    if not creds or creds.invalid:
        print('token.pickelを作ってください')
        return

    service = build('gmail', 'v1', http=creds.authorize(Http()))

    # メールIDの一覧を取得する(最大N件)
    messageIDlist = service.users().messages().list(userId="me", maxResults=10).execute()

    if not 'messages' in messageIDlist:
        print('1件もない!')
        return

    mail_info_list = []  # メール情報の辞書のリスト

    # メッセージIDを元に、メールの詳細情報を取得
    for message in messageIDlist['messages']:
        msg = service.users().messages().get(userId="me", id=message["id"]).execute()

        internalDate = int(msg['internalDate']) // 1000

        # UNIXエポック秒からdatetimeオブジェクトを作成
        received_time = datetime.fromtimestamp(internalDate)
        # datetimeオブジェクトを任意のフォーマットの文字列に変換
        received_time_str = received_time.strftime('%Y-%m-%d %H:%M:%S')
        hour_minute = str(received_time.hour) + '' + str(received_time.minute) + ''

        # 指定時間を過ぎたら終わり
        if internalDate < n_minutes_epoc:
            break

        Subject = ''
        Date = ''
        for part in msg["payload"]["headers"]:
            if part['name'] == 'From':     # 送信元アドレス
                FromAddress = part['value']
            elif part['name'] == 'Subject':  # 件名
                Subject = part['value']

        b64_message = ''
        # Such as text/plain
        if 'data' in msg['payload']['body']:
            b64_message = msg['payload']['body']['data']
        # Such as text/html
        elif 'parts' in msg['payload']:
            if 'data' in msg['payload']['parts'][0]['body']:
                b64_message = msg['payload']['parts'][0]['body']['data']

        honbun = base64.urlsafe_b64decode(b64_message)
        encoded_string = honbun.strip().decode('utf-8')

        # メール情報の辞書を作成し、リストに追加
        mail_info = {
            'From': FromAddress,
            'Subject': Subject,
            'ReceivedTime': received_time_str,
            'HourMinute': hour_minute,
            'Body': encoded_string
        }
        mail_info_list.append(mail_info)

    return mail_info_list

def main():

    mail_infos = get_mail_info(5)
    print(mail_infos)

if __name__ == '__main__':
    main()

メイン関数

上記までのプログラムを組み合わせたメインとなるプログラム
処理概要
・現在から5分前までのGメールを取得
・メールアドレスとメールの件名がマッチするかチェック
・指定発声内容をvoicevoxで音声ファイルに変換
・google cloud storage へ音声ファイルを保存
・pychromecastを使って上記公開URLを google homeへ再生要求

speak_google_home_by_gmail.py
import tempfile
import voicevox
import google_storage
import google_home
import google_gmail

def main():
    # メールアドレス、件名、発声内容
    GLISET = [
                 ['supermarket@xxxx.jp', '配達開始のご連絡', 'ネットスーパー 担当者さんが出発しました!'],
                 ['order-update@amazon.co.jp', 'ご注文商品の配達が完了しました', 'Amazonの置き配が完了しました!'],
             ]

    # 辞書のリストにしたいから。パイソニスタには程遠い
    target_list = []
    for x in GLISET:
        add = {
            'FromAddress': x[0],
            'Subject'    : x[1],
            'voice'      : x[2]
        }
        target_list.append(add)

    # 現在からN分前までのメール取得
    # ※cronからの呼び出し間隔と合わせる!
    mail_infos = google_gmail.get_mail_info(5)

    # 取得情報か探索
    for mail_info in mail_infos:
        # 発声リスト
        for x in target_list:
            # Fromアドレスチェック
            if not x['FromAddress'] in mail_info['From']:
                continue
            # 件名チェック
            if not x['Subject'] in mail_info['Subject']:
                continue

            # 指定発声内容をvoicevoxで一時ファイルに保存
            fp = tempfile.NamedTemporaryFile()
            voice_text = 'ピッ、ピッ、ピッ、ポーン!! ずんだもんがお知らせします!' + \
                         ' ' + mail_info['HourMinute'] + ' ' + x['voice']
            if not voicevox.put_wave(voice_text, fp.name, speaker_id = 3):
                print('音声ファイルの作成に失敗。。')
                continue

            # google storageに保存
            wave_path = google_storage.savefile(fp.name, 'google_home.wave')

            # google homeを使って音声ファイルの読み上げ!
            google_home.play_sound(wave_path)

if __name__ == '__main__':
    main()

cronに登録

もちろん自動起動させたいので、rootでcronに登録する。
aliasとかパスの問題でハマるので、絶対パス&pythonのバージョン指定で登録しておく。これがスマートなやり方なのかどうかは知らない。

crontab -e
以下を追記
*/5 9-19 * * * cd /home/ec2-user/project/speak_google_home; python3.9 speak_google_home_by_gmail.py

これで、9時から19時55分まで5分毎に起動する。

おわりに

これでやりたいことが完成した。
平日朝7時に天気予報情報をつけて目覚まし時計代わり、とか他にも色々できそう。
なんといってもVOICEVOXを使えるのがうれしい。ずんだもんに教えてもらうとか最高かよ

グローバルIP変わる問題

個人向けグローバルIPアドレスって気付いたらコソッと変わる。今までも何か月に1回か変わっていたような?
VPC設定でグローバルIPは指定しているので、変わると当然動かなくなってしまう。
変わったらメール通知するような仕組みを後で考えようかな

Google Nest Mini もIP変わる問題

ルーターのDHCPでIP割り振られているので、当然Google Nest MiniのIPも変わる。
pychromecastはIP指定で利用しているので、ここも動かなくなる。ヤマハのルーターで、このMACアドレスならこのIPアドレスにする、っていう設定はあるはず。長くなったので別記事にしよう

インターネット越しでGoogle Home に音声ファイルを再生させる試行錯誤の流れ

大変だったので書き残しておきたい。これでも省略している。

qiitaや関連サイトを参考にして、pychromecastを使えばローカル環境で鳴らせた。
しかしインターネット越しだと無理そうで、VPN対応ルーターが必要なことを理解する。
勉強のためと自分に言い聞かせてちょっと高いけど買う。

色々がんばる。VPN接続できた!鳴るやろ!

鳴らない

ルートテーブルにローカルIP入れたら、EC2インスタンスからGoogle Nest Miniにpingが通った!これもう鳴るやろ!

鳴らない

tcpdumpで確認するとマルチキャストパケットが出てる

マルチキャストなん・・? igmp・・?
IPマルチキャスト対応のヤマハルーターはRTX3000とRTX1500だけ?RTX1220はだめなんか?

心折れてくる。最後にpychromecastの公式ページみて諦めよ

google翻訳抜粋

マルチキャスト UDP は WiFI ルーターによって転送される必要があります。 一部の WiFi ルーターはマルチキャスト UDP トラフィックをドロップすることが知られています。


ほらやっぱダメやん。買ったルーターも対応してるか分からんし、wifiルーターの方でもダメかもしれんやん

これらの条件がすべて満たされていない場合、検出は機能しません。 これらの条件を満たすことが不可能な場合は、既知の IP アドレスまたはホスト名のリストを検出機能に渡すことができます。

!?
IP指定のライブラリがあるってこと? それならマルチキャストは使わんのちゃう!?

githubからソース落として見てみる

get_listed_chromecasts じゃなくて get_chromecast_from_host を使えばいいんか!?

鳴る

こんなに頑張ったけど

VPCって料金高いみたい!止めよう!(笑)
ほんと残念。安く済む他の方法探そう

感想

ネットワークって知らないことたくさんある。でもけっこう色々勉強になったので、やって良かった。達成感はあった。
あとヤマハのルーターを買ってから、お家のwifi接続が快適になってうれしい

参考

VPCの解説記事はとても参考になった。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0