5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

KDDI エンジニア&デザイナーAdvent Calendar 2024

Day 25

OpenAIのRealtimeAPIをチャットで利用して理解を深める!Python版!

Last updated at Posted at 2024-12-25

はじめに

  • Pythonのopenaiライブラリにおいて、12月17日リリースの ver. 1.58 で、RealtimeAPIがサポートされました
  • これに伴い早速、前回Node.jsで書いた以下の記事の内容を、Pythonで作り直したので、だれかの助けになればと記事化したいと思います

実行例

  • 今回は、実行のイベントを色分けで区別して表示するようにしています
  • 色分けはおおまかに、以下としています
    • Conversationアイテムは黄色系、
    • Sessionは紫、
    • Responseは緑色

image.png

シーケンス図

ソースコード

  • 必要パッケージは以下
  • openaiについては、最新版を入れるために、pip install --upgrade openaiとやっておいたほうがよいかも
pip install asyncio aioconsole colorama openai
import os
import asyncio
import logging
import json

from openai import AsyncOpenAI
from aioconsole import ainput

from colorama import Fore, Style, init

# Windows環境でも色が表示されるように初期化
init(autoreset=True)

OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]

# イベントタイプごとの色付け
EVENT_COLORS = {
    'session.created': Fore.MAGENTA,
    'session.updated': Style.BRIGHT + Fore.MAGENTA,
    'rate_limits.updated': Style.BRIGHT + Fore.MAGENTA,
    'conversation.item.created': Fore.YELLOW,
    'response.created': Fore.GREEN,
    'response.output_item.added': Style.BRIGHT + Fore.GREEN,
    'response.content_part.added': Style.BRIGHT + Fore.GREEN,
    'response.content_part.done': Style.BRIGHT + Fore.GREEN,
    'response.output_item.done': Style.BRIGHT + Fore.GREEN,
    'response.function_call_arguments.delta': Fore.BLUE,
    'response.function_call_arguments.done': Style.BRIGHT + Fore.BLUE,
    'response.text.delta': Fore.GREEN,
    'response.done': Fore.CYAN,
    'response.text.done': Fore.YELLOW,
}

# ダミーの天気情報取得関数
def get_current_weather(location):
    if not location:
        logging.error("Missing location argument")
        return {"error": "Location not provided"}
    return {
        "location": location,
        "weather": "Sunny",
        "temperature": "23degrees",
        "wind_speed": "10meter",
    }

# WebSocket イベントを処理するメインハンドラ
async def handle_websocket_events(connection):
    async for event in connection:
        try:
            event_type = event.type

            if event_type in EVENT_COLORS:
                color = EVENT_COLORS[event_type]
                print(f"{color}{event_type}: {event}{Style.RESET_ALL}")

            if event_type == "response.function_call_arguments.done":
                await handle_function_call(event, connection)

            elif event_type == "response.text.delta":
                print(f"{Fore.YELLOW}Delta: {event.delta}{Style.RESET_ALL}", end="\n")


        except Exception as e:
            logging.error(f"Error handling event: {e}")
            break

# Function Call の処理
async def handle_function_call(event, connection):
    print("Handling function call...")
    try:
        data = json.loads(event.arguments)
        location = data.get("location")
        result = get_current_weather(location)
        await connection.conversation.item.create(item={
            "type": "function_call_output",
            "call_id": event.call_id,
            "output": json.dumps(result),
        })
        await connection.response.create()
    except Exception as e:
        logging.error(f"Error handling function call: {e}")

# ユーザー入力の処理
async def process_user_input(connection):
    # Session Update完了まで3秒程度待つ
    await asyncio.sleep(3)
    while True:
        user_input = await ainput(f"{Fore.CYAN}You: {Style.RESET_ALL}")
        if not user_input.strip():
            continue  # 空文字の場合は次の入力を促す
        if user_input.lower() in ["exit", "quit", "q"]:
            print(f"{Fore.RED}Exiting chat...{Style.RESET_ALL}")
            await connection.close()
            break

        try:
            await connection.conversation.item.create(item={
                "type": "message",
                "role": "user",
                "content": [{"type": "input_text", "text": user_input}]
            })
            await connection.response.create()
        except Exception as e:
            logging.error(f"Error sending user input: {e}")

        # 1秒間の待機を追加
        await asyncio.sleep(1)


# メイン処理
async def main():
    client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    async with client.beta.realtime.connect(model="gpt-4o-mini-realtime-preview-2024-12-17") as connection:
        print("Connected to WebSocket.")

        # セッション設定
        await connection.session.update(session={
            "modalities": ["text"],
            "tools": [
                {
                    "type": "function",
                    "name": "getCurrentWeather",
                    "description": "指定された都市の現在の天気を取得します。",
                    "parameters": {
                        "type": "object",
                        "properties": {
                            "location": {"type": "string", "description": "都市名"},
                        },
                        "required": ["location"],
                    },
                },
            ],
            "tool_choice": "auto",
        })

        # イベント処理とユーザー入力を並行実行
        await asyncio.gather(
            handle_websocket_events(connection),
            process_user_input(connection),
        )

if __name__ == "__main__":
    asyncio.run(main())

補足説明

  • コンソールでのユーザー入力を受け付けるため、Websocketのイベントハンドラと、ユーザー入力は並列動作させています
        # イベント処理とユーザー入力を並行実行
        await asyncio.gather(
            handle_websocket_events(connection),
            process_user_input(connection),
        )

さいごに

  • 今後ますますニーズが高まっていくと思われる、RealtimeAPIを利用して、みなさんもハッピーAIライフをお過ごしください!
5
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?