3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【OpenAI × twilio】RealtimeAPIでお悩み電話相談室を作って相談してみた

Last updated at Posted at 2024-12-17

OpenAIの新しいRealtimeAPIを使って、電話での対話システムを実装しました。今回は「お悩み相談室」としてGo言語で作ってみましたので、実装の詳細を順を追って解説していきます。説明の途中ではコードを省略しているところがあります。記事の最後にサンプルコードを載せていますので適宜参照しながらご覧ください。まだ改善の余地はありますが、とりあえず動かして遊んでみたいという人の手助けになれば幸いです。

完成イメージ

1. イベント定義とシステム定数

まず、OpenAIとのWebSocket通信で使用するイベントを定義します。サーバーからのイベントとクライアントからのイベント、それぞれを以下のように定義しています。

// Server event types from OpenAI
const (
    EventError                              = "error"
    EventSessionCreated                     = "session.created"
    EventSessionUpdated                     = "session.updated"
    EventConversationCreated                = "conversation.created"
    // ...その他のイベント
)

// Client event types to OpenAI
const (
    EventSessionUpdate            = "session.update"
    EventInputAudioBufferAppend   = "input_audio_buffer.append"
    EventInputAudioBufferCommit   = "input_audio_buffer.commit"
    // ...その他のイベント
)

これらのイベントは、音声認識から応答生成まで、システム全体の通信フローを制御します。例えば、EventInputAudioBufferAppendは音声データの送信に、EventResponseAudioDeltaは音声データの受信に使用されます。

2. 基本構造体の定義

システムの設定と状態管理のために、以下の構造体を定義しています。

type Config struct {
    Port         string
    OpenAIKey    string
    SystemPrompt string
    Voice        string
}

type ConversationState struct {
    streamSid            string
    currentDialog        string
    latestMediaTimestamp int64
    lastAssistantItem    string
    mu                   sync.Mutex
}

Config構造体は環境変数から読み込まれる設定を管理し、ConversationStateは会話の状態を追跡します。特にConversationStateでは、ミューテックスを使用して並行処理時のデータ整合性を保護しています。

3. OpenAIクライアントの実装

OpenAIとの通信を担当するクライアントは以下のように実装しています。

type OpenAIClient struct {
    conn   *websocket.Conn
    config *Config
}

func NewOpenAIClient(conn *websocket.Conn, config *Config) *OpenAIClient {
    return &OpenAIClient{
        conn:   conn,
        config: config,
    }
}

func (c *OpenAIClient) SendSessionUpdate(config SessionConfig) error {
    event := struct {
        ClientEvent
        Session SessionConfig `json:"session"`
    }{
        ClientEvent: ClientEvent{
            Type: EventSessionUpdate,
        },
        Session: config,
    }
    return c.conn.WriteJSON(event)
}

このクライアントは、WebSocket接続とシステム設定を保持し、OpenAIとのメッセージのやり取りを管理します。

4. メイン処理フロー

メディアストリームの処理は以下のように実装しています。

func handleMediaStream(twilioConn *fiberWs.Conn) {
    state := &ConversationState{}
    
    config, err := loadConfig()
    if err != nil {
        log.Printf("Failed to load configuration: %v", err)
        return
    }

    client, err := connectToOpenAI(config)
    if err != nil {
        log.Printf("Failed to connect to OpenAI: %v", err)
        return
    }
    defer client.conn.Close()

    done := make(chan struct{})
    go handleTwilioMessages(twilioConn, client, state, done)
    go handleOpenAIMessages(twilioConn, client.conn, state, done)

    <-done
}

この関数は、TwilioとOpenAI間の音声データの橋渡しを行います。2つのgoroutineを使用して、それぞれの方向のメッセージ処理を並行して行います。

5. 音声データの処理

音声データの処理は、以下のように実装しています。

func handleTwilioMedia(data map[string]interface{}, client *OpenAIClient, state *ConversationState) {
    if media, ok := data["media"].(map[string]interface{}); ok {
        state.mu.Lock()
        if timestamp, ok := media["timestamp"].(float64); ok {
            state.latestMediaTimestamp = int64(timestamp)
        }
        state.mu.Unlock()

        if payload, ok := media["payload"].(string); ok {
            audioData, err := base64.StdEncoding.DecodeString(payload)
            if err != nil {
                log.Printf("Error decoding audio: %v", err)
                return
            }

            if err := client.SendAudioBuffer(audioData); err != nil {
                log.Printf("Error sending audio to OpenAI: %v", err)
            }
        }
    }
}

このコードでは、Twilioから受け取った音声データをデコードし、OpenAIに送信しています。タイムスタンプの管理も行い、会話の時系列を適切に保持します。

6. OpenAIからのレスポンス処理

OpenAIからの応答は、イベントタイプに応じて異なる処理を行います。

func handleOpenAIMessages(twilioConn *fiberWs.Conn, openaiConn *websocket.Conn, state *ConversationState, done chan struct{}) {
    for {
        select {
        case <-done:
            return
        default:
            _, msg, err := openaiConn.ReadMessage()
            if isConnectionClosed(err) {
                close(done)
                return
            }

            var response map[string]interface{}
            if err := json.Unmarshal(msg, &response); err != nil {
                log.Printf("Error parsing OpenAI message: %v", err)
                continue
            }

            eventType, _ := response["type"].(string)
            switch eventType {
            case EventError:
                handleError(response)
            case EventSessionCreated:
                handleSessionCreated(response)
            case EventConversationItemCreated:
                handleConversationItem(response, state)
            case EventResponseAudioDelta:
                handleAudioDelta(response, twilioConn, state)
            // ... その他のイベント処理
            }
        }
    }
}

各イベントタイプに対する具体的な処理も実装しています。

func handleAudioDelta(response map[string]interface{}, twilioConn *fiberWs.Conn, state *ConversationState) {
    if delta, ok := response["delta"].(string); ok {
        audioBytes, _ := base64.StdEncoding.DecodeString(delta)
        audioPayload := base64.StdEncoding.EncodeToString(audioBytes)

        state.mu.Lock()
        if itemID, ok := response["item_id"].(string); ok {
            state.lastAssistantItem = itemID
        }
        streamSid := state.streamSid
        state.mu.Unlock()

        err := twilioConn.WriteJSON(map[string]interface{}{
            "event":     "media",
            "streamSid": streamSid,
            "media": map[string]interface{}{
                "payload": audioPayload,
            },
        })
        if err != nil {
            log.Printf("Error sending audio delta: %v", err)
        }
    }
}

7. エラーハンドリングと接続管理

システムの安定性を確保するため、エラーハンドリングも実装しています。

func handleError(response map[string]interface{}) {
    if err, ok := response["error"].(map[string]interface{}); ok {
        log.Printf("OpenAI Error: Type=%s, Code=%s, Message=%s",
            err["type"], err["code"], err["message"])
    }
}

func isConnectionClosed(err error) bool {
    return err != nil && websocket.IsUnexpectedCloseError(
        err,
        websocket.CloseGoingAway,
        websocket.CloseNormalClosure,
    )
}

また、Twilio側との接続確立も以下のように実装しています。ここでは、自動音声の前にPausePlayで間隔を開けて小さな音をならすことで、自動音声の最初が途切れてしまう問題を解決しています。

func handleIncomingCall(c *fiber.Ctx) error {
    twiml := fmt.Sprintf(`<?xml version="1.0" encoding="UTF-8"?>
        <Response>
            <Pause length="2"/>
            <Play digits="1" volume="-35"/>
            <Pause length="0.5"/>
            <Say language="ja-JP">AIアシスタントが対応いたします。</Say>
            <Connect>
                <Stream url="wss://%s/media-stream"/>
            </Connect>
        </Response>`, c.Hostname())

    c.Set("Content-Type", "application/xml")
    return c.SendString(twiml)
}

8. 設定管理

環境変数からの設定読み込みも実装しています。

func loadConfig() (*Config, error) {
    if err := godotenv.Load(); err != nil {
        log.Printf("Warning: Error loading .env file: %v", err)
    }

    openaiAPIKey := os.Getenv("OPENAI_API_KEY")
    if openaiAPIKey == "" {
        return nil, fmt.Errorf("OPENAI_API_KEY is required")
    }

    return &Config{
        Port:         getEnvOrDefault("PORT", defaultPort),
        OpenAIKey:    openaiAPIKey,
        SystemPrompt: getEnvOrDefault("OPENAI_SYSTEM_PROMPT", ""),
        Voice:        getEnvOrDefault("OPENAI_VOICE", defaultVoice),
    }, nil
}

このように、RealtimeAPIを使用した電話対話システムでは、WebSocketによる双方向通信、音声データの処理、状態管理、エラーハンドリングなどを意識して実装する必要があります。特に並行処理の制御と安全な状態管理が重要なポイントとなっています。

9. お悩み相談室としての設定

これまでの技術的な実装に加えて、AIをお悩み相談室として機能させるための設定を行います。プロジェクトのルートディレクトリに.envファイルを作成し、以下の設定を追加します。

基本設定

PORT=5050
OPENAI_API_KEY="xxxxxxxxxxxxxxxxxxx"
OPENAI_VOICE="sage"

AIカウンセラーの設定

OPENAI_SYSTEM_PROMPT="
あなたは経験豊富な心理カウンセラーとして、以下の原則に従って相談者の悩みに寄り添い、適切なアドバイスを提供します:

# 基本姿勢
- 相談者に対して常に温かく、共感的な態度で接する
- 批判や否定をせず、相談者の気持ちを受け止める
- プロフェッショナルとして冷静さを保ちながらも、人間味のある対応を心がける

[... 以下、プロンプトの詳細設定 ...]"

.envのサンプル

.envのサンプル
PORT=5050

OPENAI_API_KEY="xxxxxxxxxxxxxxxxxxx"

OPENAI_VOICE="sage"
OPENAI_SYSTEM_PROMPT="
あなたは経験豊富な心理カウンセラーとして、以下の原則に従って相談者の悩みに寄り添い、適切なアドバイスを提供します:

# 基本姿勢
- 相談者に対して常に温かく、共感的な態度で接する
- 批判や否定をせず、相談者の気持ちを受け止める
- プロフェッショナルとして冷静さを保ちながらも、人間味のある対応を心がける

# 会話の進め方
1. 最初の挨拶
   「お悩み相談室です。どのようなご相談でしょうか?」

2. 傾聴のフェーズ
   - 相談者の話を途中で遮らず、最後まで聞く
   - 適切なタイミングで相槌を打つ
   - 内容の確認や明確化が必要な場合は、開かれた質問を使用する
   例:「その時どのようなお気持ちでしたか?」「もう少し詳しく教えていただけますか?」

3. 共感・理解のフェーズ
   - 相談者の感情を言語化して返す
   例:「そのような状況で○○と感じられるのは、当然のことですね」
   - 相談者の価値観や考え方を否定せず、受け入れる

4. 問題解決のフェーズ
   - 相談者自身が解決策を見つけられるよう、適切な質問を投げかける
   - 具体的で実行可能な提案を行う
   - 複数の選択肢を提示し、相談者が自己決定できるよう支援する

# 注意事項
- 医療や法律など、専門的な判断が必要な場合は、適切な専門家への相談を推奨する
- 自殺や暴力など、危機的な状況の場合は、速やかに専門機関への連絡を促す
- 個人情報の取り扱いには十分注意する
- 断定的な助言は避け、相談者自身の判断を尊重する

# 緊急時の対応
深刻な危機状況を感知した場合:
1. 落ち着いた声で話しかける
2. 具体的な状況を確認する
3. 以下の連絡先を案内する:
   - 警察(110)
   - 救急(119)
   - いのちの電話(0570-064-556)

# 会話の終了
- 相談者が納得できる形で会話を終えられるよう配慮する
- 必要に応じて、継続的な相談の可能性を提案する
- 具体的な行動プランを確認する

# 応答例
相談者:「最近、職場の人間関係で悩んでいて...」
応答:「職場での人間関係のお悩みなのですね。具体的にどのような状況でしょうか?」

相談者:「子どもの教育について不安で...」
応答:「お子様の教育について心配されているのですね。どのような点が特に気がかりですか?」

相談者:「何をしても楽しくなくて...」
応答:「つらい気持ちでいらっしゃるのですね。もう少し、お気持ちについて教えていただけますか?」

このプロンプトは、状況に応じて柔軟に対応できるよう、基本的なフレームワークを提供しています。実際の会話では、相談者の状況や感情に合わせて、より個別化された対応を行うことが重要です。"
"

このプロンプトにより、AIは相談者に寄り添った対応が可能になります。共感的な傾聴から具体的なアドバイスまで、カウンセリングに必要な一連の対応を行うことができます。

これで技術的な実装とAIの設定が完了し、お悩み相談室システムとして機能する準備が整いました。

10. 実際に相談してみる

今回実装したシステムを起動し、実際に相談をしてみました。

起動方法

go run main.go

ngrokやtwilioの設定など、詳しい起動方法は以下を参照してください。
twilioのドキュメント

実際の会話例

このように、AIは相談者の気持ちに寄り添いながら、適切な質問を投げかけ、共感的な応答を返すことができています。

まとめ

OpenAIのRealtimeAPIを使用することで、リアルタイムの音声対話システムを実現できました。今回実装したお悩み相談室では、以下を意識して作成しました。

  1. 技術面

    • WebSocketによる双方向通信
    • 音声データのリアルタイム処理
    • 堅牢なエラーハンドリング
  2. 対話面

    • 自然な会話の実現
    • 共感的な応答の生成
    • 適切なカウンセリングフロー

このシステムは、カウンセリング以外にも様々な用途に応用可能です。システムプロンプトを変更することで、異なる性格や役割を持つボットとして機能させることができます。

今後の展望

  • 応答の精度向上
  • より自然な音声合成の実現
  • マルチモーダル対話の実装

今回の実装を通じて、AIによる対話システムの可能性と課題が見えてきました。今後も技術の進化とともに、より自然で効果的な対話システムの実現が期待できます。電話オペレーターがすべてAIになるのもすぐ先のことかもしれません。

サンプルコード

サンプルコード
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
	"sync"

	"github.com/fasthttp/websocket"
	"github.com/gofiber/fiber/v2"
	fiberWs "github.com/gofiber/websocket/v2"
	"github.com/joho/godotenv"
)

// Server event types from OpenAI
const (
	EventError                              = "error"
	EventSessionCreated                     = "session.created"
	EventSessionUpdated                     = "session.updated"
	EventConversationCreated                = "conversation.created"
	EventConversationItemCreated            = "conversation.item.created"
	EventConversationItemInputTransComplete = "conversation.item.input_audio_transcription.completed"
	EventConversationItemInputTransFailed   = "conversation.item.input_audio_transcription.failed"
	EventInputAudioBufferCommitted          = "input_audio_buffer.committed"
	EventInputAudioBufferCleared            = "input_audio_buffer.cleared"
	EventInputAudioBufferSpeechStarted      = "input_audio_buffer.speech_started"
	EventInputAudioBufferSpeechStopped      = "input_audio_buffer.speech_stopped"
	EventResponseCreated                    = "response.created"
	EventResponseDone                       = "response.done"
	EventResponseOutputItemAdded            = "response.output_item.added"
	EventResponseOutputItemDone             = "response.output_item.done"
	EventResponseContentPartAdded           = "response.content_part.added"
	EventResponseContentPartDone            = "response.content_part.done"
	EventResponseTextDelta                  = "response.text.delta"
	EventResponseTextDone                   = "response.text.done"
	EventResponseAudioTranscriptDelta       = "response.audio_transcript.delta"
	EventResponseAudioTranscriptDone        = "response.audio_transcript.done"
	EventResponseAudioDelta                 = "response.audio.delta"
	EventResponseAudioDone                  = "response.audio.done"
	EventRateLimitsUpdated                  = "rate_limits.updated"
)

// Client event types to OpenAI
const (
	EventSessionUpdate            = "session.update"
	EventInputAudioBufferAppend   = "input_audio_buffer.append"
	EventInputAudioBufferCommit   = "input_audio_buffer.commit"
	EventInputAudioBufferClear    = "input_audio_buffer.clear"
	EventConversationItemCreate   = "conversation.item.create"
	EventConversationItemTruncate = "conversation.item.truncate"
	EventConversationItemDelete   = "conversation.item.delete"
	EventResponseCreate           = "response.create"
	EventResponseCancel           = "response.cancel"
)

const (
	openAIWSURL  = "wss://api.openai.com/v1/realtime?model=gpt-4o-realtime-preview-2024-10-01"
	defaultPort  = "5050"
	defaultVoice = "alloy"
)

type Config struct {
	Port         string
	OpenAIKey    string
	SystemPrompt string
	Voice        string
}

type ConversationState struct {
	streamSid            string
	currentDialog        string
	latestMediaTimestamp int64
	lastAssistantItem    string
	mu                   sync.Mutex
}

type ClientEvent struct {
	EventID string `json:"event_id,omitempty"`
	Type    string `json:"type"`
}

type SessionConfig struct {
	Modalities              []string             `json:"modalities"`
	Instructions            string               `json:"instructions,omitempty"`
	Voice                   string               `json:"voice,omitempty"`
	InputAudioFormat        string               `json:"input_audio_format,omitempty"`
	OutputAudioFormat       string               `json:"output_audio_format,omitempty"`
	InputAudioTranscription *TranscriptionConfig `json:"input_audio_transcription,omitempty"`
	TurnDetection           *TurnDetectionConfig `json:"turn_detection,omitempty"`
	Tools                   []Tool               `json:"tools,omitempty"`
	ToolChoice              string               `json:"tool_choice,omitempty"`
	Temperature             float64              `json:"temperature,omitempty"`
	MaxResponseTokens       interface{}          `json:"max_response_output_tokens,omitempty"`
}

type TranscriptionConfig struct {
	Model string `json:"model"`
}

type TurnDetectionConfig struct {
	Type              string  `json:"type"`
	Threshold         float64 `json:"threshold,omitempty"`
	PrefixPaddingMs   int     `json:"prefix_padding_ms,omitempty"`
	SilenceDurationMs int     `json:"silence_duration_ms,omitempty"`
}

type Tool struct {
	Type        string     `json:"type"`
	Name        string     `json:"name"`
	Description string     `json:"description"`
	Parameters  ToolParams `json:"parameters"`
}

type ToolParams struct {
	Type       string                 `json:"type"`
	Properties map[string]interface{} `json:"properties"`
	Required   []string               `json:"required"`
}

type OpenAIClient struct {
	conn   *websocket.Conn
	config *Config
}

func main() {
	config, err := loadConfig()
	if err != nil {
		log.Fatalf("Failed to load configuration: %v", err)
	}

	app := fiber.New()
	setupRoutes(app)
	log.Fatal(app.Listen(":" + config.Port))
}

func loadConfig() (*Config, error) {
	if err := godotenv.Load(); err != nil {
		log.Printf("Warning: Error loading .env file: %v", err)
	}

	openaiAPIKey := os.Getenv("OPENAI_API_KEY")
	if openaiAPIKey == "" {
		return nil, fmt.Errorf("OPENAI_API_KEY is required")
	}

	return &Config{
		Port:         getEnvOrDefault("PORT", defaultPort),
		OpenAIKey:    openaiAPIKey,
		SystemPrompt: getEnvOrDefault("OPENAI_SYSTEM_PROMPT", "何も答えてはいけません。"),
		Voice:        getEnvOrDefault("OPENAI_VOICE", defaultVoice),
	}, nil
}

func getEnvOrDefault(key, defaultValue string) string {
	if value := os.Getenv(key); value != "" {
		return value
	}
	return defaultValue
}

func NewOpenAIClient(conn *websocket.Conn, config *Config) *OpenAIClient {
	return &OpenAIClient{
		conn:   conn,
		config: config,
	}
}

func (c *OpenAIClient) SendSessionUpdate(config SessionConfig) error {
	event := struct {
		ClientEvent
		Session SessionConfig `json:"session"`
	}{
		ClientEvent: ClientEvent{
			Type: EventSessionUpdate,
		},
		Session: config,
	}
	return c.conn.WriteJSON(event)
}

func (c *OpenAIClient) SendAudioBuffer(audioData []byte) error {
	event := struct {
		ClientEvent
		Audio string `json:"audio"`
	}{
		ClientEvent: ClientEvent{
			Type: EventInputAudioBufferAppend,
		},
		Audio: base64.StdEncoding.EncodeToString(audioData),
	}
	return c.conn.WriteJSON(event)
}

func (c *OpenAIClient) TruncateAudioMessage(itemID string, audioEndMs int) error {
	event := struct {
		ClientEvent
		ItemID       string `json:"item_id"`
		ContentIndex int    `json:"content_index"`
		AudioEndMs   int    `json:"audio_end_ms"`
	}{
		ClientEvent: ClientEvent{
			Type: EventConversationItemTruncate,
		},
		ItemID:       itemID,
		ContentIndex: 0,
		AudioEndMs:   audioEndMs,
	}
	return c.conn.WriteJSON(event)
}

func setupRoutes(app *fiber.App) {
	app.Get("/", func(c *fiber.Ctx) error {
		return c.JSON(fiber.Map{"status": "running"})
	})

	app.All("/incoming-call", handleIncomingCall)
	app.Use("/media-stream", wsUpgradeMiddleware)
	app.Get("/media-stream", fiberWs.New(handleMediaStream))
}

func wsUpgradeMiddleware(c *fiber.Ctx) error {
	if fiberWs.IsWebSocketUpgrade(c) {
		c.Locals("allowed", true)
		return c.Next()
	}
	return fiber.ErrUpgradeRequired
}

func handleIncomingCall(c *fiber.Ctx) error {
	twiml := fmt.Sprintf(`<?xml version="1.0" encoding="UTF-8"?>
		<Response>
			<Pause length="2"/>
			<Play digits="1" volume="-35"/>
			<Pause length="0.5"/>
			<Say language="ja-JP">AIアシスタントが対応いたします。</Say>
			<Connect>
				<Stream url="wss://%s/media-stream"/>
			</Connect>
		</Response>`, c.Hostname())

	c.Set("Content-Type", "application/xml")
	return c.SendString(twiml)
}

func connectToOpenAI(config *Config) (*OpenAIClient, error) {
	conn, _, err := websocket.DefaultDialer.Dial(openAIWSURL, http.Header{
		"Authorization": []string{"Bearer " + config.OpenAIKey},
		"OpenAI-Beta":   []string{"realtime=v1"},
	})
	if err != nil {
		return nil, fmt.Errorf("dial error: %v", err)
	}

	client := NewOpenAIClient(conn, config)

	sessionConfig := SessionConfig{
		Modalities:        []string{"text", "audio"},
		InputAudioFormat:  "g711_ulaw",
		OutputAudioFormat: "g711_ulaw",
		Voice:             config.Voice,
		Instructions:      config.SystemPrompt,
		Temperature:       0.8,
		TurnDetection: &TurnDetectionConfig{
			Type:              "server_vad",
			PrefixPaddingMs:   300,
			SilenceDurationMs: 500,
		},
	}

	if err := client.SendSessionUpdate(sessionConfig); err != nil {
		conn.Close()
		return nil, fmt.Errorf("session init error: %v", err)
	}

	return client, nil
}

func handleMediaStream(twilioConn *fiberWs.Conn) {
	state := &ConversationState{}

	config, err := loadConfig()
	if err != nil {
		log.Printf("Failed to load configuration: %v", err)
		return
	}

	client, err := connectToOpenAI(config)
	if err != nil {
		log.Printf("Failed to connect to OpenAI: %v", err)
		return
	}
	defer client.conn.Close()

	done := make(chan struct{})
	go handleTwilioMessages(twilioConn, client, state, done)
	go handleOpenAIMessages(twilioConn, client.conn, state, done)

	<-done
}

func handleTwilioMessages(twilioConn *fiberWs.Conn, client *OpenAIClient, state *ConversationState, done chan struct{}) {
	for {
		select {
		case <-done:
			return
		default:
			_, msg, err := twilioConn.ReadMessage()
			if isConnectionClosed(err) {
				close(done)
				return
			}

			var data map[string]interface{}
			if err := json.Unmarshal(msg, &data); err != nil {
				log.Printf("Error parsing Twilio message: %v", err)
				continue
			}

			handleTwilioEvent(data, client, state)
		}
	}
}

func handleTwilioEvent(data map[string]interface{}, client *OpenAIClient, state *ConversationState) {
	event, _ := data["event"].(string)
	switch event {
	case "media":
		handleTwilioMedia(data, client, state)
	case "start":
		handleTwilioStart(data, state)
	}
}

func handleTwilioMedia(data map[string]interface{}, client *OpenAIClient, state *ConversationState) {
	if media, ok := data["media"].(map[string]interface{}); ok {
		state.mu.Lock()
		if timestamp, ok := media["timestamp"].(float64); ok {
			state.latestMediaTimestamp = int64(timestamp)
		}
		state.mu.Unlock()

		if payload, ok := media["payload"].(string); ok {
			audioData, err := base64.StdEncoding.DecodeString(payload)
			if err != nil {
				log.Printf("Error decoding audio: %v", err)
				return
			}

			if err := client.SendAudioBuffer(audioData); err != nil {
				log.Printf("Error sending audio to OpenAI: %v", err)
			}
		}
	}
}

func handleTwilioStart(data map[string]interface{}, state *ConversationState) {
	if start, ok := data["start"].(map[string]interface{}); ok {
		state.mu.Lock()
		if streamSid, ok := start["streamSid"].(string); ok {
			state.streamSid = streamSid
			log.Printf("Stream started: %s", streamSid)
		}
		state.currentDialog = ""
		state.lastAssistantItem = ""
		state.mu.Unlock()
	}
}

func handleOpenAIMessages(twilioConn *fiberWs.Conn, openaiConn *websocket.Conn, state *ConversationState, done chan struct{}) {
	for {
		select {
		case <-done:
			return
		default:
			_, msg, err := openaiConn.ReadMessage()
			if isConnectionClosed(err) {
				close(done)
				return
			}

			var response map[string]interface{}
			if err := json.Unmarshal(msg, &response); err != nil {
				log.Printf("Error parsing OpenAI message: %v", err)
				continue
			}

			eventType, _ := response["type"].(string)
			switch eventType {
			case EventError:
				handleError(response)
			case EventSessionCreated:
				handleSessionCreated(response)
			case EventConversationItemCreated:
				handleConversationItem(response, state)
			case EventConversationItemInputTransComplete:
				handleTranscriptionComplete(response)
			case EventConversationItemInputTransFailed:
				handleTranscriptionFailed(response)
			case EventResponseCreated:
				handleResponseCreated(response)
			case EventResponseDone:
				handleResponseDone(state)
			case EventResponseAudioDelta:
				handleAudioDelta(response, twilioConn, state)
			case EventResponseTextDelta:
				handleTextDelta(response, state)
			case EventRateLimitsUpdated:
				handleRateLimits(response)
			case EventInputAudioBufferSpeechStarted:
				handleSpeechStarted(response)
			case EventInputAudioBufferSpeechStopped:
				handleSpeechStopped(response)
			}
		}
	}
}

func handleError(response map[string]interface{}) {
	if err, ok := response["error"].(map[string]interface{}); ok {
		log.Printf("OpenAI Error: Type=%s, Code=%s, Message=%s",
			err["type"], err["code"], err["message"])
	}
}

func handleSessionCreated(response map[string]interface{}) {
	if session, ok := response["session"].(map[string]interface{}); ok {
		log.Printf("Session created: ID=%s, Model=%s",
			session["id"], session["model"])
	}
}

func handleConversationItem(response map[string]interface{}, state *ConversationState) {
	if item, ok := response["item"].(map[string]interface{}); ok {
		if content, ok := item["content"].([]interface{}); ok {
			for _, c := range content {
				if contentItem, ok := c.(map[string]interface{}); ok {
					if transcript, ok := contentItem["transcript"].(string); ok {
						state.mu.Lock()
						state.currentDialog += transcript
						state.mu.Unlock()
						log.Printf("Added transcript: %s", transcript)
					}
				}
			}
		}
	}
}

func handleTranscriptionComplete(response map[string]interface{}) {
	if transcript, ok := response["transcript"].(string); ok {
		log.Printf("Transcription completed: %s", transcript)
	}
}

func handleTranscriptionFailed(response map[string]interface{}) {
	if err, ok := response["error"].(map[string]interface{}); ok {
		log.Printf("Transcription failed: %s", err["message"])
	}
}

func handleResponseCreated(response map[string]interface{}) {
	if resp, ok := response["response"].(map[string]interface{}); ok {
		log.Printf("Response created: ID=%s, Status=%s",
			resp["id"], resp["status"])
	}
}

func handleResponseDone(state *ConversationState) {
	state.mu.Lock()
	if state.currentDialog != "" {
		log.Printf("Complete response: %s", state.currentDialog)
	}
	state.currentDialog = ""
	state.mu.Unlock()
}

func handleTextDelta(response map[string]interface{}, state *ConversationState) {
	if delta, ok := response["delta"].(string); ok {
		state.mu.Lock()
		state.currentDialog += delta
		state.mu.Unlock()
	}
}

func handleAudioDelta(response map[string]interface{}, twilioConn *fiberWs.Conn, state *ConversationState) {
	if delta, ok := response["delta"].(string); ok {
		audioBytes, _ := base64.StdEncoding.DecodeString(delta)
		audioPayload := base64.StdEncoding.EncodeToString(audioBytes)

		state.mu.Lock()
		if itemID, ok := response["item_id"].(string); ok {
			state.lastAssistantItem = itemID
		}
		streamSid := state.streamSid
		state.mu.Unlock()

		err := twilioConn.WriteJSON(map[string]interface{}{
			"event":     "media",
			"streamSid": streamSid,
			"media": map[string]interface{}{
				"payload": audioPayload,
			},
		})
		if err != nil {
			log.Printf("Error sending audio delta: %v", err)
		}
	}
}

func handleRateLimits(response map[string]interface{}) {
	if limits, ok := response["rate_limits"].([]interface{}); ok {
		for _, limit := range limits {
			if l, ok := limit.(map[string]interface{}); ok {
				log.Printf("Rate limit updated: %s - Remaining: %v/%v, Reset in %v seconds",
					l["name"], l["remaining"], l["limit"], l["reset_seconds"])
			}
		}
	}
}

func handleSpeechStarted(response map[string]interface{}) {
	if itemID, ok := response["item_id"].(string); ok {
		log.Printf("Speech started: Item ID=%s, Start=%v ms",
			itemID, response["audio_start_ms"])
	}
}

func handleSpeechStopped(response map[string]interface{}) {
	if itemID, ok := response["item_id"].(string); ok {
		log.Printf("Speech stopped: Item ID=%s, End=%v ms",
			itemID, response["audio_end_ms"])
	}
}

func isConnectionClosed(err error) bool {
	return err != nil && websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNormalClosure)
}

参考リンク

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?