はじめに
Unitree G1は決まった音源しか再生できず、自由な音声出力が難しいという制約があります。
しかしG1とインタラクティブに会話したいというニーズがあり、また搭載アプリのChat機能は中国語と英語のみ対応しているのが現状です。
そこで、日本語で自然に会話できるようにするため、OpenAI Realtime API を活用して、リアルタイムで音声会話できる環境を構築しました。
参考にしたのは、以下の記事です:
🔗 https://qiita.com/YumaShinoki/items/195105fe5520dc005ef8 [1]
しかし実際に動かしてみると、次のような課題が発生しました。
🔊 音量を上げるとAIの返答をAI自身が拾ってハウリングが起きる
⚡ 突発的な短い音も会話として認識されてしまう
💡 強制停止依存な設計のため、安全に終了できない
本記事では、これらの課題を解決するために実装した 「突発音対策」 と 「ハウリング対策(RMS制御)」 、「スレッドとWebSocket通信の安全な終了設計」の仕組みを紹介します。
この記事でわかる・できること
- OpenAI Realtime API を使って音声でAIと会話する方法
- 突発的な短音を無視 して自然な会話を続ける方法
- AIの返答をマイクが拾ってハウリングする問題 の対処方法
この記事の対象者
- AI会話デモを実施したい人
- Pythonでリアルタイム音声通信(録音/再生)を扱いたい人
- OpenAI Realtime APIを活用したい人
- ハウリングやノイズを抑えたい人
動作環境・使用するツールや言語
- OS:Windows 10 / 11
- 言語:Python 3.11
- 使用モデル:gpt-4o-mini-realtime-preview-2024-12-17
- ライブラリ:pyaudio, asyncio, websockets, numpy, threading
- API:OpenAI Realtime API
突発的な音への対処
展示会などの環境では「カメラのシャッター音」「咳」「机を叩く音」などが頻繁に発生します。
これをAIが「話しかけられた」と誤認識すると、会話が途切れてしまいます。
そのため、以下のような 音声開始判定ロジック を導入しました。
# 一定以上の音量が連続したら音声開始とみなす(3フレーム = 約0.3秒)
MIN_ACTIVE_FRAMES = 3
音声RMS値をモニタリングし、
3フレーム(約0.3秒)以上音が続いたら「会話開始」と判断
それ未満は「突発音」とみなして無視
また、開始直後の音を切り捨てないように、直前3フレーム分もバッファに保持して送信します。
if not audio_started:
if rms > rms_avg + START_THRESHOLD:
pre_buffer.append(audio_data)
active_count += 1
if active_count >= MIN_ACTIVE_FRAMES:
audio_started = True
print("🎤 音声検出!送信開始")
for frame in pre_buffer:
audio_send_queue.put(frame)
pre_buffer = []
→ これにより、「あっ」「ドン」などの短い大きな雑音は無視し、
自然に「こんにちは」「お願いします」といった発話から反応するようになります。
ハウリングへの対処(RMS制御)
AIが話している最中にマイクがその音を拾ってしまうと、
AIがAIの音声を再帰的に入力してしまい、ハウリングが起きます。
これを防ぐため、RMS(Root Mean Square:音量の平均平方根)を用いた制御を導入しました。
原理
音量を常にモニタリング(rms_avg)
平均音量より START_THRESHOLD(例: +1000) 上回ったら録音開始
平均音量より STOP_THRESHOLD(例: +500) 下がったら録音停止
これにより、AIの音声出力をノイズとして扱うことでハウリングを防ぎつつ、
ユーザーが話し始めたときに自動で録音を開始し、話し終えたあとに自動で録音を停止することができます。
if rms < rms_avg + STOP_THRESHOLD:
silence_counter += 1
if silence_counter >= SILENCE_FRAMES:
audio_started = False
print("🔇 音声停止、送信停止")
スレッドとWebSocket通信の安全な終了設計
終了時にスレッドが強制終了したり、WebSocket接続が不正に切断されたりしないように、安全に実行を終了する必要があります。
スレッドの安全な終了
スレッド(マイク入力と音声再生処理)には、
「終了シグナル用のフラグ」を設定し、プログラム終了時にこのフラグを立てることで、ループを安全に抜ける仕組みを導入しました。
mic_thread_running = threading.Event()
また、スレッドで利用している キュー(queue.Queue) に None を送信することで、スレッド側のループが自然に終了できるようにしました。
これにより、強制的な thread.join() や例外によって停止することなく、スレッドを終了させることが可能です。
リソース解放による、スレッドとWebSocketの安全なクローズ
通信中に発生する ConnectionClosedError や CancelledError などの例外も try-except でキャッチし、異常終了時にもリソースリークが起きないようにしました。
# WebSocket受信処理での例外ハンドリング
try:
while True:
response = await websocket.recv()
...
except websockets.exceptions.ConnectionClosedError as e:
print(f"🔌 WebSocket切断: {e}")
except Exception as e:
print(f"⚠️ 受信エラー: {e}")
finally:
audio_receive_queue.put(None)
print("🧹 音声受信終了")
# 全体のクリーンアップ処理
try:
await asyncio.gather(send_task, receive_task)
except KeyboardInterrupt:
print("\n🛑 KeyboardInterrupt 検出、終了処理へ")
finally:
mic_thread_running.clear()
audio_send_queue.put(None)
audio_receive_queue.put(None)
if stream.is_active():
stream.stop_stream()
stream.close()
if output_stream.is_active():
output_stream.stop_stream()
output_stream.close()
p.terminate()
print("🧹 クリーンアップ完了。終了します。")
動作
デモ動作になります。
全体のコードです。
コード全体
import asyncio
import websockets
import pyaudio
import base64
import os
import json
import queue
import threading
import numpy as np
import time
## OpenAI APIキー
API_KEY = os.getenv("OPENAI_API_KEY")
if not API_KEY:
raise ValueError("OPENAI_API_KEY環境変数が設定されていません")
WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-mini-realtime-preview-2024-12-17"
HEADERS = {"Authorization": "Bearer " + API_KEY, "OpenAI-Beta": "realtime=v1"}
## キューを初期化
audio_send_queue = queue.Queue()
audio_receive_queue = queue.Queue()
## RMS判定パラメータ
RMS_WINDOW_SIZE = 30
START_THRESHOLD = 250
STOP_THRESHOLD = 50
SILENCE_FRAMES = 25
MIN_ACTIVE_FRAMES = 3
# マイク送信スレッド制御イベント
mic_thread_running = threading.Event()
# PCM16変換
def base64_to_pcm16(base64_audio):
return base64.b64decode(base64_audio)
# ----------------- マイク入力(RMS制御) -----------------
def read_audio_rms_control(stream, chunk):
rms_window = []
silence_counter = 0
audio_started = False
active_count = 0
pre_buffer = []
while True:
if not mic_thread_running.is_set():
time.sleep(0.01)
continue
audio_data = stream.read(chunk, exception_on_overflow=False)
pcm16 = np.frombuffer(audio_data, dtype=np.int16)
rms = np.sqrt(np.mean(pcm16.astype(np.float32) ** 2))
rms_window.append(rms)
if len(rms_window) > RMS_WINDOW_SIZE:
rms_window.pop(0)
rms_avg = np.mean(rms_window)
if not audio_started:
if rms > rms_avg + START_THRESHOLD:
pre_buffer.append(audio_data)
active_count += 1
if active_count >= MIN_ACTIVE_FRAMES:
audio_started = True
print("🎤 音声検出!送信開始")
for frame in pre_buffer:
audio_send_queue.put(frame)
pre_buffer = []
else:
active_count = 0
pre_buffer = []
else:
audio_send_queue.put(audio_data)
if rms < rms_avg + STOP_THRESHOLD:
silence_counter += 1
if silence_counter >= SILENCE_FRAMES:
audio_started = False
print("🔇 音声停止、送信停止")
else:
silence_counter = 0
# ----------------- WebSocket送信 -----------------
async def send_audio_from_queue(websocket):
print("【システム】マイクと接続中...")
while mic_thread_running.is_set():
audio_data = await asyncio.get_event_loop().run_in_executor(None, audio_send_queue.get)
if audio_data is None:
await asyncio.sleep(0.05)
continue
base64_audio = base64.b64encode(audio_data).decode("utf-8")
audio_event = {"type": "input_audio_buffer.append", "audio": base64_audio}
try:
await websocket.send(json.dumps(audio_event))
except Exception as e:
print(f"⚠️ 送信エラー: {e}")
break
await asyncio.sleep(0)
# ----------------- WebSocket受信 -----------------
async def receive_audio_to_queue(websocket):
init_request = {
"type": "session.update",
"session": {
"modalities": ["audio", "text"],
"instructions": "あなたは日本語で話すAIです。いつも短い返答しかしません。また、自分に体があるように振舞ってください。たとえば握手してくださいと聞かれたら「もちろん」と答えてください。",
"voice": "ash",
"turn_detection": {"type": "server_vad", "threshold": 0.9},
"input_audio_transcription": {"model": "whisper-1"},
}
}
await websocket.send(json.dumps(init_request))
await websocket.send(json.dumps({"type": "response.create"}))
try:
while True:
response = await websocket.recv()
if not response:
continue
data = json.loads(response)
if data.get("type") == "response.audio_transcript.delta":
print(data["delta"], end="", flush=True)
elif data.get("type") == "response.done":
print("\n【AI 】", end="", flush=True)
elif data.get("type") == "conversation.item.input_audio_transcription.completed":
user_text = data["transcript"].strip()
print("【ユーザー】", user_text)
if user_text in ["終了", "またね", "さようなら", "さよなら", "終わり", "ばいばい"]:
print("👋 終了します。")
mic_thread_running.clear()
audio_send_queue.put(None)
audio_receive_queue.put(None)
await websocket.close()
return
elif data.get("type") == "response.audio.delta":
base64_audio_response = data["delta"]
if base64_audio_response:
pcm_data = base64_to_pcm16(base64_audio_response)
if pcm_data:
audio_receive_queue.put(pcm_data)
except websockets.exceptions.ConnectionClosedError as e:
print(f"🔌 WebSocket切断: {e}")
except Exception as e:
print(f"⚠️ 受信エラー: {e}")
finally:
audio_receive_queue.put(None)
print("🧹 音声受信終了")
# ----------------- 音声再生 -----------------
def play_audio_from_queue(output_stream):
while mic_thread_running.is_set():
try:
pcm16_audio = audio_receive_queue.get(timeout=0.1)
if pcm16_audio is None:
break
output_stream.write(pcm16_audio)
except queue.Empty:
continue
except OSError:
break
print("🎧 再生スレッド終了")
# ----------------- メイン -----------------
async def stream_audio_and_receive_response():
async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket:
print("【システム】WebSocketに接続しました")
INPUT_CHUNK = 2400
OUTPUT_CHUNK = 2400
FORMAT = pyaudio.paInt16
CHANNELS = 1
INPUT_RATE = 24000
OUTPUT_RATE = 24000
p = pyaudio.PyAudio()
stream = p.open(format=FORMAT, channels=CHANNELS, rate=INPUT_RATE, input=True, frames_per_buffer=INPUT_CHUNK)
output_stream = p.open(format=FORMAT, channels=CHANNELS, rate=OUTPUT_RATE, output=True, frames_per_buffer=OUTPUT_CHUNK)
mic_thread_running.set()
threading.Thread(target=read_audio_rms_control, args=(stream, INPUT_CHUNK), daemon=True).start()
threading.Thread(target=play_audio_from_queue, args=(output_stream,), daemon=True).start()
print("【システム】マイク入力およびサーバー音声再生開始")
send_task = asyncio.create_task(send_audio_from_queue(websocket))
receive_task = asyncio.create_task(receive_audio_to_queue(websocket))
try:
await asyncio.gather(send_task, receive_task)
except KeyboardInterrupt:
print("\n🛑 KeyboardInterrupt 検出、終了処理へ")
finally:
mic_thread_running.clear()
audio_send_queue.put(None)
audio_receive_queue.put(None)
if stream.is_active():
stream.stop_stream()
stream.close()
if output_stream.is_active():
output_stream.stop_stream()
output_stream.close()
p.terminate()
print("🧹 クリーンアップ完了。終了します。")
if __name__ == "__main__":
try:
asyncio.run(stream_audio_and_receive_response())
except KeyboardInterrupt:
print("\n🛑 強制終了しました。")
finally:
print("✅ プログラムを正常に終了しました。")
課題・今後の改善
文字起こしの揺れ
終了 / さようなら と言うとプログラムを終了する仕組みは、音声ではなく、音声を文字お越しして判定しています。そのため、「終了。」や「さよなら」といった文字起こしのバリエーションによって終了判定が漏れる場合があります。
閾値設定の難しさ
展示会のように突発的な雑音が生じる頻度によって、
START_THRESHOLD / STOP_THRESHOLD の値を環境ごとに調整する必要があります。
→ 将来的には自動キャリブレーション機能を入れたいところです。
参考資料
[1]
おわりに・まとめ
今回は、突発音への対策とハウリング抑制、安全な終了方法に焦点を当てて紹介しました。
性能悪化することなく、ノイズ強化を行うことができ、
展示会やイベントなど、ノイズが多い環境でも快適にAIと会話できるようになります。
今後は以下を目指して改善していきます👇
- 自動閾値調整