はじめに
この記事では、Azure Functionsを使用してGPT-4oのストリーミングを実現する方法について説明します。具体的には、Azure OpenAIサービスと連携し、リアルタイムでデータをストリーミングする方法を紹介します。
前提条件
- Azureアカウント
- Azure Functionsの基本的な知識
- Pythonの基本的な知識
-
dotenv
ライブラリを使用して環境変数を管理する方法
導入
環境設定
まず、Azure Functionsのプロジェクトを作成します。VScodeでAzureにログインします。
左側のメニューのWORKSPACEからCreate Function Projectを選択します。プロジェクトフォルダーを選択します。次のパラーメータでFUNCTIONを作成します。
Language: python Model V2
path: python.exeのパス (例)"C:\Users\admin\anaconda3\envs\py311\python.exe"
template: HTTP trigger
auth level: FUNCTION
次に、必要なライブラリをインストールします。
pip install azure-functions openai python-dotenv requests
次に、.env
ファイルを作成し、Azure OpenAIのエンドポイントとAPIキーを設定します。
AZURE_OPEN_AI_ENDPOINT=your_openai_endpoint
AZURE_OPEN_AI_API_KEY=your_openai_api_key
ENDPOINT=http://localhost:7071/api/stream_openai_text
# 上記のENDPOINTはローカル実行時のエンドポイントです。ネットで実行する場合はFUNCION URLを貼り付けます。
local.settings.json
の設定
ローカル環境でAzure Functionsを実行するために、local.settings.json
ファイルに以下の変数を追加します。
"PYTHON_ENABLE_INIT_INDEXING": "1",
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"
コードの説明
インポートと環境変数の読み込み
まず、必要なライブラリをインポートし、環境変数を読み込みます。
import azure.functions as func
import openai
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio
import os
from dotenv import load_dotenv
load_dotenv()
Azure Function Appの設定
Azure Function Appを設定し、環境変数からエンドポイントとAPIキーを取得します。
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
endpoint = os.getenv("AZURE_OPEN_AI_ENDPOINT")
api_key = os.getenv("AZURE_OPEN_AI_API_KEY")
Azure OpenAIクライアントの設定
Azure OpenAIクライアントを非同期で設定します。
client = openai.AsyncAzureOpenAI(
azure_endpoint=endpoint,
api_key=api_key,
api_version="2024-02-01"
)
ストリーミングデータの処理
非同期でストリーミングデータを処理する関数を定義します。
async def stream_processor(response):
async for chunk in response:
if len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if delta.content:
await asyncio.sleep(0.05)
yield delta.content
HTTPストリーミングAzure Function
HTTPリクエストを受け取り、Azure OpenAIサービスからのストリーミングレスポンスを返す関数を定義します。
@app.route(route="stream_openai_text", methods=[func.HttpMethod.POST])
async def stream_openai_text(req: Request) -> StreamingResponse:
try:
body = await req.json()
prompt = body.get("prompt")
if not prompt:
return func.HttpResponse("プロンプトが見つかりませんでした", status_code=400)
except ValueError:
return func.HttpResponse("無効なJSON形式のリクエストボディです", status_code=400)
azure_open_ai_response = await client.chat.completions.create(
model="gpt-4o",
temperature=0,
max_tokens=1000,
messages=[{"role": "user", "content": prompt}],
stream=True
)
return StreamingResponse(stream_processor(azure_open_ai_response), media_type="text/event-stream")
プログラム全体のコードはコチラです
import azure.functions as func
import openai
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio
import os
from dotenv import load_dotenv
load_dotenv()
# Azure Function App
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)
endpoint = os.getenv("AZURE_OPEN_AI_ENDPOINT")
api_key = os.getenv("AZURE_OPEN_AI_API_KEY")
# Azure Open AI
deployment = "gpt-4o"
temperature = 0
client = openai.AsyncAzureOpenAI(
azure_endpoint=endpoint,
api_key=api_key,
api_version="2024-02-01"
)
# Get data from Azure Open AI
async def stream_processor(response):
async for chunk in response:
if len(chunk.choices) > 0:
delta = chunk.choices[0].delta
if delta.content: # Get remaining generated response if applicable
await asyncio.sleep(0.05)
yield delta.content
# HTTP streaming Azure Function
@app.route(route="stream_openai_text", methods=[func.HttpMethod.POST])
async def stream_openai_text(req: Request) -> StreamingResponse:
try:
body = await req.json() # req.get_json() を req.json() に変更
prompt = body.get("prompt")
if not prompt:
return func.HttpResponse("プロンプトが見つかりませんでした", status_code=400)
except ValueError:
return func.HttpResponse("無効なJSON形式のリクエストボディです", status_code=400)
azure_open_ai_response = await client.chat.completions.create(
model=deployment,
temperature=temperature,
max_tokens=1000,
messages=[
{"role": "system", "content": "You are voldermort's Diary"},
{"role": "user", "content": prompt}
],
stream=True
)
return StreamingResponse(stream_processor(azure_open_ai_response), media_type="text/event-stream")
実行方法
- Azure Functionsのローカル環境をセットアップします。
- 上記のコードを
function_app.py
ファイルにコピーします。 -
local.settings.json
ファイルに必要な環境変数を追加します。 - Azure Functionsをローカルで実行し、
http://localhost:7071/api/stream_openai_text
にPOSTリクエストを送信します。
デプロイと環境変数の設定
Azure Functionsにデプロイする際、Azureポータルを使用して環境変数を設定する必要があります。
Azureポータルを使用して環境変数を設定する方法
- Azureポータルで対象のFunction Appを開きます。
- 左側のメニューから「Settings」 > 「Enviornment Variables」を選択します。
- 「App Settings」タブを選択し、「Add」をクリックします。
- 以下の環境変数を追加します。
-
AZURE_OPEN_AI_ENDPOINT
:.env
ファイルに記載したエンドポイント -
AZURE_OPEN_AI_API_KEY
:.env
ファイルに記載したAPIキー -
PYTHON_ENABLE_INIT_INDEXING
:1
-
PYTHON_ISOLATE_WORKER_DEPENDENCIES
:1
-
- 追加が完了したら、「保存」をクリックして変更を適用します。
デプロイ後のテスト
デプロイが完了したら、以下のtest.py
スクリプトを使用してAzure Functionsをテストします。
test.py
の内容
import requests
import json
import os
from dotenv import load_dotenv
load_dotenv()
# Azure FunctionsのエンドポイントURLを.envファイルのENDPOINTに格納します。
url = os.getenv("ENDPOINT")
# 送信するメッセージ
message = "Hello, Are you human?"
try:
# リクエストペイロード
payload = {
"prompt": message
}
# ヘッダー
headers = {
"Content-Type": "application/json"
}
# POSTリクエストを送信
response = requests.post(url, data=json.dumps(payload), headers=headers, stream=True)
# ステータスコードが200以外の場合は例外をスロー
response.raise_for_status()
# ストリーミングレスポンスを処理
print(response.content)
#for line in response.iter_lines():
# if line:
# print(json.loads(line)) # ストリーミングレスポンスを表示
except requests.exceptions.RequestException as e:
print(f"Error: {e}")
テスト手順
-
test.py
スクリプトをローカル環境に保存します。 -
.env
ファイルにデプロイしたAzure FunctionsのエンドポイントURLを設定します。 -
test.py
スクリプトを実行して、Azure Functionsが正しく動作するか確認します。
python test.py
実行結果
まとめ
この記事では、Azure Functionsを使用して非同期ストリーミングを実現する方法について説明しました。非同期プログラミングの利点を活かし、リアルタイムでデータを処理する方法を学びました。また、ローカル環境とデプロイ時の環境変数の設定方法、およびデプロイ後のテスト方法についても説明しました。
しかしこれでは実際にストリーミングしていることが見えないため、次回の記事でフロントを作成してみます。