import os
import asyncio
import logging
import json
from openai import AsyncOpenAI
from aioconsole import ainput
from colorama import Fore, Style, init
# Windows環境でも色が表示されるように初期化
init(autoreset=True)
OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
# イベントタイプごとの色付け
EVENT_COLORS = {
'session.created': Fore.MAGENTA,
'session.updated': Style.BRIGHT + Fore.MAGENTA,
'rate_limits.updated': Style.BRIGHT + Fore.MAGENTA,
'conversation.item.created': Fore.YELLOW,
'response.created': Fore.GREEN,
'response.output_item.added': Style.BRIGHT + Fore.GREEN,
'response.content_part.added': Style.BRIGHT + Fore.GREEN,
'response.content_part.done': Style.BRIGHT + Fore.GREEN,
'response.output_item.done': Style.BRIGHT + Fore.GREEN,
'response.function_call_arguments.delta': Fore.BLUE,
'response.function_call_arguments.done': Style.BRIGHT + Fore.BLUE,
'response.text.delta': Fore.GREEN,
'response.done': Fore.CYAN,
'response.text.done': Fore.YELLOW,
}
# ダミーの天気情報取得関数
def get_current_weather(location):
if not location:
logging.error("Missing location argument")
return {"error": "Location not provided"}
return {
"location": location,
"weather": "Sunny",
"temperature": "23degrees",
"wind_speed": "10meter",
}
# WebSocket イベントを処理するメインハンドラ
async def handle_websocket_events(connection):
async for event in connection:
try:
event_type = event.type
if event_type in EVENT_COLORS:
color = EVENT_COLORS[event_type]
print(f"{color}{event_type}: {event}{Style.RESET_ALL}")
if event_type == "response.function_call_arguments.done":
await handle_function_call(event, connection)
elif event_type == "response.text.delta":
print(f"{Fore.YELLOW}Delta: {event.delta}{Style.RESET_ALL}", end="\n")
except Exception as e:
logging.error(f"Error handling event: {e}")
break
# Function Call の処理
async def handle_function_call(event, connection):
print("Handling function call...")
try:
data = json.loads(event.arguments)
location = data.get("location")
result = get_current_weather(location)
await connection.conversation.item.create(item={
"type": "function_call_output",
"call_id": event.call_id,
"output": json.dumps(result),
})
await connection.response.create()
except Exception as e:
logging.error(f"Error handling function call: {e}")
# ユーザー入力の処理
async def process_user_input(connection):
# Session Update完了まで3秒程度待つ
await asyncio.sleep(3)
while True:
user_input = await ainput(f"{Fore.CYAN}You: {Style.RESET_ALL}")
if not user_input.strip():
continue # 空文字の場合は次の入力を促す
if user_input.lower() in ["exit", "quit", "q"]:
print(f"{Fore.RED}Exiting chat...{Style.RESET_ALL}")
await connection.close()
break
try:
await connection.conversation.item.create(item={
"type": "message",
"role": "user",
"content": [{"type": "input_text", "text": user_input}]
})
await connection.response.create()
except Exception as e:
logging.error(f"Error sending user input: {e}")
# 1秒間の待機を追加
await asyncio.sleep(1)
# メイン処理
async def main():
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
async with client.beta.realtime.connect(model="gpt-4o-mini-realtime-preview-2024-12-17") as connection:
print("Connected to WebSocket.")
# セッション設定
await connection.session.update(session={
"modalities": ["text"],
"tools": [
{
"type": "function",
"name": "getCurrentWeather",
"description": "指定された都市の現在の天気を取得します。",
"parameters": {
"type": "object",
"properties": {
"location": {"type": "string", "description": "都市名"},
},
"required": ["location"],
},
},
],
"tool_choice": "auto",
})
# イベント処理とユーザー入力を並行実行
await asyncio.gather(
handle_websocket_events(connection),
process_user_input(connection),
)
if __name__ == "__main__":
asyncio.run(main())