1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Vosk + Twilioで実現する双方向リアルタイム音声文字起こしシステム

Posted at

ローカル端末だけで、いま相手が話している内容を日本語でサクッと文字起こししたい。クラウドASRは便利だけど、事情によっては手元完結が良いときもある──ということで、Vosk × Twilio Media Streamsで双方向(inbound/outbound)を同時に処理する仕組みを組みました。実装のポイントとつまずき、回避策をまとめます。

TL;DR(最初に結論)

  • Voskは軽い&オフライン。μ‑law → PCM16 変換を挟めば、Twilio Media Streamsと相性よし。
  • both_tracksを使うと inbound/outbound が別トラックで来る。同じバッファに混ぜると音声が“倍の長さ”問題が発生するので、トラック別バッファ&別Recognizerが必須。
  • 250ms(約4000バイト)単位バッファでVoskに投げると、レイテンシと精度のバランスが良かった。
  • 品質はクラウドASR(Google/Deepgram等)に及ばないが、セキュリティ/オフライン要件には十分使える。まずは手元で試す価値あり。
  • 今回作った成果物はこちら

背景と狙い

普段はTwilio標準のクラウドASR(GoogleやDeepgram)を使うことが多いのですが、ローカル端末でのリアルタイムSTTを試したくなったので検証しました。GPUなしのMac mini前提、リアルタイム双方向を文字起こし、依存は最小限(Pythonのみ)が今回の条件です。

ローカルASRの候補はざっくりこんな感じ:

  • faster‑whisper(CTranslate2):品質と遅延のバランス最強。GPU/CPUどちらでも速い
  • whisper.cpp(GGUF):GPUなしでも軽い。超低レイテンシ用途で小モデル運用が楽
  • Vosk:軽量・オフライン・日本語可。語彙拡張が簡単、省リソース端末向け
  • NVIDIA Riva(GPUあり):超低遅延、業務用途で堅い

今回はVoskを先に触って、その後にwhisper.cppで比較予定です(別記事にします)。

システム概要

  • リアルタイム処理:Twilioからの音声を即時STT
  • 双方向認識:発信者(inbound)と受信者(outbound)を同時処理
  • オフライン動作:Voskでネット不要
  • 完全Python実装:ネイティブ依存の地雷を最小化

アーキテクチャ

構成要素(4つ)

  1. WebSocketサーバ

    • aiohttpでTwilioのMedia Streamsを受ける。接続毎にsession_idを払い出し、並列通話OK。
    async def websocket_handler(request):
        ws = web.WebSocketResponse()
        await ws.prepare(request)
    
        session_id = generate_session_id()
    
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                data = json.loads(msg.data)
                await handle_media_stream_message(ws, data, session_id)
    
  2. 音声ユーティリティ

    • μ‑law → PCM16(G.711)デコード、WAV保存。
    def mulaw_to_pcm(mulaw_data: bytes) -> bytes:
        """μ-lawをPCM16へ(G.711)"""
        import struct
        pcm_samples = []
        for b in mulaw_data:
            u = ~b & 0xFF
            sign = (u & 0x80) >> 7
            exp  = (u & 0x70) >> 4
            mant = (u & 0x0F)
            linear = ((mant << 1) + 33) << exp
            linear -= 33
            if sign == 0:
                linear = -linear
            pcm_samples.append(max(-32768, min(32767, linear)))
        return struct.pack(f'<{len(pcm_samples)}h', *pcm_samples)
    
  3. Voskエンジン

    • モデルのロード、セッション/トラック別Recognizerの生成と管理。
    class VoskEngine:
        def __init__(self, model_path: str, sample_rate: int = 8000):
            self.model_path = model_path
            self.sample_rate = sample_rate
            self.model = None
            self.recognizers = {}
    
        def initialize(self):
            self.model = Model(self.model_path)
    
        def create_recognizer(self, rid: str):
            r = KaldiRecognizer(self.model, self.sample_rate)
            r.SetWords(True)
            self.recognizers[rid] = r
            return r
    
        def process_audio(self, rid: str, audio: bytes):
            r = self.recognizers.get(rid)
            if not r:
                return None
            if r.AcceptWaveform(audio):
                res = json.loads(r.Result())
                txt = res.get('text', '').strip()
                if txt:
                    return { 'type': 'final', 'text': txt, 'result': res.get('result', []) }
            else:
                p = json.loads(r.PartialResult())
                pt = p.get('partial', '').strip()
                if pt:
                    return { 'type': 'partial', 'text': pt }
            return None
    
        def finalize_session(self, rid: str):
            r = self.recognizers.get(rid)
            if not r:
                return None
            res = json.loads(r.FinalResult())
            txt = res.get('text', '').strip()
            del self.recognizers[rid]
            return { 'type': 'final', 'text': txt, 'result': res.get('result', []) }
    
  4. ロギング

    • app / vosk / ws / transcriptionの4系統。問題切り分けが楽。

実装でハマった点と解決

1) μ‑lawデコード

Twilio Media StreamsはG.711 μ‑lawで音声を送ってくるので、ASR前にPCM16へ変換が必要。OpenAI Realtimeのように抽象化されているAPIに慣れていると忘れがちですが、多くのASRはμ‑lawを直接受けません。上のmulaw_to_pcmで解消。

検証ポイント:定期的に振幅の統計をログ出しして、デコード異常を早期検知。

if session['audio_chunks'] % 100 == 1:
    samples = struct.unpack(f'<{len(pcm_buffer)//2}h', pcm_buffer)
    max_amp = max(abs(s) for s in samples)
    avg_amp = sum(abs(s) for s in samples) / len(samples)
    ws_logger.debug(f"amp: max={max_amp}, avg={avg_amp:.1f}")

2) 双方向(both_tracks)の混線問題

track="both_tracks"を使うと、inbound/outboundが別トラックで届きます。最初、これを同じバッファにappendしてしまい、生成される音声が実時間の“2倍の長さ”になる問題が発生。

解決トラック別のバッファ&Recognizerで完全分離。

sessions[session_id] = {
  'audio_buffer_inbound': bytearray(),
  'audio_buffer_outbound': bytearray(),
}

buffer_key = f'audio_buffer_{track}'  # 'inbound' or 'outbound'
session[buffer_key].extend(pcm_buffer)

# Recognizerも inbound/outbound でID分け
rid = f"{session_id}_{track}"
result = vosk_engine.process_audio(rid, bytes(session[buffer_key]))

3) バッファサイズの最適化(レイテンシvs精度)

Twilioは20ms / 160B刻みで届くので、そのままVoskに投げると非効率。約250ms(4000B)貯めてから渡すと、認識安定&CPU効率の両立が取れました。

if len(session[buffer_key]) >= 4000:  # 約250ms
    rid = f"{session_id}_{track}"
    result = vosk_engine.process_audio(rid, bytes(session[buffer_key]))
    session[buffer_key].clear()

理由(ざっくり)

  • 短すぎる → 文脈不足で品質低下
  • 長すぎる → レイテンシ増大
  • 250ms前後は実用上ちょうど良かった(体感)

ストリーム終了時は残バッファとFinalResultを忘れずに回収。

Media Streamsのイベント処理(流れ)

async def handle_media_stream_message(ws, message, session_id):
    event_type = message.get('event')

    if event_type == 'connected':
        ws_logger.info(f"connected: {session_id}")

    elif event_type == 'start':
        start = message.get('start', {})
        sessions[session_id] = {
            'call_sid': start.get('callSid'),
            'stream_sid': start.get('streamSid'),
            'start_time': datetime.now(),
            'audio_buffer_inbound': bytearray(),
            'audio_buffer_outbound': bytearray(),
        }
        vosk_engine.create_recognizer(f"{session_id}_inbound")
        vosk_engine.create_recognizer(f"{session_id}_outbound")

    elif event_type == 'media':
        media = message.get('media', {})
        track = media.get('track')  # inbound/outbound
        audio_payload = base64.b64decode(media.get('payload'))
        pcm_buffer = mulaw_to_pcm(audio_payload)

        key = f'audio_buffer_{track}'
        session = sessions[session_id]
        session[key].extend(pcm_buffer)

        if len(session[key]) >= 4000:
            rid = f"{session_id}_{track}"
            result = vosk_engine.process_audio(rid, bytes(session[key]))
            session[key].clear()
            if result:
                await ws.send_json({
                    'event': 'transcription',
                    'type': result['type'],
                    'text': result['text'],
                    'track': track,
                    'session_id': session_id,
                    'timestamp': datetime.now().isoformat(),
                })

    elif event_type == 'stop':
        # 残バッファ処理 → FinalResult → クリーンアップ
        ...

TwiML(both_tracks 指定)

<Response>
  <Start>
    <Stream url="wss://your-server.example.com/media"
            name="my-stream"
            track="both_tracks">
      <Parameter name="callUseCase" value="live-stt"/>
      <Parameter name="tenantId"   value="jp001"/>
    </Stream>
  </Start>
</Response>

デバッグ運用

音声の保全(任意)

環境変数 DEBUG_SAVE_AUDIO=true で、inbound/outbound それぞれの生PCMを貯めてWAV書き出し。通話終了時に保存します。

ディレクトリ構成は CallSID単位

debug_audio/
└── CAxxxxxxxxxxxxxxxxxxxxxx/
    ├── session_XXXX_inbound_YYYYMMDD_HHMMSS.wav
    ├── session_XXXX_outbound_YYYYMMDD_HHMMSS.wav
    └── transcription.txt

書き出し(抜粋):

filename = save_audio_to_wav(
    f"{session_id}_inbound",
    bytes(session_info['all_pcm_data_inbound']),
    WAV_SAMPLE_RATE,
    session_info['call_sid']
)

文字起こしログ

transcription.txtFinalのみ追記。日本語の不要スペース除去もここで実施。

[2025-10-12 17:49:44] [発信者] こんにちは
[2025-10-12 17:49:45] [受信者] はい、こんにちは
[2025-10-12 17:49:50] [発信者] 今日はいい天気ですね

ログレイヤ(4本)

  • app_logger:ライフサイクル
  • ws_logger:WebSocket I/O
  • vosk_logger:ASR内部
  • transcription_logger:結果専用

終了時に track_counts を吐くとパケット落ちも見やすいです。


パフォーマンス・運用Tips

  • 非同期I/O:各WS接続は独立タスク。処理はブロックしない
  • メモリ管理:Vosk投入後は即 bytearray.clear()。長通話でも安定。
  • サンプルレート:Media Streamsは8kHz/mono/G.711 μ‑law想定。Voskモデル側のレートと合わせる(8kでOKな日本語モデルを使用)。
  • 語彙拡張:Voskのspk/model辞書追加で固有名詞の取りこぼしが減らせるらしい。やってないから後日別途まとめます。

Node.js → Pythonに切り替えた理由

最初はNodeでやりましたが、ffi-napi周りのネイティブ依存が面倒。VoskはPythonが親和的で、標準ライブラリだけでμ‑law処理も完結。ということでPythonに寄せたら運用が安定しました。

  • ネイティブビルド不要
  • Vosk APIをラッパ無しで直叩き
  • structでPCM処理が完了

今後の改善(予定)

  • whisper.cppとの遅延/精度/CPU負荷の比較(tiny/small中心)
  • 語彙拡張と日本語固有名詞の最適化
  • UI:ブラウザに吹き出し表示(inbound=左 / outbound=右)
  • 双方向翻訳モード(日本語↔英語)

セットアップ(ざっくり)

  1. Vosk日本語モデルを取得(例:vosk-model-small-ja-xx
  2. Python依存インストール
    pip install aiohttp vosk
    
  3. server.py のWSエンドポイントを公開(ngrok/Cloud Run など環境に応じて)
  4. TwiMLで <Stream track="both_tracks"> を指定して発着信
<Response>
  <Start>
    <Stream url="wss://your-server.example.com/stream"
            track="both_tracks" >
    </Stream>
  </Start>
</Response>

※ モデル名やサイズは端末性能と相談。8kHzモデル推奨。16kHzモデルを使う場合はレート変換が必要。

参考リンク

  • Twilio Media Streams(公式)
    • クイックスタート / イベント仕様 / セキュア接続 など
  • Vosk(公式GitHub/Docs)
    • モデル配布 / API / 語彙拡張

※ URLは記事公開時点の最新を掲載してください。

ソースコード

GitHubに全体を置いています:

https://github.com/MitsuharuNakamura/websocket-vosk

この記事の断片コードは可読性優先で省略しています。実装はGitHubを参照してください。

まとめ(所感)

クラウドASRに”乗っかる”と、正直ここまで意識しなくても動きます。ただ、μ‑law/PCM、トラック分離、バッファ設計を自分で握ってみると、仕組み理解が一段深くなるのを実感。クラウドに出せない要件オフライン前提の現場では、Voskは十分に戦えます。

一方で、当たり前ですが取って出しだと純粋な精度はクラウド勢に軍配。要件次第で使い分け、ですね。まずは軽量なモデルから、手元のマシンで動かしてみてください。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?