1. はじめに
Alexaって便利ですよね。
もし、Alexaのように話しかけて動くPCがあればかっこいいと思いませんか。
(そういうサービスがすでにあるかどうかは知りません。)
少なくともWindowsにはSiriのような機能は無いと思っています。
…無いなら、作ればいいですかね。
2. 何ができるの
① PC起動と同時に作成したAssistantBotが裏で起動
② F9キーを押すとマイクが起動して録音開始
③ 話しかけた内容に対して、返答が返ってくる
○○を検索して:検索ウィンドウが開く
上記以外:ChatGPTの回答がスピーカーから出力
④ botは常駐/タスクトレイから終了させることも可能
3. 全体図
以下が最終的な全体の流れです
① ローカルPCからマイクで入力された音声ファイルをS3に保存
② S3の保存をトリガーにLambdaが音声ファイルを取得
③ 音声ファイルをTranscribeでテキスト化
④ テキスト化された文章をOpenAIに投げて回答を作成
⑤ 回答結果をPollyで音声ファイルに変換
⑥ 回答の音声ファイルをS3に保存
⑦ SQSに処理が完了したことを通知
⑧ ローカルPCがS3から回答結果の音声ファイルを取得してスピーカーで出力
4. 著者の環境
OS:Windows11
Python:3.11.9
5. 使用したAWSのサービス(スキップ可能?)
AWSで使用したサービスをざっくりと説明します。(ざっくり過ぎて説明ではない)
詳しくは公式サイトを参考にしてください。
5-1. Amazon S3
Amazon S3(Simple Storage Service)は、AWSが提供するオブジェクトストレージサービス。安い。使いやすい。
5-2. AWS Lambda
AWS Lambdaは、サーバーレスコンピューティングサービス。
コードさえ書ければ動かせちゃう。便利。
5-3. Amazon Transcribe
Amazon Transcribeは、機械学習を利用した音声認識サービス。
音声をテキストに変換してくれる。
5-4. Amazon Polly
Amazon Polly は、文章をリアルな音声に変換するサービス。
テキストを音声に変換してくれる。
5-5. Amazon SQS
Amazon SQS(Simple Queue Service)は、システムやプロセス間でデータを非同期でやり取りするメッセージキューサービス
6. 具体的なコード内容
具体的なコードの中身というよりは、各ファイルが何をしているのかが重要だと思ったので、コードは気になった方だけどうぞ。
長くなるので、特にここでは具体的なコードの説明はしない。
(本当はしたい。どっかで別で出そうかな…)
6-1. ローカルPC側
PCのマイクとスピーカーを使用する仕様上、録音、出力はPC側で処理する必要があり、クラウドと混合する形になりました。(使用と仕様は狙いました。)
ローカルのフォルダ構成
voice-assistant/
├── .env
├── icon.png # トレイアイコン用画像
├── pipeline_runner.py
├── tray_app.py
├── dist/
│ ├── tray_app.exe
│ └── (その他ビルド生成物)
└── build/
.env
AWSの認証情報やS3/SQSの設定値を記載した環境設定ファイル
コード
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
AWS_REGION=
S3_BUCKET=
S3_INPUT_KEY=
S3_OUTPUT_KEY=
SQS_QUEUE_URL=
OPENAI_API_KEY =
pipline_runner.py
F9キー録音〜SQS受信・再生までの実装
コード
import keyboard
import sounddevice as sd
import soundfile as sf
import time
import os
import boto3
import pygame
import json
import webbrowser
from dotenv import load_dotenv
load_dotenv()
s3 = boto3.client(
's3',
region_name = os.getenv("AWS_REGION"),
aws_access_key_id = os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key = os.getenv("AWS_SECRET_ACCESS_KEY")
)
sqs = boto3.client(
'sqs',
region_name=os.getenv("AWS_REGION"),
aws_access_key_id=os.getenv("AWS_ACCESS_KEY_ID"),
aws_secret_access_key=os.getenv("AWS_SECRET_ACCESS_KEY")
)
DURATION = 4
SAMPLE_RATE = 44100
# PCマイクから音声を録音し、WAVファイルとして保存
def record_audio(filename="input.wav"):
recording = sd.rec(int(DURATION * SAMPLE_RATE), samplerate=SAMPLE_RATE, channels=1)
sd.wait()
sf.write(filename, recording, SAMPLE_RATE)
# 録音したWAVファイルをAWS S3にアップロード
def upload_to_s3():
s3.upload_file("input.wav", os.getenv("S3_BUCKET"), os.getenv("S3_INPUT_KEY"))
# クラウド処理の完了をSQS通知で待機・取得
def wait_for_sqs_message(queue_url, total_timeout=60):
start = time.time()
while time.time() - start < total_timeout:
response = sqs.receive_message(
QueueUrl=queue_url,
WaitTimeSeconds=20,
MaxNumberOfMessages=1
)
messages = response.get('Messages', [])
if messages:
body = json.loads(messages[0]['Body'])
receipt_handle = messages[0]['ReceiptHandle']
sqs.delete_message(QueueUrl=queue_url, ReceiptHandle=receipt_handle)
return body
return None
# 取得した音声ファイル(MP3)をPCスピーカーで再生
def play_audio(file_path="output.mp3"):
try:
pygame.mixer.init()
pygame.mixer.music.load(file_path)
pygame.mixer.music.play()
while pygame.mixer.music.get_busy():
time.sleep(0.1)
pygame.mixer.music.stop()
pygame.mixer.quit()
except Exception as e:
print("再生エラー")
# 使用済み音声ファイルを削除
def cleanup():
for f in ["input.wav", "output.mp3"]:
if os.path.exists(f):
os.remove(f)
def run_pipeline():
while not keyboard.is_pressed("F9"):
time.sleep(0.1)
while keyboard.is_pressed("F9"):
time.sleep(0.1)
record_audio()
upload_to_s3()
sqs_message = wait_for_sqs_message(os.getenv("SQS_QUEUE_URL"))
print(sqs_message)
if sqs_message:
message_type = sqs_message.get("type")
if message_type == "search": # 検索してくださいの処理
keyword = sqs_message.get("keyword", "").strip()
print(keyword)
webbrowser.open(f"https://www.google.com/search?q={keyword}")
elif message_type == "audio": # 上記以外の処理
bucket = sqs_message["bucket"]
key = sqs_message["key"]
s3.download_file(bucket, key, "output.mp3")
play_audio()
cleanup()
time.sleep(1)
tray_app.py
トレイアイコン実装&バックグラウンド常駐起動
コード
import threading
import time
from pystray import Icon, Menu, MenuItem
from PIL import Image
from pipeline_runner import run_pipeline
running = True
processing = False
def main_loop():
global processing
while running:
if not processing:
processing = True
run_pipeline()
processing = False
time.sleep(1)
def on_quit(icon, item):
global running
running = False
icon.stop()
def setup_tray():
icon_path = "icon.png"
image = Image.open(icon_path)
menu = Menu(MenuItem("終了", on_quit))
icon = Icon("VoiceAssistant", image, "アシスタント起動中", menu)
thread = threading.Thread(target=main_loop, daemon=True)
thread.start()
icon.run()
if __name__ == "__main__":
setup_tray()
dist/tray_app.exe
tray_app.py を PyInstaller でビルドした実行形式ファイル。
Windows環境でダブルクリックまたはスタートアップ自動起動に設定して常駐運用できる。
build/
PyInstaller がビルド時に使用する一時ファイル群
6-2. AWS Lambda側
① S3から録音されたファイルを取得
② Transcribeで音声ファイルを文字起こし
③ 録音の文字起こし結果をChatGPTのAPIに投げ、回答生成
④ 生成結果を取得してPollyで音声ファイルに変換
⑤ S3に保存してSQSに保存完了を通知
コード
import boto3
import os
import time
import json
import urllib.request
import openai
s3 = boto3.client("s3")
transcribe = boto3.client("transcribe")
polly = boto3.client("polly")
sqs = boto3.client("sqs")
openai.api_key = os.environ["OPENAI_API_KEY"]
# ChatGPTから返答を取得
def get_chatgpt_response(prompt: str) -> str:
response = openai.ChatCompletion.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "あなたは親しみやすいアシスタントです。"},
{"role": "user", "content": prompt}
],
max_tokens=10,
temperature=0.7
)
return response.choices[0].message['content'].strip()
# SQS通知送信
def notify_via_sqs(bucket, message: dict):
sqs.send_message(
QueueUrl=os.environ["SQS_QUEUE_URL"],
MessageBody=json.dumps(message)
)
def handler(event, context):
bucket = event['Records'][0]['s3']['bucket']['name']
key = event['Records'][0]['s3']['object']['key']
input_uri = f"s3://{bucket}/{key}"
job_name = f"transcribe-{int(time.time())}"
# 音声認識(文字起こし)
transcribe.start_transcription_job(
TranscriptionJobName = job_name,
Media={'MediaFileUri': input_uri},
MediaFormat='wav',
LanguageCode = "ja-JP",
)
while True:
result = transcribe.get_transcription_job(TranscriptionJobName=job_name)
status = result['TranscriptionJob']['TranscriptionJobStatus']
if status in ['COMPLETED', 'FAILED']:
break
time.sleep(2)
transcript_url = result['TranscriptionJob']['Transcript']['TranscriptFileUri']
response = urllib.request.urlopen(transcript_url)
transcript_text = json.loads(response.read())['results']['transcripts'][0]['transcript']
# 分岐処理(検索 or ChatGPT返答)
if "を検索して" in transcript_text:
keyword = transcript_text.split("を検索して")[0].strip()
if keyword:
notify_via_sqs(bucket, {"type": "search", "keyword": keyword})
else:
return {'statusCode': 200, 'body': json.dumps('キーワードなし')}
else:
answer = get_chatgpt_response(transcript_text)
polly_response = polly.synthesize_speech(
Text=answer,
OutputFormat="mp3",
VoiceId="Mizuki",
LanguageCode="ja-JP"
)
audio_key = "output/output.mp3"
s3.put_object(
Body=polly_response['AudioStream'].read(),
Bucket=bucket,
Key=audio_key
)
notify_via_sqs(bucket, {"type": "audio", "bucket": bucket, "key": audio_key})
return {'statusCode': 200, 'body': json.dumps('完了')}
7. つまずいたところ
AWS LambdaでOpenAIのライブラリを使う
openaiのライブラリを使用するためにレイヤーを追加して動かしたらエラーが発生。
エラー内容:No module named 'jiter.jiter'
いろいろ漁りましたが、結局解決したのはバージョンしかなかった。。。
直接解決に使用した参考サイトを添付しておきます
非同期処理による弊害
ローカルPCから回答結果を出力する関係上、クラウドの処理の実行完了を待たなくてはならない。
回答結果がS3に保存される前にローカルがS3から回答を持ってこようとするため、更新前の回答結果(ひとつ前の回答結果)を出力してしまう。
ChatGPTにどうするかを聞いてみた結果
改善案:S3オブジェクトのタイムスタンプ確認
んー、さすがにめんどくさいなあ。他にないですか
改善案:Lambda完了通知をSQSやSNSで受け取る
お!ええやん!
ホットワード起動
本当はSiriやAlexaのように何か特定のワード(Alexa:「Alexa!」のような)を言って動かそうと思ったが、Windowsでそれをやるのがめんどくさそうだったので一旦断念。。。
いつかやりたい
8. おわりに
ちょうどAWSの資格勉強中で、多くのサービスを触ることができたことが勉強になった。