1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonでgpt-4o-realtime-previewを動かしてみた

Posted at

本記事の目的

とりあえずOpenAIのAPIキーを使ってGPT-4o-Realtime Previewを動かしてみよー.という記事です.
Pythonのプログラムを動かしたことがあり,OpenAIのキーを持っていれば簡単に動かせるかなと思います.
円滑なコミュニケーションを行わせるために、コードはChatGPTと相談しつつ,自己流にした部分もあります.

GPT-4o Realtime Previewとは

入力:音声 → 出力:音声 のリアルタイムな会話を高速に行うことができるモデルです.
音声入力とテキスト入力の両方に対応し,WebRTC または WebSocket インターフェースを通じてリアルタイムに応答できるのが特徴です.

事前準備 → プログラム実行まで

・ 今回のプログラムを動かすための仮想環境を作成し,有効化する.
(Pythonのバージョンは3.9で動きました.)

conda create -n gptrealtime python=3.9
conda activate gptrealtime

・OpenAIのAPIキーを環境変数に入れておく.
(APIキーをコードに直書きすることは非推奨のため.)

setx OPENAI_API_KEY "sk-proj-xxxxxxxxxxxxxxxx"
$env:OPENAI_API_KEY = "sk-proj-xxxxxxxxxxxxxxxx"

※ "sk-proj-xxxxxxxxxxxxxxxx"の部分にご自身のAPIキーを入れてください.


・ターミナルを一度再起動し,環境変数が反映されているかを確認する.
(以下のコードを実行したときに,先ほど設定したAPIキーが出力されればOK)

echo $env:OPENAI_API_KEY

・再度仮想環境を有効化し,プログラムを動かすために必要なライブラリのインストールを行う.

conda activate gptrealtime
pip install websocket-client requests sounddevice numpy

・以下のプログラムをコピー&ペーストすれば動くはず…!

ws_realtime_dialog.py
# 割り込み対応・低遅延: 到着即再生 + 200ms送信 + barge-inゲート(キャンセル完了まで徹底ミュート)
import os, json, base64, time, traceback, threading
import websocket
import sounddevice as sd
import numpy as np

API_KEY = os.environ.get("OPENAI_API_KEY")
URL = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview"

# ====== 録音 / VAD 設定(やや敏感) ======
SR = 16000
FRAME_MS = 20
FRAME_SAMPLES = SR * FRAME_MS // 1000
VAD_START_DB = -50        # 自分の声を素早く拾う(-50〜-45 推奨)
VAD_STOP_DB  = -55
VAD_TAIL_MS  = 300        # 無音300msで区切る(誤停止が多ければ 450〜700 に)
MAX_UTT_SEC  = 10.0

# ====== モデル音声 出力 ======
OUT_SR = 24000
OUT_CH = 1

# ====== 返答形式 ======
TEXT_ONLY = False         # True: テキストのみ / False: 音声+字幕

# ====== 状態・バッファ ======
transcript_buf = []
text_buf = []

play_stream = None
play_lock = threading.Lock()

model_generating = False      # サーバー側が応答生成中か
barge_mode = False            # 割り込み中(キャンセル完了までデルタを無視)
closing = False

# ====== ユーティリティ ======
def rms_dbfs(x: np.ndarray) -> float:
    if len(x) == 0: return -120.0
    x = x.astype(np.float32) / 32768.0
    ms = np.mean(x*x) + 1e-12
    return 10.0 * np.log10(ms)

def b64_pcm(frames: np.ndarray) -> str:
    if frames.dtype != np.int16:
        frames = frames.astype(np.int16)
    return base64.b64encode(frames.tobytes()).decode("utf-8")

# ====== 再生制御(到着即再生 / 低レイテンシ) ======
def audio_play_chunk(b: bytes):
    global play_stream
    with play_lock:
        if play_stream is None:
            play_stream = sd.OutputStream(
                samplerate=OUT_SR, channels=OUT_CH, dtype='int16',
                latency='low', blocksize=max(OUT_SR // 100, 240)  # 約10ms分
            )
            play_stream.start()
        play_stream.write(np.frombuffer(b, dtype=np.int16))

def audio_stop():
    global play_stream
    with play_lock:
        if play_stream is not None:
            try:
                play_stream.stop(); play_stream.close()
            finally:
                play_stream = None

def is_playing() -> bool:
    return play_stream is not None

# ====== barge-in:割り込み時の処理 ======
def barge_in(ws):
    """話し始めた瞬間に呼ぶ:再生停止+応答生成キャンセル+以降のデルタ無視"""
    global model_generating, barge_mode
    # 1) ローカル再生を即停止
    audio_stop()
    # 2) 以降の出力デルタはキャンセル確定まで無視
    barge_mode = True
    # 3) サーバーの進行中応答をキャンセル
    try:
        ws.send(json.dumps({"type": "response.cancel"}))
    except Exception:
        pass
    model_generating = False

# ====== 録音(簡易VAD) ======
def record_one_utterance(ws) -> np.ndarray:
    """
    一発話を録音して返す。
    ユーザが話し始めた瞬間に barge-in を発動:再生停止&応答キャンセル&ゲートON
    """
    print("🎙️ 話してください(停止:短い無音)...")
    frames = []
    silence_ms = 0
    started = False
    total_ms = 0

    stream = sd.InputStream(samplerate=SR, channels=1, dtype='int16', blocksize=FRAME_SAMPLES)
    with stream:
        t0 = time.time()
        while True:
            block, _ = stream.read(FRAME_SAMPLES)
            db = rms_dbfs(block)
            total_ms = (time.time() - t0) * 1000.0

            if not started:
                # 自分の声を検知 → 即barge-in
                if db > VAD_START_DB:
                    started = True
                    if is_playing() or model_generating:
                        barge_in(ws)
                    frames.append(block)
            else:
                frames.append(block)
                if db < VAD_STOP_DB:
                    silence_ms += FRAME_MS
                else:
                    silence_ms = 0
                if silence_ms >= VAD_TAIL_MS:
                    break

            if total_ms > MAX_UTT_SEC * 1000:
                print("⌛ 上限秒に達したため送信します。")
                break

    if len(frames) == 0:
        return np.zeros((0, 1), dtype='int16')
    audio = np.concatenate(frames, axis=0)
    print(f"⏺️ 録音完了: {len(audio)/SR:.2f}s")
    return audio

# ====== WebSocket ハンドラ ======
def on_open(ws):
    print("[OPEN] connected. いつでも話しかけてOK!")

def on_message(ws, message):
    global transcript_buf, text_buf, model_generating, barge_mode
    try:
        evt = json.loads(message)
    except Exception:
        print("[on_message] 非JSON:", message[:160]); return

    t = evt.get("type")
    if t:
        print(f"[EVT] {t}")

    # 生成フラグ管理
    if t in ("response.created", "response.output_item.added"):
        model_generating = True
    if t in ("response.done", "response.cancelled"):
        model_generating = False
        # ★ キャンセル完了/応答完了でゲート解除
        if barge_mode:
            print("[barge] gate off")
        barge_mode = False
        # 念のため再生は止めておく
        audio_stop()

    # ---- テキスト(TEXT_ONLY=Trueのとき) ----
    if t == "text.delta" and "delta" in evt:
        text_buf.append(evt["delta"])
    if t in ("text.completed", "response.output_text.done"):
        line = "".join(text_buf).strip()
        if line:
            print("\n[TEXT]", line)
        text_buf.clear()

    # ---- 字幕(音声応答テキスト)----
    if t == "response.output_audio_transcript.delta" and "delta" in evt:
        if not barge_mode:  # 割り込み中は字幕も無視
            transcript_buf.append(evt["delta"])
    if t == "response.output_audio_transcript.done":
        if not barge_mode:
            line = "".join(transcript_buf).strip()
            if line:
                print("\n[TRANSCRIPT]", line)
        transcript_buf.clear()

    # ---- 音声本体(到着即再生)----
    if t == "response.output_audio.delta":
        if not barge_mode:  # ★ 割り込み中は受信しても再生しない
            b64 = evt.get("delta") or evt.get("audio")
            if b64:
                audio_play_chunk(base64.b64decode(b64))

    if t == "response.output_audio.done":
        audio_stop()

    # ---- エラー詳細 ----
    if t == "error":
        print("[EVT error payload]", evt)

def on_error(ws, err):
    if not closing:
        print("[ERROR]", err)

def on_close(ws, *args):
    print("[CLOSE]", args)
    audio_stop()

# ====== 送信(200msチャンク) ======
def send_audio_turn(ws, audio_pcm: np.ndarray):
    """append(200ms刻み) → commit → response.create"""
    if audio_pcm.size == 0:
        print("(無音のためスキップ)"); return

    # 1) 200ms チャンクで append
    CH_MS = 200
    ch_samples = SR * CH_MS // 1000
    total = len(audio_pcm)
    sent = 0
    while sent < total:
        chunk = audio_pcm[sent: sent+ch_samples]
        ws.send(json.dumps({
            "type": "input_audio_buffer.append",
            "audio": b64_pcm(chunk)
        }))
        sent += len(chunk)

    # 2) commit
    ws.send(json.dumps({"type": "input_audio_buffer.commit"}))

    # 3) 応答要求(短く即答)
    payload = {
        "type": "response.create",
        "response": {
            "instructions": (
                "今受け取った内容に、日本語で自然に即答してください。"
                "文量は1〜2文、なるべく短く。"
            )
        }
    }
    if TEXT_ONLY:
        payload["modalities"] = ["text"]  # テキストのみ

    ws.send(json.dumps(payload))
    print("📤 送信(200msチャンク)→ commit 済み。応答待ち...")

# ====== 会話ループ ======
def run_conversation_loop(ws):
    print("=== マルチターン会話(barge-in対応・低遅延)開始 ===")
    print("割り込みたい時は、そのまま話し始めてください。")
    while True:
        try:
            audio = record_one_utterance(ws)
            send_audio_turn(ws, audio)
            time.sleep(0.05)
        except KeyboardInterrupt:
            print("\n👋 終了します。")
            break
        except Exception:
            traceback.print_exc(); time.sleep(0.3)

if __name__ == "__main__":
    if not API_KEY:
        print("環境変数 OPENAI_API_KEY が未設定です。"); raise SystemExit(1)

    closing = False

    # 必要なら入出力デバイス指定(例):sd.default.device = (input_dev, output_dev)
    ws = websocket.WebSocketApp(
        URL,
        header=[f"Authorization: Bearer {API_KEY}"],
        on_open=on_open, on_message=on_message,
        on_error=on_error, on_close=on_close
    )
    websocket.enableTrace(False)

    t = threading.Thread(target=lambda: ws.run_forever(), daemon=True)
    t.start()
    time.sleep(0.6)
    try:
        run_conversation_loop(ws)
    finally:
        closing = True
        audio_stop()
        ws.close()

まとめ

ひとまず,gpt-4o-realtime-previewを問題なく動かせるようにした過程を記事にまとめてみました.あとは、このモデルにいろいろプロンプトを与えてみて,対話システムをカスタマイズしていくのも面白そうだなーと思っています.

1
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?