1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

GPU+GenAIでAI電話を作ってみよう

Posted at

はじめに

前回の記事でリアルタイムSTT(Speech-to-text)までできることを確認しました。
今回はその続きで、

  • 生成AIのプロンプトによる自動応答処理と
  • リアルタイムTTS(text-to-speech)

を試してみます。
ここまで実装すると、AI電話サービスが構築可能です。

構成案

image.png

大まかな構成はこちらです。
webサーバは今回は実はgpuサーバ上ですべて構築していますが、実際の開発ではフロントのwebサーバで受けて諸々処理しつつ音声をパススルーするのが良いでしょう。
またTTSの部分はwhisperと同じGPU上で実行していますが、負荷によってはgpuを分けた方が良いです。
なお今回STTは下記voicevoxを使用しました。

ソース

まず前ポストのhtmlに以下を追加します。
サーバからTTSした結果の音声を受信して再生します。

websocket.html
ws.onmessage = function (event) {
    if (event.data instanceof Blob) {
        const audioBlob = event.data;
        const audioUrl = URL.createObjectURL(audioBlob);
        const audio = new Audio(audioUrl);
        audio.play();
    }else{
        console.log("no suppord audio data! ");
    }
};

GenAIのプロンプト含むソースはこちら。
プロンプトにどのように振舞うかを記載しています。

genai.py
import oci
from oci.generative_ai_inference.generative_ai_inference_client import GenerativeAiInferenceClient
from oci.generative_ai_inference.models import (
    ChatDetails,
    OnDemandServingMode,
    CohereChatRequest
)

compartment_id = "ocid1.compartment.oc1..(my_compartment_id)"
endpoint = "https://inference.generativeai.us-chicago-1.oci.oraclecloud.com"
config = oci.config.from_file(file_location='~/.oci/config')

generative_ai_inference_client = GenerativeAiInferenceClient(config=config, service_endpoint=endpoint)

previous_chat_system_message = oci.generative_ai_inference.models.CohereSystemMessage(
    role="SYSTEM", message="""
    あなたは優秀な企業のカスタマーサポートオペレーターです。今回は、お客様からのお問い合わせに基づき、製品の交換手続きを受け付けるタスクを実行します。
    お客様が最初に要件を話します。製品交換以外の場合は、丁重に対応していない旨を伝え、お断りしてください。
    以下の順番で対応してください。
    1. 製品名と購入日:
    「まず、交換を希望される製品名と購入日をお伺いします。」
    2. 交換理由:
    上記が確認できた後、
    「次に、交換を希望される理由を教えてください。」
    3. 購入製品の製造番号:
    上記が確認できた後、
    「続いて、購入された製品の製造番号を教えてください。」
    4. 製造番号の再確認:
    「製造番号は○○ですね」として、上記で聞いた製造番号をリピートして再確認する。
    5. お客様のお名前:
    上記が確認できた後、
    「では次に、お客様のお名前をフルネームで教えてください。」
    6. お客様の電話番号:
    上記が確認できた後、
    「最後に、お客様の電話番号を教えてください。」
    7. 電話番号の再確認:
    「電話番号は○○ですね」として、上記で聞いた電話番号をリピートして再確認する。
    上記6項目が全て確認できたら、以下の通りに回答して終了してください。
    「製品交換のお申し込みを受付いたしました。
    後ほど担当者よりご連絡を差し上げますので、少々お待ちください。
    お電話ありがとうございました。それでは失礼いたします。」
    """)

chat_history = [previous_chat_system_message]

def generate_voice_from_message(question):
    # 新しい質問を追加
    chat_request = CohereChatRequest(
        message=question,
        max_tokens=1000,
        is_stream=False,
        is_echo=True,
    )
    chat_request.chat_history = chat_history

    # API呼び出し
    response = generative_ai_inference_client.chat(
        chat_details=ChatDetails(
            compartment_id=compartment_id,
            serving_mode=OnDemandServingMode(
                model_id="cohere.command-r-plus"
            ),
            chat_request=chat_request
        )
    )

    # 質問と回答をhistoryに追加
    chat_history.append(
        oci.generative_ai_inference.models.CohereUserMessage(
            role="USER", message=question)
    )
    chat_history.append(
        oci.generative_ai_inference.models.CohereChatBotMessage(
            role="CHATBOT", message=response.data.chat_response.text)
    )
    return response.data.chat_response.text

続いて、先ほどのGenAI応答処理を使用してSTT&TTSを実行します。
前回ポストのソースと重複部分が多いですが改めて全部載せます。

whisper.py
import asyncio
import websockets
import io
from faster_whisper import WhisperModel
import numpy as np
import json
from scipy.signal import resample
import webrtcvad
import requests
import time

from genai import generate_voice_from_message

# Whisperモデルのロード
model = WhisperModel("large-v3", device="cuda", compute_type="float32")


class AudioBuffer:
    def __init__(self, sample_rate):
        self.sample_rate = sample_rate
        self.audio_data = bytearray()
        self.vad = webrtcvad.Vad(0)  # 感度レベル(0〜3)
        self.frame_duration = 30  # フレームの長さ(ms)
        self.frame_size = int(self.sample_rate * 2 *
                              self.frame_duration / 1000)  # バイト数(16bitなので×2)
        self.silence_duration = 0.0  # 無音期間(秒)
        self.silence_limit = 0.3  # 無音と判定する秒数
        self.is_speaking = False

    def add_data(self, data):
        self.audio_data.extend(data)
        self.check_silence(data)

    def get_audio(self):
        return self.audio_data

    def reset(self):
        self.audio_data = bytearray()
        self.silence_duration = 0.0
        self.is_speaking = False

    def check_silence(self, data):
        is_speech = False
        # フレームごとにVADを適用
        for i in range(0, len(data) - self.frame_size + 1, self.frame_size):
            frame = data[i:i + self.frame_size]
            if len(frame) == self.frame_size:
                if self.vad.is_speech(frame, self.sample_rate):
                    is_speech = True
                    break  # 音声が検出されたらループを抜ける

        if not is_speech:
            # 無音期間を増加
            duration = len(data) / (self.sample_rate * 2)  # 秒に変換
            self.silence_duration += duration
            if self.silence_duration >= self.silence_limit and self.is_speaking:
                self.is_speaking = False
        else:
            # 音声が検出された場合
            self.silence_duration = 0.0
            self.is_speaking = True


async def audio_handler(websocket, path):
    print("WebSocket connection established, starting real-time transcription.")

    audio_buffer = None

    time.sleep(3)
    welcome_message = "お電話ありがとうございます。オラクル掃除機販売の製品交換専用AI電話です。ご用件をお話ください。"
    print(f"genai={welcome_message}")
    wav_data = request_voicevox(welcome_message)
    if wav_data is not None:
        await websocket.send(wav_data)
        print("Audio synthesis completed successfully and sent to the client.")
    else:
        print("Error during audio synthesis.")

    try:
        while True:
            data = await websocket.recv()

            if isinstance(data, str):
                message = json.loads(data)
                if 'sampleRate' in message:
                    sample_rate = message['sampleRate']
                    audio_buffer = AudioBuffer(sample_rate)
                    print(f"Sample rate received: {sample_rate} Hz")
            elif isinstance(data, bytes):
                if audio_buffer:
                    audio_buffer.add_data(data)

                    # 無音が一定期間続き、かつ話していた場合に文字起こしを行う
                    if not audio_buffer.is_speaking and len(audio_buffer.audio_data) > 0:
                        await transcribe_audio(audio_buffer, websocket)
                        audio_buffer.reset()
                else:
                    print("Audio buffer not initialized yet.")

    except websockets.ConnectionClosed:
        print("WebSocket connection closed")

        if audio_buffer and len(audio_buffer.audio_data) > 0:
            # 残りのデータを文字起こし
            await transcribe_audio(audio_buffer, websocket)

def request_voicevox(genai_resp, speaker=2):
    """VoiceVoxへのリクエスト処理を関数化"""
    query_url = f"http://localhost:50021/audio_query?speaker={speaker}"
    query_response = requests.post(query_url, params={"text": genai_resp})

    if query_response.status_code != 200:
        print(f"Error in query generation: {query_response.status_code}")
        return None, None

    query_data = query_response.json()
    # kana_text = query_data.get("kana", "Kana not found")  # 'kana' の値を取得

    synthesis_url = f"http://localhost:50021/synthesis?speaker={speaker}"
    headers = {"Content-Type": "application/json"}
    synthesis_response = requests.post(
        synthesis_url, headers=headers, data=json.dumps(query_data))

    if synthesis_response.status_code != 200:
        print(f"Error in audio synthesis: {synthesis_response.status_code}")
        return None, None

    wav_data = synthesis_response.content
    return wav_data

async def transcribe_audio(audio_buffer, websocket):
    # PCMデータをfloatに変換
    pcm_data = np.frombuffer(audio_buffer.get_audio(),
                             dtype=np.int16).astype(np.float32) / 32768.0

    # リサンプリング(必要であれば)
    if audio_buffer.sample_rate != 16000:
        duration = len(pcm_data) / audio_buffer.sample_rate
        target_length = int(duration * 16000)
        resampled_data = resample(pcm_data, target_length)
    else:
        resampled_data = pcm_data

    # 音声エネルギーと音量の平均値を計算
    energy, rms = calculate_audio_energy_and_rms(resampled_data)

    # エネルギーが所定未満の場合、出力しない
    if energy < 30:
        return

    # 文字起こしを実行
    segments, info = model.transcribe(
        resampled_data,
        language="ja",
        vad_filter=True,
        beam_size=5,
        best_of=5,
    )

    # 結果を出力
    text_list = []
    for segment in segments:
        text_list.append(segment.text)

    usrmsg = "".join(text_list)
    candidates = ["ご視聴ありがとうございました", "ご視聴ありがとうございました。"]
    if usrmsg in candidates:  # Whisper誤検出対応
        print("Whisper detected noise or incorrect voice")
        return
    print(f"usrmsg={usrmsg}")

    # genaiの処理を入れる
    genai_resp = generate_voice_from_message(usrmsg)
    print(f"genai={genai_resp}")

    # VoiceVoxへのリクエストを関数化したものを呼び出し
    wav_data = request_voicevox(genai_resp)

    if wav_data is not None:
        await websocket.send(wav_data)
        print("Audio synthesis completed successfully and sent to the client.")
    else:
        print("Error during audio synthesis.")


def calculate_audio_energy_and_rms(audio_data):
    """音声データのエネルギーとRMSを計算"""
    energy = np.sum(audio_data ** 2)  # エネルギーは振幅の二乗の総和
    # rms = np.sqrt(np.mean(audio_data ** 2))  # RMSは二乗平均平方根
    rms = 0
    return energy, rms


# サーバーの開始
start_server = websockets.serve(audio_handler, "0.0.0.0", 5000)

# イベントループの実行
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

TTSは色々ありますが、voicevoxは公式サイトを見て頂くと利用構築方法がわかりやすく記載あり、
また音声も魅力的なものが沢山あるので試していて面白かったです。
今回はspeaker=2を指定しました(VOICEVOX:四国めたん)

また、

candidates = ["ご視聴ありがとうございました", "ご視聴ありがとうございました。"]

こちらの部分はwhisperがおそらく微小な雑音を拾ってこのワードで文字列化してしまうことがあったので決め打ちで除去しています。。

実行結果

これらを一通り構築するとAI電話が構築できます。実際の音声付きの結果を載せたいところですが難しいのでやり取りのスクリプトのみ下記に載せます。かなり自然な流れですね。

やり取り
システム: お電話ありがとうございます。オラクル掃除機販売の製品交換専用AI電話です。ご用件をお話ください。
あなた: 掃除機の交換をしたいんですが
システム: では、製品交換の手続きを承ります。交換を希望される製品と状況について、いくつか質問をさせていただきますので、ご回答をお願いいたします。
まず、交換を希望される製品名と購入日をお伺いします。
あなた: オラクルクリーナー101で、買った日は2024年11月5日です
システム: 次に、交換を希望される理由を教えてください。製品に不具合などございましたか?
あなた: 特に落としたりしてないんですが、電源がつかなくなった感じですね
システム: ご不便をおかけして申し訳ございません。では、引き続き手続きを進めさせていただきます。
続いて、購入された製品の製造番号を教えてください。保証書などに記載されている場合がございますので、そちらをご確認ください。
あなた: 番号は3673-3673です
システム: 製造番号は3673-3673ですね。では次に、お客様のお名前をフルネームで教えてください。
あなた: 山田太郎です
システム: では、最後に、お客様の電話番号を教えてください。
あなた: 070-1234-5678です
システム: 電話番号は070-1234-5678ですね。
製品交換のお申し込みを受付いたしました。
後ほど担当者よりご連絡を差し上げますので、少々お待ちください。
お電話ありがとうございました。それでは失礼いたします。

あとはポスト処理でこのやり取りを別途GenAIにかけることで必要項目を抽出できます。

中間連携データ
{
  "product_name": "オラクルクリーナー101",
  "purchase_date": "2024-11-05",
  "exchange_reason": "電源がつかなくなった",
  "serial_number": "3673-3673",
  "customer_name": "山田太郎",
  "phone_number": "070-1234-5678"
}

AI電話の利用シーン

用途は上のサンプルのように自社のカスタマーサポート窓口もそうですが、SaaSとしても展開できそうな内容です。
050等の電話音声を受電する部分はおそらくクラウドPBXが別途必要になってくると思います。
image.png
また今回はAI電話として上半分「GPU+GenAIによるAI電話」部分を構築しただけですが、実際のケースでは下半分の通り、下記も可能と思います。

  • 先ほどのjsonデータをデータベースに入れてユーザ管理システムを構築
  • 必要に応じてfunctionでLINEやSMS, slack通知のnext actionを実行
  • 対話ログデータについてはRAGでの検索や、生成AIを使用して過去通話を参照した上での応対ができるなどできそうです。

おわりに

GPU+GenAIを利用してAI電話を構築してみました。
GPUは主にWhisperの推論環境として利用していますが、Whisperの高精度モデルlarge-v3をリアルタイム性を持って動作させるにはやはりGPUが必要になってきます。
またAI電話の頭脳部分にGenAIを用いることで、元々LLMが持つ言語感覚とプロンプトエンジニアリングによる指示系統によりかなり複雑な応答を対応できるので非常に検証していて面白いです。
AI電話でなくても、GenAI部分だけ利用してチャットボットとしても使えます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?