ローカル端末だけで、いま相手が話している内容を日本語でサクッと文字起こししたい。クラウドASRは便利だけど、事情によっては手元完結が良いときもある──ということで、Vosk × Twilio Media Streamsで双方向(inbound/outbound)を同時に処理する仕組みを組みました。実装のポイントとつまずき、回避策をまとめます。
TL;DR(最初に結論)
- Voskは軽い&オフライン。μ‑law → PCM16 変換を挟めば、Twilio Media Streamsと相性よし。
- both_tracksを使うと inbound/outbound が別トラックで来る。同じバッファに混ぜると音声が“倍の長さ”問題が発生するので、トラック別バッファ&別Recognizerが必須。
- 250ms(約4000バイト)単位バッファでVoskに投げると、レイテンシと精度のバランスが良かった。
- 品質はクラウドASR(Google/Deepgram等)に及ばないが、セキュリティ/オフライン要件には十分使える。まずは手元で試す価値あり。
- 今回作った成果物はこちら
背景と狙い
普段はTwilio標準のクラウドASR(GoogleやDeepgram)を使うことが多いのですが、ローカル端末でのリアルタイムSTTを試したくなったので検証しました。GPUなしのMac mini前提、リアルタイムで双方向を文字起こし、依存は最小限(Pythonのみ)が今回の条件です。
ローカルASRの候補はざっくりこんな感じ:
- faster‑whisper(CTranslate2):品質と遅延のバランス最強。GPU/CPUどちらでも速い
- whisper.cpp(GGUF):GPUなしでも軽い。超低レイテンシ用途で小モデル運用が楽
- Vosk:軽量・オフライン・日本語可。語彙拡張が簡単、省リソース端末向け
- NVIDIA Riva(GPUあり):超低遅延、業務用途で堅い
今回はVoskを先に触って、その後にwhisper.cppで比較予定です(別記事にします)。
システム概要
- リアルタイム処理:Twilioからの音声を即時STT
- 双方向認識:発信者(inbound)と受信者(outbound)を同時処理
- オフライン動作:Voskでネット不要
- 完全Python実装:ネイティブ依存の地雷を最小化
アーキテクチャ
構成要素(4つ)
-
WebSocketサーバ
-
aiohttpでTwilioのMedia Streamsを受ける。接続毎にsession_idを払い出し、並列通話OK。
async def websocket_handler(request): ws = web.WebSocketResponse() await ws.prepare(request) session_id = generate_session_id() async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: data = json.loads(msg.data) await handle_media_stream_message(ws, data, session_id) -
-
音声ユーティリティ
- μ‑law → PCM16(G.711)デコード、WAV保存。
def mulaw_to_pcm(mulaw_data: bytes) -> bytes: """μ-lawをPCM16へ(G.711)""" import struct pcm_samples = [] for b in mulaw_data: u = ~b & 0xFF sign = (u & 0x80) >> 7 exp = (u & 0x70) >> 4 mant = (u & 0x0F) linear = ((mant << 1) + 33) << exp linear -= 33 if sign == 0: linear = -linear pcm_samples.append(max(-32768, min(32767, linear))) return struct.pack(f'<{len(pcm_samples)}h', *pcm_samples) -
Voskエンジン
- モデルのロード、セッション/トラック別Recognizerの生成と管理。
class VoskEngine: def __init__(self, model_path: str, sample_rate: int = 8000): self.model_path = model_path self.sample_rate = sample_rate self.model = None self.recognizers = {} def initialize(self): self.model = Model(self.model_path) def create_recognizer(self, rid: str): r = KaldiRecognizer(self.model, self.sample_rate) r.SetWords(True) self.recognizers[rid] = r return r def process_audio(self, rid: str, audio: bytes): r = self.recognizers.get(rid) if not r: return None if r.AcceptWaveform(audio): res = json.loads(r.Result()) txt = res.get('text', '').strip() if txt: return { 'type': 'final', 'text': txt, 'result': res.get('result', []) } else: p = json.loads(r.PartialResult()) pt = p.get('partial', '').strip() if pt: return { 'type': 'partial', 'text': pt } return None def finalize_session(self, rid: str): r = self.recognizers.get(rid) if not r: return None res = json.loads(r.FinalResult()) txt = res.get('text', '').strip() del self.recognizers[rid] return { 'type': 'final', 'text': txt, 'result': res.get('result', []) } -
ロギング
-
app / vosk / ws / transcriptionの4系統。問題切り分けが楽。
-
実装でハマった点と解決
1) μ‑lawデコード
Twilio Media StreamsはG.711 μ‑lawで音声を送ってくるので、ASR前にPCM16へ変換が必要。OpenAI Realtimeのように抽象化されているAPIに慣れていると忘れがちですが、多くのASRはμ‑lawを直接受けません。上のmulaw_to_pcmで解消。
検証ポイント:定期的に振幅の統計をログ出しして、デコード異常を早期検知。
if session['audio_chunks'] % 100 == 1:
samples = struct.unpack(f'<{len(pcm_buffer)//2}h', pcm_buffer)
max_amp = max(abs(s) for s in samples)
avg_amp = sum(abs(s) for s in samples) / len(samples)
ws_logger.debug(f"amp: max={max_amp}, avg={avg_amp:.1f}")
2) 双方向(both_tracks)の混線問題
track="both_tracks"を使うと、inbound/outboundが別トラックで届きます。最初、これを同じバッファにappendしてしまい、生成される音声が実時間の“2倍の長さ”になる問題が発生。
解決:トラック別のバッファ&Recognizerで完全分離。
sessions[session_id] = {
'audio_buffer_inbound': bytearray(),
'audio_buffer_outbound': bytearray(),
}
buffer_key = f'audio_buffer_{track}' # 'inbound' or 'outbound'
session[buffer_key].extend(pcm_buffer)
# Recognizerも inbound/outbound でID分け
rid = f"{session_id}_{track}"
result = vosk_engine.process_audio(rid, bytes(session[buffer_key]))
3) バッファサイズの最適化(レイテンシvs精度)
Twilioは20ms / 160B刻みで届くので、そのままVoskに投げると非効率。約250ms(4000B)貯めてから渡すと、認識安定&CPU効率の両立が取れました。
if len(session[buffer_key]) >= 4000: # 約250ms
rid = f"{session_id}_{track}"
result = vosk_engine.process_audio(rid, bytes(session[buffer_key]))
session[buffer_key].clear()
理由(ざっくり)
- 短すぎる → 文脈不足で品質低下
- 長すぎる → レイテンシ増大
- 250ms前後は実用上ちょうど良かった(体感)
ストリーム終了時は残バッファとFinalResultを忘れずに回収。
Media Streamsのイベント処理(流れ)
async def handle_media_stream_message(ws, message, session_id):
event_type = message.get('event')
if event_type == 'connected':
ws_logger.info(f"connected: {session_id}")
elif event_type == 'start':
start = message.get('start', {})
sessions[session_id] = {
'call_sid': start.get('callSid'),
'stream_sid': start.get('streamSid'),
'start_time': datetime.now(),
'audio_buffer_inbound': bytearray(),
'audio_buffer_outbound': bytearray(),
}
vosk_engine.create_recognizer(f"{session_id}_inbound")
vosk_engine.create_recognizer(f"{session_id}_outbound")
elif event_type == 'media':
media = message.get('media', {})
track = media.get('track') # inbound/outbound
audio_payload = base64.b64decode(media.get('payload'))
pcm_buffer = mulaw_to_pcm(audio_payload)
key = f'audio_buffer_{track}'
session = sessions[session_id]
session[key].extend(pcm_buffer)
if len(session[key]) >= 4000:
rid = f"{session_id}_{track}"
result = vosk_engine.process_audio(rid, bytes(session[key]))
session[key].clear()
if result:
await ws.send_json({
'event': 'transcription',
'type': result['type'],
'text': result['text'],
'track': track,
'session_id': session_id,
'timestamp': datetime.now().isoformat(),
})
elif event_type == 'stop':
# 残バッファ処理 → FinalResult → クリーンアップ
...
TwiML(both_tracks 指定)
<Response>
<Start>
<Stream url="wss://your-server.example.com/media"
name="my-stream"
track="both_tracks">
<Parameter name="callUseCase" value="live-stt"/>
<Parameter name="tenantId" value="jp001"/>
</Stream>
</Start>
</Response>
デバッグ運用
音声の保全(任意)
環境変数 DEBUG_SAVE_AUDIO=true で、inbound/outbound それぞれの生PCMを貯めてWAV書き出し。通話終了時に保存します。
ディレクトリ構成は CallSID単位:
debug_audio/
└── CAxxxxxxxxxxxxxxxxxxxxxx/
├── session_XXXX_inbound_YYYYMMDD_HHMMSS.wav
├── session_XXXX_outbound_YYYYMMDD_HHMMSS.wav
└── transcription.txt
書き出し(抜粋):
filename = save_audio_to_wav(
f"{session_id}_inbound",
bytes(session_info['all_pcm_data_inbound']),
WAV_SAMPLE_RATE,
session_info['call_sid']
)
文字起こしログ
transcription.txtにFinalのみ追記。日本語の不要スペース除去もここで実施。
[2025-10-12 17:49:44] [発信者] こんにちは
[2025-10-12 17:49:45] [受信者] はい、こんにちは
[2025-10-12 17:49:50] [発信者] 今日はいい天気ですね
ログレイヤ(4本)
-
app_logger:ライフサイクル -
ws_logger:WebSocket I/O -
vosk_logger:ASR内部 -
transcription_logger:結果専用
終了時に track_counts を吐くとパケット落ちも見やすいです。
パフォーマンス・運用Tips
- 非同期I/O:各WS接続は独立タスク。処理はブロックしない。
-
メモリ管理:Vosk投入後は即
bytearray.clear()。長通話でも安定。 - サンプルレート:Media Streamsは8kHz/mono/G.711 μ‑law想定。Voskモデル側のレートと合わせる(8kでOKな日本語モデルを使用)。
-
語彙拡張:Voskの
spk/model辞書追加で固有名詞の取りこぼしが減らせるらしい。やってないから後日別途まとめます。
Node.js → Pythonに切り替えた理由
最初はNodeでやりましたが、ffi-napi周りのネイティブ依存が面倒。VoskはPythonが親和的で、標準ライブラリだけでμ‑law処理も完結。ということでPythonに寄せたら運用が安定しました。
- ネイティブビルド不要
- Vosk APIをラッパ無しで直叩き
-
structでPCM処理が完了
今後の改善(予定)
- whisper.cppとの遅延/精度/CPU負荷の比較(tiny/small中心)
- 語彙拡張と日本語固有名詞の最適化
- UI:ブラウザに吹き出し表示(inbound=左 / outbound=右)
- 双方向翻訳モード(日本語↔英語)
セットアップ(ざっくり)
- Vosk日本語モデルを取得(例:
vosk-model-small-ja-xx) - Python依存インストール
pip install aiohttp vosk -
server.pyのWSエンドポイントを公開(ngrok/Cloud Run など環境に応じて) - TwiMLで
<Stream track="both_tracks">を指定して発着信
<Response>
<Start>
<Stream url="wss://your-server.example.com/stream"
track="both_tracks" >
</Stream>
</Start>
</Response>
※ モデル名やサイズは端末性能と相談。8kHzモデル推奨。16kHzモデルを使う場合はレート変換が必要。
参考リンク
-
Twilio Media Streams(公式)
- クイックスタート / イベント仕様 / セキュア接続 など
-
Vosk(公式GitHub/Docs)
- モデル配布 / API / 語彙拡張
※ URLは記事公開時点の最新を掲載してください。
ソースコード
GitHubに全体を置いています:
https://github.com/MitsuharuNakamura/websocket-vosk
この記事の断片コードは可読性優先で省略しています。実装はGitHubを参照してください。
まとめ(所感)
クラウドASRに”乗っかる”と、正直ここまで意識しなくても動きます。ただ、μ‑law/PCM、トラック分離、バッファ設計を自分で握ってみると、仕組み理解が一段深くなるのを実感。クラウドに出せない要件やオフライン前提の現場では、Voskは十分に戦えます。
一方で、当たり前ですが取って出しだと純粋な精度はクラウド勢に軍配。要件次第で使い分け、ですね。まずは軽量なモデルから、手元のマシンで動かしてみてください。