1. はじめに
こんにちは!株式会社スタイルアーツの篠木友馬です。2024年10月からインターンとして、スタイルアーツに加えていただいております。
皆さんは、AIといえばChatGPTのようなテキストベースのやり取りを思い浮かべることが多いのではないでしょうか。しかし、OpenAIから公開されたRealtimeAPIを使えば、AIと音声会話ができるシステムを比較的簡単に構築できるようになりました。
本記事では、このRealtimeAPIをPythonで実装し、実際に動かしてみた経験をシェアします。システムプロンプトのカスタマイズによる応答の変化や、実際の会話例なども紹介しながら、皆さんがご自身でAI音声会話システムを構築する際の参考になれば幸いです。
2. RealtimeAPIとは
OpenAIのRealtimeAPIは、2024年10月に公開されたAI音声会話インターフェースです。最大の特徴は、その名の通り「リアルタイム性」にあります。
テキストベースのAIコミュニケーションシステムと違って、RealtimeAPIでは以下のように動作します。
- ユーザーが話し始めるとほぼ同時に音声認識と処理が始まる
- AIは部分的な入力に対しても応答の生成を開始できる
- 応答は生成されるとすぐに返され、会話が途切れることなく進行
RealtimeAPIがもたらす会話体験には、以下の特徴があります。
- 自然な会話感: 人間同士の会話に近い、自然な間合いとフローを実現
- コンテキストの即時理解: 話の途中でもAIが理解を開始し、より文脈に沿った応答が可能
- インタラクティブな修正: ユーザーがリアルタイムでAIの応答方向を調整できる
- 低レイテンシー: 応答待ち時間の大幅な短縮により、ストレスのない対話が可能
- マルチターン対話の効率化: 複数回のやり取りがスムーズに進行
RealtimeAPIを使えば、実会話に近い体験でAIとのコミュニケーションが可能になります!
3. 開発環境と準備
言語: Python 3.12
AIモデル: GPT-4o(OpenAI)
API: RealtimeAPI(OpenAI)
通信プロトコル: WebSocket
処理方式: 非同期処理(Async/Await)
必要なライブラリは以下の通りです。
ライブラリ | 役割 |
---|---|
asyncio | 非同期処理の制御 |
websockets | WebSocketによる双方向通信 |
pyaudio | マイク入力とスピーカー出力の制御 |
queue | データキューの管理 |
threading | マルチスレッド処理 |
4. 開発プロセス
Realtime APIサーバー通信処理
まずは根幹となる、Realtime APIのサーバー通信部分について構築しました。WebSocketという通信プロトコルを用いて、サーバーとイベントを送受信します。イベントやり取りの規則については、OpenAIの公式ドキュメントに詳しい情報が載っています。
重要なイベントをいくつかご紹介します。
イベント | 意味 |
---|---|
response.audio_transcript.delta | AIからの返答が1文字ずつ格納されています。 |
response.done | AIからの応答が終了した合図として送信されます。 |
response.audio.delta | AI返答の音声データが格納されています。 |
input_audio_buffer.speech_started | ユーザーの発話を検知すると送信されます。 |
これらのイベントを受け取り、各イベントごとにアクションを設定する、というのが基本的な内部構造です。
例えば、response.audio_transcript.deltaを受け取った場合、中身のテキストをコマンドラインに出力します。これを繰り返して、AIの返答をテキスト出力しているのです。
## サーバーからの応答をリアルタイム(ストリーム)で表示
if "type" in response_data and response_data["type"] == "response.audio_transcript.delta":
print(response_data["delta"], end = "", flush = True)
音声処理
次に、音声データの送受信です。ユーザーの音声をサーバーに送信する、AI返答の音声データを出力する、の2つの機能が必要です。
ユーザーの発声はマイクで入力、その後エンコードしてinput_audio_buffer.appendというイベントに格納してサーバーに送信します。サーバーから受け取ったAI返答の音声データは、デコードして音声出力します。
ここで、会話のリアリティを高める1つのポイントがあります。それは、会話の割り込みです。今回のシステムでは、AIが発声している最中にユーザーが割り込んで話し掛けることができます!
具体的には、AI返答の音声データを、受け取った際にキューに一旦格納します。そしてキューから再生します。再生中にinput_audio_buffer.speech_startedイベントを受け取った場合(つまりユーザーが発話した場合)、再生をストップするという仕組みになっています。
モデル設定
AIモデルに対して、パラメーターを設定することができます。
init_request = {
"type": "session.update",
"session": {
"modalities": ["audio", "text"],
"instructions": """あなたは基本的に日本語で話します。あなたはユーザーの質問に答えるAIアシスタントです。ユーザーの質問に対して、できるだけ正確で役立つ情報を提供してください。""",
## ボイス設定
"voice": "echo", ## "alloy","echo","shimmer","ash","ballad","coral","sage","verse"から選べる
## ターン検出の設定
"turn_detection": {
"type": "semantic_vad",
},
## 文字起こしに必要
"input_audio_transcription": {
"model": "whisper-1"
},
}
}
-
instructions
:AIモデルの役割を指示することができます。 -
voice
:声色を選択することができます。 -
turn_detection
:ターン検出を設定できます。 -
input_audio_transcription
:ユーザー発話の文字起こしに使うことができます。
特に、instructions
の指示を変更することで、AIモデルをカスタマイズ可能です。
5. 動作
全体のコードはこちらです。
コード
import asyncio
import websockets
import pyaudio
import base64
import os
import json
import queue
import threading
from datetime import datetime
import time
## OpenAIのAPIキー
API_KEY = os.getenv("OPENAI_API_KEY")
if not API_KEY:
raise ValueError("OPENAI_API_KEY環境変数が設定されていません")
## WebSocket URLとヘッダー情報。Websocketに接続してOpenAIのモデルに繋ぐ
# WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01" # 旧バージョン
WS_URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-12-17" # 新バージョン
HEADERS = {
"Authorization": "Bearer "+ API_KEY,
"OpenAI-Beta": "realtime=v1"
}
## キューを初期化
audio_send_queue = queue.Queue()
audio_receive_queue = queue.Queue()
## PCM16形式に変換する関数
def base64_to_pcm16(base64_audio):
audio_data = base64.b64decode(base64_audio)
return audio_data
## ここまでは初期設定----------------------------------------------------------------------------------------------------
## 音声を送信する非同期関数
async def send_audio_from_queue(websocket): ## websocket接続、音声ストリーム、データの読み取り単位サイズが引数
## マイクからCHUNKサイズ分の音声データを同期的に読み取る関数
print("【システム】マイクから音声を取得して送信中...")
while True:
# マイクから音声を取得(先の関数を使って)
audio_data = await asyncio.get_event_loop().run_in_executor(None, audio_send_queue.get)
if audio_data is None:
continue # 読み取りに失敗した場合はスキップ
## PCM16データをBase64にエンコード
base64_audio = base64.b64encode(audio_data).decode("utf-8")
## audio_event辞書に音声データ格納
audio_event = {
"type": "input_audio_buffer.append", ## input_audio_buffer.appendイベント(Client events)を作成
"audio": base64_audio
}
## input_audio_buffer.appendイベントを送信(WebSocketで音声データを送信)
await websocket.send(json.dumps(audio_event))
# キューの処理間隔を少し空ける
await asyncio.sleep(0)
# マイクからの音声を取得しキューに入れる関数
def read_audio_to_queue(stream, CHUNK):
while True:
try:
audio_data = stream.read(CHUNK, exception_on_overflow=False)
audio_send_queue.put(audio_data)
except Exception as e:
print(f"音声読み取りエラー: {e}")
break
## サーバーから音声を受信して再生する非同期関数
async def receive_audio_to_queue(websocket): ## websocket接続と、音声出力用のストリーム
print("=====開始します=====") ## コマンドラインに開始タイミングを出力
print("【AI 】", end = "", flush = True)
##########################################
## セッション設定更新。開始時に設定される。
init_request = {
"type": "session.update",
"session": {
"modalities": ["audio", "text"],
"instructions": """あなたは基本的に日本語で話します。あなたはユーザーの質問に答えるAIアシスタントです。ユーザーの質問に対して、できるだけ正確で役立つ情報を提供してください。
""",
## ボイス設定
"voice": "echo", ## "alloy","echo","shimmer","ash","ballad","coral","sage","verse"から選べる
## ターン検出の設定
"turn_detection": {
"type": "server_vad",
"threshold": 0.5,
},
## 文字起こしに必要
"input_audio_transcription": {
"model": "whisper-1"
},
## ツール
"tools": [
],
## モデルが何のツールを使うか
"tool_choice": "auto",
}
}
await websocket.send(json.dumps(init_request)) ## これでイベントをサーバーに送る
# print("初期リクエストを送信しました")
# # conversation.item.create ユーザーが問いかける内容を設定する(ここでは一番最初のタイミングの設定)
# con_create = {
# "type": "conversation.item.create"
# }
# await websocket.send(json.dumps(con_create))
# response_create 開始時に問いかけてくるようにする
res_create = {
"type": "response.create"
}
await websocket.send(json.dumps(res_create))
while True:
# サーバーからの応答を受信、While Trueで継続的に動作
response = await websocket.recv()
if response:
response_data = json.loads(response)
# イベントを表示(デバッグ用)
# print(response_data["type"])
## サーバーからのデータのtypeに応じて異なる処理をする(Server eventsを処理)
## サーバーからの応答をリアルタイム(ストリーム)で表示
if "type" in response_data and response_data["type"] == "response.audio_transcript.delta":
print(response_data["delta"], end = "", flush = True)
# elif "type" in response_data and response_data["type"] == "response.created":
# print("【AI】")
## サーバからの応答が完了したことを取得
elif "type" in response_data and response_data["type"] == "response.done":
# print("応答完了")
# print("\n(音声受付中…)")
print("\n【AI 】", end = "", flush = True)
## 会話履歴を更新
# print("会話履歴を保存しました")
## ユーザ発話の文字起こしを出力
# elif "type" in response_data and response_data["type"] == "conversation.item.input_audio_transcription.completed":
# print("【ユーザー】", response_data["transcript"])
# print("【AI】")
## レートリミットの情報を取得
# elif "type" in response_data and response_data["type"] == "rate_limits.updated":
# print(f"Rate limits: {response_data['rate_limits'][0]['remaining']} requests remaining.")
#こちらの発話がスタートしたことをサーバが取得したことを確認する
if "type" in response_data and response_data["type"] == "input_audio_buffer.speech_started":
#すでに存在する取得したAI発話音声をリセットする
while not audio_receive_queue.empty():
audio_receive_queue.get()
# サーバーからの音声データをキューに格納
if "type" in response_data and response_data["type"] == "response.audio.delta":
base64_audio_response = response_data["delta"]
if base64_audio_response:
pcm16_audio = base64_to_pcm16(base64_audio_response)
audio_receive_queue.put(pcm16_audio)
await asyncio.sleep(0)
# サーバーからの音声を再生する関数
def play_audio_from_queue(output_stream):
while True:
pcm16_audio = audio_receive_queue.get()
if pcm16_audio:
output_stream.write(pcm16_audio)
## ここまでが音声の送信と受信のタスク(async関数)------------------------------------------------------------------------
## マイクからの音声を取得し、WebSocketで送信しながらサーバーからの音声応答を再生する非同期関数(メインのループ)
async def stream_audio_and_receive_response():
## WebSocketに接続
async with websockets.connect(WS_URL, extra_headers=HEADERS) as websocket: ## このwebsocketに接続することで、OpenAIモデルに繋ぐ
print("【システム】WebSocketに接続しました")
# PyAudioの設定
INPUT_CHUNK = 2400 # マイクからの入力データのチャンクサイズ、データの読み取り単位サイズ
OUTPUT_CHUNK = 2400
FORMAT = pyaudio.paInt16 # PCM16形式
CHANNELS = 1 # モノラル
INPUT_RATE = 24000 # サンプリングレート(24kHz)
OUTPUT_RATE = 24000
# PyAudioインスタンス
p = pyaudio.PyAudio()
# マイクストリームの初期化
stream = p.open(format=FORMAT, channels=CHANNELS, rate=INPUT_RATE, input=True, frames_per_buffer=INPUT_CHUNK)
# サーバーからの応答音声を再生するためのストリームを初期化
output_stream = p.open(format=FORMAT, channels=CHANNELS, rate=OUTPUT_RATE, output=True, frames_per_buffer=OUTPUT_CHUNK)
# マイクの音声読み取りをスレッドで開始
threading.Thread(target=read_audio_to_queue, args=(stream, INPUT_CHUNK), daemon=True).start()
# サーバーからの音声再生をスレッドで開始
threading.Thread(target=play_audio_from_queue, args=(output_stream,), daemon=True).start()
print("【システム】マイク入力およびサーバーからの音声再生を開始...")
try:
# 音声送信タスクと音声受信タスクを非同期で並行実行
send_task = asyncio.create_task(send_audio_from_queue(websocket))
receive_task = asyncio.create_task(receive_audio_to_queue(websocket))
# タスクが終了するまで待機
await asyncio.gather(send_task, receive_task)
except KeyboardInterrupt:
print("終了します...")
finally:
if stream.is_active():
stream.stop_stream()
stream.close()
output_stream.stop_stream()
output_stream.close()
p.terminate()
if __name__ == "__main__":
asyncio.run(stream_audio_and_receive_response())
今回のコードはコマンドライン上で動作します。pythonファイルを実行します。
python main.py
初めにシステムメッセージが表示されます。数秒後、「開始します」との表示と共に、AIが会話を開始します。
ここからはユーザーが好きなタイミングで話し掛けることができます。
今回は例として、京都の観光スポットを教えて
と言ってみましょう。
観光スポットを5つ提案してくれました!さらに金閣寺について詳しく教えて
と言ってみましょう。
具体的な情報を提供してくれました!
以上のような動作となります。今回はコマンドプロンプト上での動作でしたが、UIやユーザー発話の文字起こしを充実させることができれば、上質な会話体験ができそうですね。
6. Realtime APIを体験してみて
今まで、テキスト上でしかAIとやりとりをしたことのなかった自分にとって、この体験は本当に驚きでした!音声を使うことでリアルタイム性が一気に向上し、コミュニケーションのスムーズさはテキストベースとは違うものがあります。コミュニケーション自体の楽しさも改めて感じました。AIは人間を相手にするのとはまた別なので、思ったことをなんでも話せる点も嬉しいです(笑)。会話割り込みの実装には苦労しましたが、この機能が会話体験の向上に寄与する影響は大きいと感じます。
今後はますますAI音声対話技術も発展すると思われます。現状のRealtime APIでは、機械音声感が否めないところですが、ここが進化すればさらに実際の会話や人間同士の会話に近いような体験が可能になるはずです。今後の展開に期待です!
参考
実装したコード
RealtimeAPI公式ドキュメント
参考にした記事