1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Multimodal Live APIを使った簡単なアプリ作ってみる

Posted at

はじめに

Multimodal Live APIが出てから、約半年。

Google AI studioのほうでは触っていたものの、APIのほうでは触っていないことに気づき、触ってみました。

せっかくAPIを触るなら簡単なwebアプリを作ろうと思い、作ってみました。
テキストと音声でリアルタイムな会話ができる機能を実装してみました。

Google公開している、以下のようなデモアプリをカスタマイズしてもよいと思いました。しかし、FastAPIを試してみたかったという思いもあり、今回はバックエンドでFastAPIを使った実装にしようと思います。

準備

以下のMultimodal Live APIを使うにあたって、Google Cloudのプロジェクトが必要なので準備します。

プロジェクトの作成方法については紹介しませんが、以下のサイトを参考にしていただければできると思います。

実装

それでは実装していきます。
フロントエンドはReact、バックエンドはpythonのFastAPIを使って実装していきます。

まずはバックエンドから実装していきます。

バックエンド

バックエンドでは、テキストデータと音声データの2つのエンドポイント構成を実装していきます。

テキストデータのエンドポイント

Gemini APIのlive.connectを使って、ストリーミング処理を実装していきます。

@app.websocket("/ws/text")
async def websocket_text_endpoint(websocket: WebSocket):
    await websocket.accept()
    print("Text WebSocket connection established.")
    model_id = "gemini-2.0-flash-exp"
    connection_manager = client.aio.live.connect(model=model_id, config=LiveConnectConfig(response_modalities=[Modality.TEXT]))
    session = await connection_manager.__aenter__()
    print("Gemini session started for text chat.")
    try:
        while True:
            raw_data = await websocket.receive_text()
            data = json.loads(raw_data)
            origin = data.get("origin", "text")
            user_input = data["text"]
            
            print(f"Received from frontend (origin: {origin}): {user_input}")

            prompt = system_prompt + "\n" + user_input
            await session.send(input=prompt, end_of_turn=True)

            tts_buffer = ""
            async for message in session.receive():
                if message.text:
                    text_payload = json.dumps({"type": "ai_text", "text": message.text})
                    print(f"Sending text to frontend: {text_payload}")
                    await websocket.send_text(text_payload)
                    
                    if origin == 'voice':
                        tts_buffer += message.text
                        while True:
                            match = re.search(r'(.*?([。!?]))', tts_buffer, re.DOTALL)
                            if match:
                                sentence = match.group(1)
                                tts_buffer = tts_buffer[len(sentence):]
                                await text_to_speech_and_send(sentence, websocket)
                            else:
                                break
            
            if origin == 'voice' and tts_buffer:
                await text_to_speech_and_send(tts_buffer, websocket)

    except WebSocketDisconnect:
        print("Text WebSocket connection closed.")
    except Exception as e:
        print(f"Text WebSocket error: {e}")
    finally:
        if session:
            await connection_manager.__aexit__(None, None, None)
            print("Gemini session closed.")

ユーザの入力関数が音声だった場合、以下の関数を使って、テキストを音声に変換しています。
※この理由については、音声データのエンドポイント部分で解説しています。

async def text_to_speech_and_send(text: str, websocket: WebSocket):
    if not text.strip():
        return
    try:
        print(f"Generating speech for: '{text}'")
        mp3_fp = io.BytesIO()
        tts = gTTS(text=text, lang='ja')
        tts.write_to_fp(mp3_fp)
        mp3_fp.seek(0)
        await websocket.send_bytes(mp3_fp.read())
        print(f"Sent speech audio to frontend.")
    except Exception as e:
        print(f"TTS Error: {e}")
音声データのエンドポイント

続いて、音声データに関するエンドポイントを作成していきます。
今回私が扱ったvertexAIの「gemini-2.0-flash-exp」ですが、2025年7月時点で音声ストリーミングに対応していないようで、音声データを直接Geminiに渡すことができないと作成時に判明しました。
そこで、本実装では以下の処理をすることで音声入力対応することにしました。

  1. 音声データをテキストに変換し、フロントエンドに返す(テキストを表示するため)
  2. フロントエンドからバックエンドにテキストを渡す
  3. テキストをGeminiに渡す
  4. 生成された応答テキストを音声に変換
  5. テキストと音声をフロントエンドに返す

したがって、音声データに関するAPIでは音声を文字起こしするだけの機能を実装していきます。

@app.websocket("/ws/audio")
async def websocket_audio_endpoint(websocket: WebSocket):
    await websocket.accept()
    print("Audio WebSocket connection established.")

    vad = webrtcvad.Vad(3)
    vad_ring_buffer = collections.deque(maxlen=15)
    frame_buffer_for_stt = collections.deque(maxlen=15)
    triggered = False
    voiced_frames = []
    stream_buffer = bytearray()
    try:
        while True:
            message = await websocket.receive()
            if "bytes" in message:
                stream_buffer.extend(message["bytes"])
            else: continue
            while len(stream_buffer) >= VAD_FRAME_BYTES:
                vad_frame = stream_buffer[:VAD_FRAME_BYTES]
                stream_buffer = stream_buffer[VAD_FRAME_BYTES:]
                frame_buffer_for_stt.append(vad_frame)
                is_speech = vad.is_speech(vad_frame, VAD_SAMPLE_RATE)
                vad_ring_buffer.append(is_speech)
                if not triggered and sum(vad_ring_buffer) > 12:
                    print("Speech started...")
                    triggered = True
                    voiced_frames.extend(list(frame_buffer_for_stt))
                elif triggered:
                    voiced_frames.append(vad_frame)
                if triggered and sum(vad_ring_buffer) < 2:
                    print("Speech ended.")
                    voiced_data = b''.join(voiced_frames)
                    try:
                        print(f"Original audio received: {len(voiced_data)} bytes at {VAD_SAMPLE_RATE}Hz.")
                        audio_segment = AudioSegment(data=voiced_data, sample_width=2, frame_rate=VAD_SAMPLE_RATE, channels=1)
                        resampled_segment = audio_segment.set_frame_rate(TARGET_STT_SAMPLE_RATE)
                        print(f"Resampled audio to {len(resampled_segment.raw_data)} bytes for STT.")
                        audio_data = sr.AudioData(resampled_segment.raw_data, TARGET_STT_SAMPLE_RATE, 2)
                        text = r.recognize_google(audio_data, language='ja-JP')
                        print(f"Recognized text: {text}")
                        response_data = json.dumps({"type": "stt_result", "text": text})
                        print(f"Sending STT result to frontend: {response_data}")
                        await websocket.send_text(response_data)
                    except sr.UnknownValueError:
                        print("Could not understand audio even after resampling.")
                    except Exception as e:
                        print(f"Error during STT or resampling: {e}")
                    finally:
                        triggered = False
                        voiced_frames.clear()
                        vad_ring_buffer.clear()
                        frame_buffer_for_stt.clear()
    except WebSocketDisconnect:
        print("Audio WebSocket connection closed.")
    except Exception as e:
        print(f"Audio WebSocket error: {e}")

ユーザの発話・終話の検知はwebrtcvadライブラリを、文字起こしにはGoogleのSpeech_recognitionライブラリを使用して実装しました。

フロントエンド

続いて、フロントエンドを実装していきます。重要な箇所だけ紹介していきます。

WebSocket接続

テキストデータと音声データの2つのエンドポイントへのWebSocket接続を確立します。
受信したデータがBlobかStringで処理を分けるようにしました。

useEffect(() => {
    const wsText = new WebSocket('ws://127.0.0.1:8000/ws/text');
    websocketTextRef.current = wsText;
    wsText.binaryType = 'blob';

    wsText.onopen = () => setIsConnected(true);
    wsText.onclose = () => setIsConnected(false);
    wsText.onerror = (error) => console.error('Text WebSocket error:', error);
    
    wsText.onmessage = (event: MessageEvent) => {
      if (event.data instanceof Blob) {
        audioQueue.current.push(event.data);
        playNextInQueue();
      } 
      else if (typeof event.data === 'string') {
        try {
          const data = JSON.parse(event.data);
          if(data.type === 'ai_text' && data.text){
            setMessages(prevMessages => {
              const lastMessage = prevMessages[prevMessages.length - 1];
              if (lastMessage && lastMessage.sender === 'ai') {
                const updatedLastMessage = { ...lastMessage, text: lastMessage.text + data.text };
                return [...prevMessages.slice(0, -1), updatedLastMessage];
              } else {
                return [...prevMessages, { sender: 'ai', text: data.text }];
              }
            });
          }
        } catch(e) {
          console.error("Failed to parse AI text response:", e)
        }
      }
    };

    const wsAudio = new WebSocket('ws://127.0.0.1:8000/ws/audio');
    websocketAudioRef.current = wsAudio;
    
    wsAudio.onopen = () => console.log('Audio WebSocket connected');
    wsAudio.onclose = () => console.log('Audio WebSocket disconnected');
    wsAudio.onerror = (error) => console.error('Audio WebSocket error:', error);
    wsAudio.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        if (data.type === 'stt_result' && data.text) {
          setMessages(prev => [...prev, { sender: 'user', text: data.text }]);
          sendTextToGemini(data.text, 'voice');
        }
      } catch (error) {
        console.error("STT結果のパースに失敗", error);
      }
    };

    return () => {
      wsText.close();
      wsAudio.close();
    };
  }, []);
マイク入力処理

microphone-streamライブラリを使用して、マイクからの音声入力をリアルタイムで取得します。

const toggleSpeaking = async () => {
    if (isSpeaking) {
      if (micStreamRef.current) micStreamRef.current.stop();
      setIsSpeaking(false);
    } else {
      setIsSpeaking(true);
      try {
        const stream = await navigator.mediaDevices.getUserMedia({ audio: { sampleRate: 48000, channelCount: 1 } });
        micStreamRef.current = new MicrophoneStream({ stream, objectMode: true });
        (micStreamRef.current as any).on('data', (audioBuffer: AudioBuffer) => {
          const pcmData = audioBuffer.getChannelData(0);
          const int16Data = new Int16Array(pcmData.length);
          for (let i = 0; i < pcmData.length; i++) {
            const s = Math.max(-1, Math.min(1, pcmData[i]));
            int16Data[i] = s < 0 ? s * 0x8000 : s * 0x7FFF;
          }
          if (websocketAudioRef.current?.readyState === WebSocket.OPEN) {
            websocketAudioRef.current.send(int16Data.buffer);
          }
        });
      } catch (error) {
        console.error("マイクの取得に失敗しました:", error);
        setIsSpeaking(false);
      }
    }
  };

サンプリングレートはバックエンドの設定に合わせてください。設定を失敗すると、文字起こしに失敗します。

音声データの再生

バックエンドから送られてくる音声データ(Blob)をキューにためて、再生します。(キューにためないと、音声の順番が逆転したり、同時再生されたりします。)

const audioQueue = useRef<Blob[]>([]);
const isPlaying = useRef<boolean>(false);

const playNextInQueue = () => {
if (isPlaying.current || audioQueue.current.length === 0) return;
isPlaying.current = true;
const audioBlob = audioQueue.current.shift();
if (audioBlob) {
  const audioUrl = URL.createObjectURL(audioBlob);
  const audio = new Audio(audioUrl);
  audio.play();
  audio.onended = () => {
    URL.revokeObjectURL(audioUrl);
    isPlaying.current = false;
    playNextInQueue();
  };
  audio.onerror = (e) => {
    console.error("Audio playback error:", e);
    isPlaying.current = false;
    playNextInQueue();
  };
} else {
  isPlaying.current = false;
}
};

ソースコードについて

ソースコード全体は以下にあるので、ご参照ください。

動かしてみる

それでは実行していきます。実行方法はソースコードのREADME.mdに記載していますので、そちらをご参照ください。

まずはテキスト入力を試してみます。

image.png

うまく応答が返ってきていますね。

続いて、音声を試してみます。
Gifなので、音声再生されている様子をお届けできないのが残念ですが、
音声認識されて、会話できている様子は確認できると思います。

gemini-live.gif

おわりに

今回はフロントエンドがReact、バックエンドがPythonであるMultimodal Live APIのアプリを作ってみました。

FastAPIを触ること、Multimodal Live APIを触ることができたので、非常に勉強になりました。

今後も興味を持ったものをいろいろ触っていこうと思います。

最後まで読んでいただきありがとうございました。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?