背景
音声対話システムを作っていると、いくつかの処理を並列で動かしたくなることがあります。
例えば、LLM音声対話システムの応答を高速化してみたでは音声認識(STT)の完了を待つことによる遅延を軽減するため、STTと発話区間検出(VAD)を並列に動かし、発話完了の予測はSTTより高速なVADで実施しています。
今回はこの実装にならって、STTとVADをpythonのthreadingで並列に実行し、スレッド間で結果を共有し、STTとVADの結果を組み合わせた処理をする方法を検討します。
実装1: STTとVADの結果を標準出力
環境
- macOS M1
- python 3.11.2
準備
STTはGoogle CloudのSpeech-to-Text APIを使用します。
google cloudのコンソール上でクレデンシャルのjsonを作成、ダウンロードし、環境変数にパスを登録します。以下などが参考になると思います。
今回はクレデンシャルのjsonをダウンロード後、.env
ファイルにパスを書き、python-dotenvでロードすることにします。
GOOGLE_APPLICATION_CREDENTIALS=[jsonのファイル名].json
必要なライブラリをpipでインストールします。仮想環境は必要に応じて作成してください。
(記入漏れで以下以外にも必要なものがあるかもしれません。適宜pip等でインストールしてください)
pip install webrtcvad google-cloud-speech python-dotenv
コード
ASRとVADを並列に実行し、それぞれの結果を逐次、標準出力するスクリプトです。
import pyaudio
import threading
import time
import queue
import sys
import webrtcvad
from google.cloud import speech
import dotenv
dotenv.load_dotenv()
RATE = 16000
# 最小単位を10msとする(10ms * 16000Hz = 160サンプル、16bit=2バイトなので320バイト)
BASE_CHUNK = 160
STT_CHUNK = 1600
VAD_CHUNK = 160
BYTE_PER_SAMPLE = 2 # 16bit
class GoogleSpeechRecognition(threading.Thread):
def __init__(self, stt_queue: queue.Queue,
callback=None,
chunk: int = STT_CHUNK,
rate: int = RATE,
single_utterance: bool = False,
max_buffer_size: int | None= None):
super().__init__(daemon=False)
self.stt_queue = stt_queue
self.callback = callback
self.rate = rate
self.chunk = chunk
self.chunk_bytes = chunk * BYTE_PER_SAMPLE
self.max_buffer_size = max_buffer_size or chunk * 10
self.single_utterance = single_utterance
self.stop_event = threading.Event()
def run(self):
# STTは100msごと(STT_NUM_FRAMES個のBASEフレーム)をまとめて認識リクエストに送る想定
client = speech.SpeechClient()
config = speech.RecognitionConfig(
encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=self.rate,
language_code="ja-JP",
)
streaming_config = speech.StreamingRecognitionConfig(
config=config,
interim_results=True,
single_utterance=self.single_utterance,
)
def request_generator():
buffer = b""
while not self.stop_event.is_set():
try:
base_frame = self.stt_queue.get(timeout=0.1)
except queue.Empty:
continue
buffer += base_frame
if len(buffer) >= self.chunk_bytes:
yield speech.StreamingRecognizeRequest(audio_content=buffer[:self.chunk_bytes])
buffer = buffer[self.chunk_bytes:]
# バッファが大きくなりすぎないようにクリア
if len(buffer) > self.max_buffer_size:
buffer = buffer[-self.max_buffer_size:]
raise StopIteration()
responses = client.streaming_recognize(
config=streaming_config, requests=request_generator()
)
try:
for response in responses:
if self.stop_event.is_set():
break
if not response.results:
continue
result = response.results[0]
if result.alternatives:
transcript = result.alternatives[0].transcript
result_type = "final" if result.is_final else "interim"
if self.callback:
self.callback({"type": result_type, "text": transcript})
if result_type == "final":
break
except Exception as e:
print("[STT] Error:", e)
def stop(self):
self.stop_event.set()
class GoogleWebRTCVAD(threading.Thread):
def __init__(self, vad_queue: queue.Queue,
callback=None,
rate: int = RATE,
chunk: int = VAD_CHUNK,
max_buffer_size: int | None= None):
super().__init__(daemon=False)
self.vad_queue = vad_queue
self.callback = callback
self.rate = rate
self.chunk = chunk
self.chunk_bytes = chunk * BYTE_PER_SAMPLE
self.max_buffer_size = max_buffer_size or chunk * 10
self.vad = webrtcvad.Vad()
self.vad.set_mode(3)
self.stop_event = threading.Event()
self.chunk_bytes = self.chunk * BYTE_PER_SAMPLE
self.stop_event = threading.Event()
def run(self):
# VADは10msごと(1フレーム)で判定
buffer = b""
try:
while not self.stop_event.is_set():
try:
base_frame = self.vad_queue.get(timeout=0.1)
except queue.Empty:
continue
buffer += base_frame
if len(buffer) >= self.chunk_bytes:
is_speech = self.vad.is_speech(buffer[:self.chunk_bytes], self.rate)
if self.callback:
self.callback(is_speech)
buffer = buffer[self.chunk_bytes:]
# バッファが大きくなりすぎないようにクリア
if len(buffer) > self.max_buffer_size:
buffer = buffer[-self.max_buffer_size:]
except Exception as e:
print("[VAD] Error:", e)
def stop(self):
self.stop_event.set()
class AudioCapture(threading.Thread):
def __init__(self, queue_list: list[queue.Queue], rate: int = RATE):
super().__init__(daemon=False)
self.queue_list = queue_list
self.rate = rate
self.audio_interface = pyaudio.PyAudio()
self.read_frame_count = BASE_CHUNK * BYTE_PER_SAMPLE
self.audio_stream = self.audio_interface.open(
format=pyaudio.paInt16,
channels=1,
rate=self.rate,
input=True,
frames_per_buffer=self.read_frame_count,
)
self.stop_event = threading.Event()
def run(self):
try:
while not self.stop_event.is_set():
audio_data = self.audio_stream.read(self.read_frame_count, exception_on_overflow=False)
for queue in self.queue_list:
queue.put(audio_data)
except Exception as e:
print("[AudioCapture] Error:", e)
finally:
self.audio_stream.stop_stream()
self.audio_stream.close()
self.audio_interface.terminate()
def stop(self):
self.stop_event.set()
def main():
def stt_callback(result):
print("[STT Callback]:", result)
def vad_callback(is_speech):
print("[VAD Callback]:", is_speech)
stt_queue = queue.Queue()
vad_queue = queue.Queue()
audio_capture = AudioCapture([stt_queue, vad_queue])
stt = GoogleSpeechRecognition(stt_queue, callback=stt_callback)
vad = GoogleWebRTCVAD(vad_queue, callback=vad_callback)
audio_capture.start()
stt.start()
vad.start()
print("Press Ctrl+C to stop.")
try:
while True:
time.sleep(0.5)
except KeyboardInterrupt:
print("KeyboardInterrupt detected. Stopping...")
audio_capture.stop()
stt.stop()
vad.stop()
audio_capture.join()
stt.join()
vad.join()
print("Main thread exiting.")
sys.exit(0)
if __name__ == "__main__":
main()
実行して以下のようにSTTとVADの出力をリアルタイムに見ることができたら成功です。ctrl+Cでとまります。
% uv run googlestt.py
Press Ctrl+C to stop.
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[STT Callback]: {'type': 'interim', 'text': 'こんにちは'}
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[STT Callback]: {'type': 'interim', 'text': 'こんにちは'}
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[STT Callback]: {'type': 'interim', 'text': 'こんにちはあ'}
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: True
[VAD Callback]: False
[STT Callback]: {'type': 'interim', 'text': 'こんにちは あいう'}
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[STT Callback]: {'type': 'interim', 'text': 'こんにちは あいうえ'}
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[STT Callback]: {'type': 'interim', 'text': 'こんにちは あいうえお'}
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[STT Callback]: {'type': 'interim', 'text': 'こんにちは あいうえお'}
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
[VAD Callback]: False
^CKeyboardInterrupt detected. Stopping...
[VAD Callback]: True
[STT] Error: None Exception iterating requests!
Main thread exiting.
解説
いくつか実装のポイントを述べます。
主なクラス
- AudioCaptureは端末のデフォルトマイクから音声データを取得し、queueにコピーするスレッドです。
- GoogleSpeechRecognitionはAudioCaptureが複製したaudio_dataをAPIリクエストして結果をcallback関数に渡すループ処理を持つスレッドです。
- GoogleWebRTCVADはAudioCaptureが複製したaudio_dataから発話区間検出して結果をcallback関数に渡すループ処理を持つスレッドです。
- GoogleSpeechRecognitionとGoogleWebRTCVADはcallback関数を受け取ることができるので、今は、単に標準出力するcallback関数を渡して実行しています。
audio.streamをSTTとVADで共用
STTにGoogle Cloud、VADにwebrtcvadを使用しています。最初は、それぞれのサンプルコードを単にthreadで並列処理をすればよいのかと思ったのですが、うまく動くこともあるのですが、ときどき、
||PaMacCore (AUHAL)|| Error on line 2523: err='-10863', msg=Audio Unit: cannot do in current context
のようなエラーが出たり、エラーは出ないのですが出力が止まってしまうことがありました。出力が止まる場合はデバッグ出力をたどると、audioストリームのopenの手前でとまっていました。
これはサンプルコードでは、VADとSTTそれぞれでpyaudioのストリームを開いており、一つのチャネルを複数のストリームで開こうとしていたことが原因のようでした。
そこで、stream自体は1つだけにして、queueにデータを複製して、STTやVADに渡すということをしています。ストリームを開く機能自体も並列処理にする必要があるので、AudioCaptureというクラスを作っています。
readの単位はAudioCapture側では少なめの単位(10ms)にしておき、stt、vad側で適宜まとめる処理を行うようにします。
一つのチャネルを複数ストリームで問題なく開けるかどうかはOSなどに依存するようで、それぞれのクラスでaudio_streamを作っても動くこともあると思います。
STTでのジェネレータ利用
STTではgeneratorでリクエストを送る必要があるので、queueにされたデータを再度generatorにする工夫をしています。
また、bufferがたまって処理がどんどん遅れていくので、定期的にbufferをクリアする処理を入れています。
single_utteranceは継続的にSTTをするためにはFalseにする必要がありますが、FalseだとTrueの場合に比べてfinalが出現しにくい課題があります。対策としてはTrueにしたうえで、callbackで再実行するようにするか、発話終了の検出はVADなど別の処理に任せることが考えられます。前者のほうが手軽ですが、前者にしてもfinalの検出精度はあんまりよくないです。安定性を求めるなら、VADなど別の方法も組み合わせて発話終了を検出したほうが良いかなと思います。
def run(self):
# STTは100msごと(STT_NUM_FRAMES個のBASEフレーム)をまとめて認識リクエストに送る想定
client = speech.SpeechClient()
config = speech.RecognitionConfig(
encoding=speech.RecognitionConfig.AudioEncoding.LINEAR16,
sample_rate_hertz=self.rate,
language_code="ja-JP",
)
streaming_config = speech.StreamingRecognitionConfig(
config=config,
interim_results=True,
single_utterance=self.single_utterance,
)
def request_generator():
buffer = b""
while not self.stop_event.is_set():
try:
base_frame = self.stt_queue.get(timeout=0.1)
except queue.Empty:
continue
buffer += base_frame
if len(buffer) >= self.chunk_bytes:
yield speech.StreamingRecognizeRequest(audio_content=buffer[:self.chunk_bytes])
buffer = buffer[self.chunk_bytes:]
# バッファが大きくなりすぎないようにクリア
if len(buffer) > self.max_buffer_size:
buffer = buffer[-self.max_buffer_size:]
return
responses = client.streaming_recognize(
config=streaming_config, requests=request_generator()
)
try:
for response in responses:
if self.stop_event.is_set():
break
if not response.results:
continue
result = response.results[0]
if result.alternatives:
transcript = result.alternatives[0].transcript
result_type = "final" if result.is_final else "interim"
if self.callback:
self.callback({"type": result_type, "text": transcript})
if result_type == "final":
break
except Exception as e:
print("[STT] Error:", e)
実装2: STTとVADの結果を組み合わせた判定処理
最初の実装だと結果が標準出力されるだけで、メイン側でSTTとVADの結果を組み合わせて使うイメージが湧きにくいので、スレッド間で認識結果を共有する方法についても検討しておきます。
例えば以下のような実装で「VADで一定時間以上、無音判定されたら、発話終了とみなし、その時点で認識結果を確定させ、STTのセッションを更新する」ことができます。
from multiprocessing import Manager
from typing import Callable
class Observer(threading.Thread):
def __init__(self, callback: Callable[[str], None], interval: float = 0.1):
super().__init__(daemon=False)
self.shared_dict = Manager().dict()
self.callback = callback
self.stop_event = threading.Event()
self.interval = interval
def run(self):
try:
while not self.stop_event.is_set():
self.callback()
time.sleep(self.interval)
except Exception as e:
print("[Observer] Error:", e)
def stop(self):
self.stop_event.set()
def main2():
shared_dict = Manager().dict()
def stt_callback(result):
nonlocal shared_dict
#print("[STT]:", result)
shared_dict["stt_result"] = result
def vad_callback(is_speech):
nonlocal shared_dict
if is_speech:
shared_dict["silence_count"] = 0
else:
shared_dict["silence_count"] = shared_dict.get("silence_count", 0) + 1
def observer_callback():
nonlocal shared_dict
if shared_dict.get("silence_count", 0) > 10 and shared_dict.get("stt_result", {}).get("text"):
print("[Observer] Silence detected. Finalize STT.")
print(f"[Observer] Final utterance: '{shared_dict['stt_result']['text']}'")
shared_dict["silence_count"] = 0
shared_dict["stt_result"] = {}
stt.stop()
stt.join()
start_new_stt_session()
print("[Observer] Start new session.")
def start_new_stt_session():
nonlocal stt_queue
nonlocal stt
stt = GoogleSpeechRecognition(stt_queue, callback=stt_callback)
stt.start()
stt_queue = queue.Queue()
vad_queue = queue.Queue()
audio_capture = AudioCapture([stt_queue, vad_queue])
stt = GoogleSpeechRecognition(stt_queue, callback=stt_callback)
vad = GoogleWebRTCVAD(vad_queue, callback=vad_callback)
observer = Observer(observer_callback)
audio_capture.start()
stt.start()
vad.start()
observer.start()
print("Press Ctrl+C to stop.")
try:
while True:
time.sleep(0.5)
except KeyboardInterrupt:
print("KeyboardInterrupt detected. Stopping...")
audio_capture.stop()
stt.stop()
vad.stop()
observer.stop()
audio_capture.join()
stt.join()
vad.join()
observer.join()
print("Main thread exiting.")
sys.exit(0)
if __name__=="__main__":
main2()
実行すると以下のような出力が得られます。文字だけだとわかりませんが、発話が終了してから比較的早くにfinal utteranceが出力されるようになりました。
Press Ctrl+C to stop.
[Observer] Silence detected. Finalize STT.
[Observer] Final utterance: 'こんにちは'
[Observer] Start new session.
[Observer] Silence detected. Finalize STT.
[Observer] Final utterance: '来てくれてありがとう'
[Observer] Start new session.
[Observer] Silence detected. Finalize STT.
[Observer] Final utterance: 'さよなら'
[Observer] Start new session.
^CKeyboardInterrupt detected. Stopping...
Main thread exiting.
- AudioCapture、GoogleSpeechRecognition、GoogleWebRTCVADはそのままで、新たにObserverというクラス(スレッド)とmain2を定義しています。
- 各スレッドが周期ごとにshared_dictを更新します。STTとVADの結果を組み合わせた判定を行うObserver自体もスレッドで、これもやはりshared_dictを周期的にreadし、条件を満たした場合に処理を行うようにしています。vadのcallbackからstt_resultを読みに行くようにすればObserverなしでも実装可能ですが、スレッドの関係性がスパゲッティになっていくので、一つ一つのスレッドの処理はなるべくシンプルにしたほうが良いかと思います。
- STT、VADはshared_dictに結果を逐次書き込みます。VADはVADの結果そのものではなくis_speech=Falseの回数を記録するようにしています。
- Observerは0.1秒ごとにshared_dictを読み、silence_countがしきい値以上で、stt_resultが存在していればその結果を出力し、stt_sessionをrestartします。
- スレッド間の変数のやり取りはManager().dict()を使っています。これはスレッドセーフなクラスです。
おわりに
pythonのthreadingを使って、STTとVADを並列に実行する方法や、別のスレッドからSTTとVADの結果を組み合わせた処理をする方法を検討しました。
threading自体の仕様は色んなところで解説されているので本記事では触れず、音声のストリーム処理と組み合わせたときの詰まりポイントを中心に解説しました。
意外とこのあたりの実装コードはネットに転がっていないのではと思います。
今後の発展としては、Observerでllmを呼び出すようにしたりすると、高速に発話の終了を判定して音声合成をしたり、音声認識の途中結果を使って事前に応答を生成しておくみたいなこともできたりします。
本記事がどなたかのお役に立ちましたら幸いです。