0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

RealtimeAPIを用いてAIと会話するシステムを作ってみた(音声制御の改善)

Last updated at Posted at 2025-10-31

はじめに

Unitree G1は決まった音源しか再生できず、自由な音声出力が難しいという制約があります。
しかしG1とインタラクティブに会話したいというニーズがあり、また搭載アプリのChat機能は中国語と英語のみ対応しているのが現状です。
そこで、日本語で自然に会話できるようにするため、OpenAI Realtime API を活用して、リアルタイムで音声会話できる環境を構築しました。

参考にしたのは、以下の記事です:
🔗 https://qiita.com/YumaShinoki/items/195105fe5520dc005ef8 [1]

しかし実際に動かしてみると、次のような課題が発生しました。

🔊 音量を上げるとAIの返答をAI自身が拾ってハウリングが起きる

⚡ 突発的な短い音も会話として認識されてしまう

💡 強制停止依存な設計のため、安全に終了できない

本記事では、これらの課題を解決するために実装した 「突発音対策」 と 「ハウリング対策(RMS制御)」 、「スレッドとWebSocket通信の安全な終了設計」の仕組みを紹介します。

この記事でわかる・できること

  • OpenAI Realtime API を使って音声でAIと会話する方法
  • 突発的な短音を無視 して自然な会話を続ける方法
  • AIの返答をマイクが拾ってハウリングする問題 の対処方法

この記事の対象者

  • AI会話デモを実施したい人
  • Pythonでリアルタイム音声通信(録音/再生)を扱いたい人
  • OpenAI Realtime APIを活用したい人
  • ハウリングやノイズを抑えたい人

動作環境・使用するツールや言語

  • OS:Windows 10 / 11
  • 言語:Python 3.11
  • 使用モデル:gpt-4o-mini-realtime-preview-2024-12-17
  • ライブラリ:pyaudio, asyncio, websockets, numpy, threading
  • API:OpenAI Realtime API

突発的な音への対処

展示会などの環境では「カメラのシャッター音」「咳」「机を叩く音」などが頻繁に発生します。
これをAIが「話しかけられた」と誤認識すると、会話が途切れてしまいます。

そのため、以下のような 音声開始判定ロジック を導入しました。

# 一定以上の音量が連続したら音声開始とみなす(3フレーム = 約0.3秒)
MIN_ACTIVE_FRAMES = 3

音声RMS値をモニタリングし、
3フレーム(約0.3秒)以上音が続いたら「会話開始」と判断
それ未満は「突発音」とみなして無視
また、開始直後の音を切り捨てないように、直前3フレーム分もバッファに保持して送信します。

if not audio_started:
    if rms > rms_avg + START_THRESHOLD:
        pre_buffer.append(audio_data)
        active_count += 1
        if active_count >= MIN_ACTIVE_FRAMES:
            audio_started = True
            print("🎤 音声検出!送信開始")
            for frame in pre_buffer:
                audio_send_queue.put(frame)
            pre_buffer = []

→ これにより、「あっ」「ドン」などの短い大きな雑音は無視し、
自然に「こんにちは」「お願いします」といった発話から反応するようになります。

ハウリングへの対処(RMS制御)

AIが話している最中にマイクがその音を拾ってしまうと、
AIがAIの音声を再帰的に入力してしまい、ハウリングが起きます。

これを防ぐため、RMS(Root Mean Square:音量の平均平方根)を用いた制御を導入しました。

原理

音量を常にモニタリング(rms_avg)
平均音量より START_THRESHOLD(例: +1000) 上回ったら録音開始
平均音量より STOP_THRESHOLD(例: +500) 下がったら録音停止
これにより、AIの音声出力をノイズとして扱うことでハウリングを防ぎつつ、
ユーザーが話し始めたときに自動で録音を開始し、話し終えたあとに自動で録音を停止することができます。

if rms < rms_avg + STOP_THRESHOLD:
    silence_counter += 1
    if silence_counter >= SILENCE_FRAMES:
        audio_started = False
        print("🔇 音声停止、送信停止")

スレッドとWebSocket通信の安全な終了設計

終了時にスレッドが強制終了したり、WebSocket接続が不正に切断されたりしないように、安全に実行を終了する必要があります。

スレッドの安全な終了

スレッド(マイク入力と音声再生処理)には、
「終了シグナル用のフラグ」を設定し、プログラム終了時にこのフラグを立てることで、ループを安全に抜ける仕組みを導入しました。

mic_thread_running = threading.Event()

また、スレッドで利用している キュー(queue.Queue) に None を送信することで、スレッド側のループが自然に終了できるようにしました。

これにより、強制的な thread.join() や例外によって停止することなく、スレッドを終了させることが可能です。

リソース解放による、スレッドとWebSocketの安全なクローズ

通信中に発生する ConnectionClosedError や CancelledError などの例外も try-except でキャッチし、異常終了時にもリソースリークが起きないようにしました。

# WebSocket受信処理での例外ハンドリング
try:
    while True:
        response = await websocket.recv()
        ...
except websockets.exceptions.ConnectionClosedError as e:
    print(f"🔌 WebSocket切断: {e}")
except Exception as e:
    print(f"⚠️ 受信エラー: {e}")
finally:
    audio_receive_queue.put(None)
    print("🧹 音声受信終了")
# 全体のクリーンアップ処理
try:
    await asyncio.gather(send_task, receive_task)
except KeyboardInterrupt:
    print("\n🛑 KeyboardInterrupt 検出、終了処理へ")
finally:
    mic_thread_running.clear()
    audio_send_queue.put(None)
    audio_receive_queue.put(None)
    if stream.is_active():
        stream.stop_stream()
    stream.close()
    if output_stream.is_active():
        output_stream.stop_stream()
    output_stream.close()
    p.terminate()
    print("🧹 クリーンアップ完了。終了します。")

動作

デモ動作になります。

全体のコードです。

コード全体
import asyncio 
import websockets
import pyaudio
import base64
import os
import json
import queue
import threading
import numpy as np
import time

## OpenAI APIキー
API_KEY = os.getenv("OPENAI_API_KEY")
if not API_KEY:
    raise ValueError("OPENAI_API_KEY環境変数が設定されていません")

WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-mini-realtime-preview-2024-12-17"
HEADERS = {"Authorization": "Bearer " + API_KEY, "OpenAI-Beta": "realtime=v1"}

## キューを初期化
audio_send_queue = queue.Queue()
audio_receive_queue = queue.Queue()

## RMS判定パラメータ
RMS_WINDOW_SIZE = 30
START_THRESHOLD = 250
STOP_THRESHOLD = 50
SILENCE_FRAMES = 25
MIN_ACTIVE_FRAMES = 3

# マイク送信スレッド制御イベント
mic_thread_running = threading.Event()

# PCM16変換
def base64_to_pcm16(base64_audio):
    return base64.b64decode(base64_audio)

# ----------------- マイク入力(RMS制御) -----------------
def read_audio_rms_control(stream, chunk):
    rms_window = []
    silence_counter = 0
    audio_started = False
    active_count = 0
    pre_buffer = []

    while True:
        if not mic_thread_running.is_set():
            time.sleep(0.01)
            continue

        audio_data = stream.read(chunk, exception_on_overflow=False)
        pcm16 = np.frombuffer(audio_data, dtype=np.int16)
        rms = np.sqrt(np.mean(pcm16.astype(np.float32) ** 2))

        rms_window.append(rms)
        if len(rms_window) > RMS_WINDOW_SIZE:
            rms_window.pop(0)
        rms_avg = np.mean(rms_window)

        if not audio_started:
            if rms > rms_avg + START_THRESHOLD:
                pre_buffer.append(audio_data)
                active_count += 1
                if active_count >= MIN_ACTIVE_FRAMES:
                    audio_started = True
                    print("🎤 音声検出!送信開始")
                    for frame in pre_buffer:
                        audio_send_queue.put(frame)
                    pre_buffer = []
            else:
                active_count = 0
                pre_buffer = []
        else:
            audio_send_queue.put(audio_data)
            if rms < rms_avg + STOP_THRESHOLD:
                silence_counter += 1
                if silence_counter >= SILENCE_FRAMES:
                    audio_started = False
                    print("🔇 音声停止、送信停止")
            else:
                silence_counter = 0

# ----------------- WebSocket送信 -----------------
async def send_audio_from_queue(websocket):
    print("【システム】マイクと接続中...")
    while mic_thread_running.is_set():
        audio_data = await asyncio.get_event_loop().run_in_executor(None, audio_send_queue.get)
        if audio_data is None:
            await asyncio.sleep(0.05)
            continue
        base64_audio = base64.b64encode(audio_data).decode("utf-8")
        audio_event = {"type": "input_audio_buffer.append", "audio": base64_audio}
        try:
            await websocket.send(json.dumps(audio_event))
        except Exception as e:
            print(f"⚠️ 送信エラー: {e}")
            break
        await asyncio.sleep(0)

# ----------------- WebSocket受信 -----------------
async def receive_audio_to_queue(websocket):
    init_request = {
        "type": "session.update",
        "session": {
            "modalities": ["audio", "text"],
            "instructions": "あなたは日本語で話すAIです。いつも短い返答しかしません。また、自分に体があるように振舞ってください。たとえば握手してくださいと聞かれたら「もちろん」と答えてください。",
            "voice": "ash",
            "turn_detection": {"type": "server_vad", "threshold": 0.9},
            "input_audio_transcription": {"model": "whisper-1"},
        }
    }
    await websocket.send(json.dumps(init_request))
    await websocket.send(json.dumps({"type": "response.create"}))

    try:
        while True:
            response = await websocket.recv()
            if not response:
                continue

            data = json.loads(response)

            if data.get("type") == "response.audio_transcript.delta":
                print(data["delta"], end="", flush=True)

            elif data.get("type") == "response.done":
                print("\n【AI 】", end="", flush=True)

            elif data.get("type") == "conversation.item.input_audio_transcription.completed":
                user_text = data["transcript"].strip()
                print("【ユーザー】", user_text)

                if user_text in ["終了", "またね", "さようなら", "さよなら", "終わり", "ばいばい"]:
                    print("👋 終了します。")
                    mic_thread_running.clear()
                    audio_send_queue.put(None)
                    audio_receive_queue.put(None)
                    await websocket.close()
                    return

            elif data.get("type") == "response.audio.delta":
                base64_audio_response = data["delta"]
                if base64_audio_response:
                    pcm_data = base64_to_pcm16(base64_audio_response)
                    if pcm_data:
                        audio_receive_queue.put(pcm_data)

    except websockets.exceptions.ConnectionClosedError as e:
        print(f"🔌 WebSocket切断: {e}")
    except Exception as e:
        print(f"⚠️ 受信エラー: {e}")
    finally:
        audio_receive_queue.put(None)
        print("🧹 音声受信終了")

# ----------------- 音声再生 -----------------
def play_audio_from_queue(output_stream):
    while mic_thread_running.is_set():
        try:
            pcm16_audio = audio_receive_queue.get(timeout=0.1)
            if pcm16_audio is None:
                break
            output_stream.write(pcm16_audio)
        except queue.Empty:
            continue
        except OSError:
            break
    print("🎧 再生スレッド終了")

# ----------------- メイン -----------------
async def stream_audio_and_receive_response():
    async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket:
        print("【システム】WebSocketに接続しました")

        INPUT_CHUNK = 2400
        OUTPUT_CHUNK = 2400
        FORMAT = pyaudio.paInt16
        CHANNELS = 1
        INPUT_RATE = 24000
        OUTPUT_RATE = 24000

        p = pyaudio.PyAudio()
        stream = p.open(format=FORMAT, channels=CHANNELS, rate=INPUT_RATE, input=True, frames_per_buffer=INPUT_CHUNK)
        output_stream = p.open(format=FORMAT, channels=CHANNELS, rate=OUTPUT_RATE, output=True, frames_per_buffer=OUTPUT_CHUNK)

        mic_thread_running.set()
        threading.Thread(target=read_audio_rms_control, args=(stream, INPUT_CHUNK), daemon=True).start()
        threading.Thread(target=play_audio_from_queue, args=(output_stream,), daemon=True).start()

        print("【システム】マイク入力およびサーバー音声再生開始")

        send_task = asyncio.create_task(send_audio_from_queue(websocket))
        receive_task = asyncio.create_task(receive_audio_to_queue(websocket))

        try:
            await asyncio.gather(send_task, receive_task)
        except KeyboardInterrupt:
            print("\n🛑 KeyboardInterrupt 検出、終了処理へ")
        finally:
            mic_thread_running.clear()
            audio_send_queue.put(None)
            audio_receive_queue.put(None)
            if stream.is_active():
                stream.stop_stream()
            stream.close()
            if output_stream.is_active():
                output_stream.stop_stream()
            output_stream.close()
            p.terminate()
            print("🧹 クリーンアップ完了。終了します。")

if __name__ == "__main__":
    try:
        asyncio.run(stream_audio_and_receive_response())
    except KeyboardInterrupt:
        print("\n🛑 強制終了しました。")
    finally:
        print("✅ プログラムを正常に終了しました。")

課題・今後の改善

文字起こしの揺れ

終了 / さようなら と言うとプログラムを終了する仕組みは、音声ではなく、音声を文字お越しして判定しています。そのため、「終了。」や「さよなら」といった文字起こしのバリエーションによって終了判定が漏れる場合があります。

閾値設定の難しさ

展示会のように突発的な雑音が生じる頻度によって、
START_THRESHOLD / STOP_THRESHOLD の値を環境ごとに調整する必要があります。
→ 将来的には自動キャリブレーション機能を入れたいところです。

参考資料

[1]

おわりに・まとめ

今回は、突発音への対策とハウリング抑制、安全な終了方法に焦点を当てて紹介しました。
性能悪化することなく、ノイズ強化を行うことができ、
展示会やイベントなど、ノイズが多い環境でも快適にAIと会話できるようになります。

今後は以下を目指して改善していきます👇

  • 自動閾値調整
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?