2
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

この記事では、Azure Functionsを使用してGPT-4oのストリーミングを実現する方法について説明します。具体的には、Azure OpenAIサービスと連携し、リアルタイムでデータをストリーミングする方法を紹介します。

前提条件

  • Azureアカウント
  • Azure Functionsの基本的な知識
  • Pythonの基本的な知識
  • dotenvライブラリを使用して環境変数を管理する方法

導入

環境設定

まず、Azure Functionsのプロジェクトを作成します。VScodeでAzureにログインします。
左側のメニューのWORKSPACEからCreate Function Projectを選択します。プロジェクトフォルダーを選択します。次のパラーメータでFUNCTIONを作成します。
Language: python Model V2
path: python.exeのパス (例)"C:\Users\admin\anaconda3\envs\py311\python.exe"
template: HTTP trigger
auth level: FUNCTION

次に、必要なライブラリをインストールします。

pip install azure-functions openai python-dotenv requests

次に、.envファイルを作成し、Azure OpenAIのエンドポイントとAPIキーを設定します。

AZURE_OPEN_AI_ENDPOINT=your_openai_endpoint
AZURE_OPEN_AI_API_KEY=your_openai_api_key
ENDPOINT=http://localhost:7071/api/stream_openai_text  
# 上記のENDPOINTはローカル実行時のエンドポイントです。ネットで実行する場合はFUNCION URLを貼り付けます。

local.settings.jsonの設定

ローカル環境でAzure Functionsを実行するために、local.settings.jsonファイルに以下の変数を追加します。

"PYTHON_ENABLE_INIT_INDEXING": "1",
"PYTHON_ISOLATE_WORKER_DEPENDENCIES": "1"

image.png

コードの説明

インポートと環境変数の読み込み

まず、必要なライブラリをインポートし、環境変数を読み込みます。

import azure.functions as func
import openai
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio
import os
from dotenv import load_dotenv

load_dotenv()

Azure Function Appの設定

Azure Function Appを設定し、環境変数からエンドポイントとAPIキーを取得します。

app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)

endpoint = os.getenv("AZURE_OPEN_AI_ENDPOINT")
api_key = os.getenv("AZURE_OPEN_AI_API_KEY")

Azure OpenAIクライアントの設定

Azure OpenAIクライアントを非同期で設定します。

client = openai.AsyncAzureOpenAI(
    azure_endpoint=endpoint,
    api_key=api_key,
    api_version="2024-02-01"
)

ストリーミングデータの処理

非同期でストリーミングデータを処理する関数を定義します。

async def stream_processor(response):
    async for chunk in response:
        if len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content:
                await asyncio.sleep(0.05)
                yield delta.content

HTTPストリーミングAzure Function

HTTPリクエストを受け取り、Azure OpenAIサービスからのストリーミングレスポンスを返す関数を定義します。

@app.route(route="stream_openai_text", methods=[func.HttpMethod.POST])
async def stream_openai_text(req: Request) -> StreamingResponse:
    try:
        body = await req.json()
        prompt = body.get("prompt")
        if not prompt:
            return func.HttpResponse("プロンプトが見つかりませんでした", status_code=400)
    except ValueError:
        return func.HttpResponse("無効なJSON形式のリクエストボディです", status_code=400)

    azure_open_ai_response = await client.chat.completions.create(
        model="gpt-4o",
        temperature=0,
        max_tokens=1000,
        messages=[{"role": "user", "content": prompt}],
        stream=True
    )

    return StreamingResponse(stream_processor(azure_open_ai_response), media_type="text/event-stream")

プログラム全体のコードはコチラです

function_app.py

import azure.functions as func
import openai
from azurefunctions.extensions.http.fastapi import Request, StreamingResponse
import asyncio
import os
from dotenv import load_dotenv

load_dotenv()

# Azure Function App
app = func.FunctionApp(http_auth_level=func.AuthLevel.FUNCTION)

endpoint = os.getenv("AZURE_OPEN_AI_ENDPOINT")
api_key = os.getenv("AZURE_OPEN_AI_API_KEY")

# Azure Open AI
deployment = "gpt-4o"
temperature = 0

client = openai.AsyncAzureOpenAI(
    azure_endpoint=endpoint,
    api_key=api_key,
    api_version="2024-02-01"
)

# Get data from Azure Open AI


async def stream_processor(response):
    async for chunk in response:
        if len(chunk.choices) > 0:
            delta = chunk.choices[0].delta
            if delta.content:  # Get remaining generated response if applicable
                await asyncio.sleep(0.05)
                yield delta.content

# HTTP streaming Azure Function


@app.route(route="stream_openai_text", methods=[func.HttpMethod.POST])
async def stream_openai_text(req: Request) -> StreamingResponse:
    try:
        body = await req.json()  # req.get_json() を req.json() に変更
        prompt = body.get("prompt")
        if not prompt:
            return func.HttpResponse("プロンプトが見つかりませんでした", status_code=400)
    except ValueError:
        return func.HttpResponse("無効なJSON形式のリクエストボディです", status_code=400)

    azure_open_ai_response = await client.chat.completions.create(
        model=deployment,
        temperature=temperature,
        max_tokens=1000,
        messages=[
            {"role": "system", "content": "You are voldermort's Diary"},
            {"role": "user", "content": prompt}
        ],
        stream=True
    )

    return StreamingResponse(stream_processor(azure_open_ai_response), media_type="text/event-stream")

実行方法

  1. Azure Functionsのローカル環境をセットアップします。
  2. 上記のコードをfunction_app.pyファイルにコピーします。
  3. local.settings.jsonファイルに必要な環境変数を追加します。
  4. Azure Functionsをローカルで実行し、http://localhost:7071/api/stream_openai_textにPOSTリクエストを送信します。

デプロイと環境変数の設定

Azure Functionsにデプロイする際、Azureポータルを使用して環境変数を設定する必要があります。

Azureポータルを使用して環境変数を設定する方法

  1. Azureポータルで対象のFunction Appを開きます。
  2. 左側のメニューから「Settings」 > 「Enviornment Variables」を選択します。
  3. 「App Settings」タブを選択し、「Add」をクリックします。

image.png

  1. 以下の環境変数を追加します。
    • AZURE_OPEN_AI_ENDPOINT: .envファイルに記載したエンドポイント
    • AZURE_OPEN_AI_API_KEY: .envファイルに記載したAPIキー
    • PYTHON_ENABLE_INIT_INDEXING: 1
    • PYTHON_ISOLATE_WORKER_DEPENDENCIES: 1
  2. 追加が完了したら、「保存」をクリックして変更を適用します。

デプロイ後のテスト

デプロイが完了したら、以下のtest.pyスクリプトを使用してAzure Functionsをテストします。

test.pyの内容

import requests
import json
import os
from dotenv import load_dotenv

load_dotenv()

# Azure FunctionsのエンドポイントURLを.envファイルのENDPOINTに格納します。
url = os.getenv("ENDPOINT")

# 送信するメッセージ
message = "Hello, Are you human?"

try:
    # リクエストペイロード
    payload = {
        "prompt": message
    }
    # ヘッダー
    headers = {
        "Content-Type": "application/json"
    }
    # POSTリクエストを送信
    response = requests.post(url, data=json.dumps(payload), headers=headers, stream=True)
    # ステータスコードが200以外の場合は例外をスロー
    response.raise_for_status()

    # ストリーミングレスポンスを処理
    print(response.content)
    #for line in response.iter_lines():
    #    if line:
    #        print(json.loads(line))  # ストリーミングレスポンスを表示

except requests.exceptions.RequestException as e:
    print(f"Error: {e}")

テスト手順

  1. test.pyスクリプトをローカル環境に保存します。
  2. .envファイルにデプロイしたAzure FunctionsのエンドポイントURLを設定します。
  3. test.pyスクリプトを実行して、Azure Functionsが正しく動作するか確認します。
python test.py

実行結果

image.png

まとめ

この記事では、Azure Functionsを使用して非同期ストリーミングを実現する方法について説明しました。非同期プログラミングの利点を活かし、リアルタイムでデータを処理する方法を学びました。また、ローカル環境とデプロイ時の環境変数の設定方法、およびデプロイ後のテスト方法についても説明しました。
しかしこれでは実際にストリーミングしていることが見えないため、次回の記事でフロントを作成してみます。

2
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?