本記事の目的
とりあえずOpenAIのAPIキーを使ってGPT-4o-Realtime Previewを動かしてみよー.という記事です.
Pythonのプログラムを動かしたことがあり,OpenAIのキーを持っていれば簡単に動かせるかなと思います.
円滑なコミュニケーションを行わせるために、コードはChatGPTと相談しつつ,自己流にした部分もあります.
GPT-4o Realtime Previewとは
入力:音声 → 出力:音声 のリアルタイムな会話を高速に行うことができるモデルです.
音声入力とテキスト入力の両方に対応し,WebRTC または WebSocket インターフェースを通じてリアルタイムに応答できるのが特徴です.
事前準備 → プログラム実行まで
・ 今回のプログラムを動かすための仮想環境を作成し,有効化する.
(Pythonのバージョンは3.9で動きました.)
conda create -n gptrealtime python=3.9
conda activate gptrealtime
・OpenAIのAPIキーを環境変数に入れておく.
(APIキーをコードに直書きすることは非推奨のため.)
setx OPENAI_API_KEY "sk-proj-xxxxxxxxxxxxxxxx"
$env:OPENAI_API_KEY = "sk-proj-xxxxxxxxxxxxxxxx"
※ "sk-proj-xxxxxxxxxxxxxxxx"の部分にご自身のAPIキーを入れてください.
・ターミナルを一度再起動し,環境変数が反映されているかを確認する.
(以下のコードを実行したときに,先ほど設定したAPIキーが出力されればOK)
echo $env:OPENAI_API_KEY
・再度仮想環境を有効化し,プログラムを動かすために必要なライブラリのインストールを行う.
conda activate gptrealtime
pip install websocket-client requests sounddevice numpy
・以下のプログラムをコピー&ペーストすれば動くはず…!
# 割り込み対応・低遅延: 到着即再生 + 200ms送信 + barge-inゲート(キャンセル完了まで徹底ミュート)
import os, json, base64, time, traceback, threading
import websocket
import sounddevice as sd
import numpy as np
API_KEY = os.environ.get("OPENAI_API_KEY")
URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview"
# ====== 録音 / VAD 設定(やや敏感) ======
SR = 16000
FRAME_MS = 20
FRAME_SAMPLES = SR * FRAME_MS // 1000
VAD_START_DB = -50 # 自分の声を素早く拾う(-50〜-45 推奨)
VAD_STOP_DB = -55
VAD_TAIL_MS = 300 # 無音300msで区切る(誤停止が多ければ 450〜700 に)
MAX_UTT_SEC = 10.0
# ====== モデル音声 出力 ======
OUT_SR = 24000
OUT_CH = 1
# ====== 返答形式 ======
TEXT_ONLY = False # True: テキストのみ / False: 音声+字幕
# ====== 状態・バッファ ======
transcript_buf = []
text_buf = []
play_stream = None
play_lock = threading.Lock()
model_generating = False # サーバー側が応答生成中か
barge_mode = False # 割り込み中(キャンセル完了までデルタを無視)
closing = False
# ====== ユーティリティ ======
def rms_dbfs(x: np.ndarray) -> float:
if len(x) == 0: return -120.0
x = x.astype(np.float32) / 32768.0
ms = np.mean(x*x) + 1e-12
return 10.0 * np.log10(ms)
def b64_pcm(frames: np.ndarray) -> str:
if frames.dtype != np.int16:
frames = frames.astype(np.int16)
return base64.b64encode(frames.tobytes()).decode("utf-8")
# ====== 再生制御(到着即再生 / 低レイテンシ) ======
def audio_play_chunk(b: bytes):
global play_stream
with play_lock:
if play_stream is None:
play_stream = sd.OutputStream(
samplerate=OUT_SR, channels=OUT_CH, dtype='int16',
latency='low', blocksize=max(OUT_SR // 100, 240) # 約10ms分
)
play_stream.start()
play_stream.write(np.frombuffer(b, dtype=np.int16))
def audio_stop():
global play_stream
with play_lock:
if play_stream is not None:
try:
play_stream.stop(); play_stream.close()
finally:
play_stream = None
def is_playing() -> bool:
return play_stream is not None
# ====== barge-in:割り込み時の処理 ======
def barge_in(ws):
"""話し始めた瞬間に呼ぶ:再生停止+応答生成キャンセル+以降のデルタ無視"""
global model_generating, barge_mode
# 1) ローカル再生を即停止
audio_stop()
# 2) 以降の出力デルタはキャンセル確定まで無視
barge_mode = True
# 3) サーバーの進行中応答をキャンセル
try:
ws.send(json.dumps({"type": "response.cancel"}))
except Exception:
pass
model_generating = False
# ====== 録音(簡易VAD) ======
def record_one_utterance(ws) -> np.ndarray:
"""
一発話を録音して返す。
ユーザが話し始めた瞬間に barge-in を発動:再生停止&応答キャンセル&ゲートON
"""
print("🎙️ 話してください(停止:短い無音)...")
frames = []
silence_ms = 0
started = False
total_ms = 0
stream = sd.InputStream(samplerate=SR, channels=1, dtype='int16', blocksize=FRAME_SAMPLES)
with stream:
t0 = time.time()
while True:
block, _ = stream.read(FRAME_SAMPLES)
db = rms_dbfs(block)
total_ms = (time.time() - t0) * 1000.0
if not started:
# 自分の声を検知 → 即barge-in
if db > VAD_START_DB:
started = True
if is_playing() or model_generating:
barge_in(ws)
frames.append(block)
else:
frames.append(block)
if db < VAD_STOP_DB:
silence_ms += FRAME_MS
else:
silence_ms = 0
if silence_ms >= VAD_TAIL_MS:
break
if total_ms > MAX_UTT_SEC * 1000:
print("⌛ 上限秒に達したため送信します。")
break
if len(frames) == 0:
return np.zeros((0, 1), dtype='int16')
audio = np.concatenate(frames, axis=0)
print(f"⏺️ 録音完了: {len(audio)/SR:.2f}s")
return audio
# ====== WebSocket ハンドラ ======
def on_open(ws):
print("[OPEN] connected. いつでも話しかけてOK!")
def on_message(ws, message):
global transcript_buf, text_buf, model_generating, barge_mode
try:
evt = json.loads(message)
except Exception:
print("[on_message] 非JSON:", message[:160]); return
t = evt.get("type")
if t:
print(f"[EVT] {t}")
# 生成フラグ管理
if t in ("response.created", "response.output_item.added"):
model_generating = True
if t in ("response.done", "response.cancelled"):
model_generating = False
# ★ キャンセル完了/応答完了でゲート解除
if barge_mode:
print("[barge] gate off")
barge_mode = False
# 念のため再生は止めておく
audio_stop()
# ---- テキスト(TEXT_ONLY=Trueのとき) ----
if t == "text.delta" and "delta" in evt:
text_buf.append(evt["delta"])
if t in ("text.completed", "response.output_text.done"):
line = "".join(text_buf).strip()
if line:
print("\n[TEXT]", line)
text_buf.clear()
# ---- 字幕(音声応答テキスト)----
if t == "response.output_audio_transcript.delta" and "delta" in evt:
if not barge_mode: # 割り込み中は字幕も無視
transcript_buf.append(evt["delta"])
if t == "response.output_audio_transcript.done":
if not barge_mode:
line = "".join(transcript_buf).strip()
if line:
print("\n[TRANSCRIPT]", line)
transcript_buf.clear()
# ---- 音声本体(到着即再生)----
if t == "response.output_audio.delta":
if not barge_mode: # ★ 割り込み中は受信しても再生しない
b64 = evt.get("delta") or evt.get("audio")
if b64:
audio_play_chunk(base64.b64decode(b64))
if t == "response.output_audio.done":
audio_stop()
# ---- エラー詳細 ----
if t == "error":
print("[EVT error payload]", evt)
def on_error(ws, err):
if not closing:
print("[ERROR]", err)
def on_close(ws, *args):
print("[CLOSE]", args)
audio_stop()
# ====== 送信(200msチャンク) ======
def send_audio_turn(ws, audio_pcm: np.ndarray):
"""append(200ms刻み) → commit → response.create"""
if audio_pcm.size == 0:
print("(無音のためスキップ)"); return
# 1) 200ms チャンクで append
CH_MS = 200
ch_samples = SR * CH_MS // 1000
total = len(audio_pcm)
sent = 0
while sent < total:
chunk = audio_pcm[sent: sent+ch_samples]
ws.send(json.dumps({
"type": "input_audio_buffer.append",
"audio": b64_pcm(chunk)
}))
sent += len(chunk)
# 2) commit
ws.send(json.dumps({"type": "input_audio_buffer.commit"}))
# 3) 応答要求(短く即答)
payload = {
"type": "response.create",
"response": {
"instructions": (
"今受け取った内容に、日本語で自然に即答してください。"
"文量は1〜2文、なるべく短く。"
)
}
}
if TEXT_ONLY:
payload["modalities"] = ["text"] # テキストのみ
ws.send(json.dumps(payload))
print("📤 送信(200msチャンク)→ commit 済み。応答待ち...")
# ====== 会話ループ ======
def run_conversation_loop(ws):
print("=== マルチターン会話(barge-in対応・低遅延)開始 ===")
print("割り込みたい時は、そのまま話し始めてください。")
while True:
try:
audio = record_one_utterance(ws)
send_audio_turn(ws, audio)
time.sleep(0.05)
except KeyboardInterrupt:
print("\n👋 終了します。")
break
except Exception:
traceback.print_exc(); time.sleep(0.3)
if __name__ == "__main__":
if not API_KEY:
print("環境変数 OPENAI_API_KEY が未設定です。"); raise SystemExit(1)
closing = False
# 必要なら入出力デバイス指定(例):sd.default.device = (input_dev, output_dev)
ws = websocket.WebSocketApp(
URL,
header=[f"Authorization: Bearer {API_KEY}"],
on_open=on_open, on_message=on_message,
on_error=on_error, on_close=on_close
)
websocket.enableTrace(False)
t = threading.Thread(target=lambda: ws.run_forever(), daemon=True)
t.start()
time.sleep(0.6)
try:
run_conversation_loop(ws)
finally:
closing = True
audio_stop()
ws.close()
まとめ
ひとまず,gpt-4o-realtime-previewを問題なく動かせるようにした過程を記事にまとめてみました.あとは、このモデルにいろいろプロンプトを与えてみて,対話システムをカスタマイズしていくのも面白そうだなーと思っています.