こちらの翻訳です。
本書は著者が手動で翻訳したものであり内容の正確性を保証するものではありません。正確な内容に関しては原文を参照ください。
本書では、同時実行と非同期オペレーションを取り扱うための異なるアプローチを用いて、SparkデータフレームからのプロンプトでOpenAI APIをクエリーする4つの方法を比較します。環境を適切にセットアップした後で、それぞれの手法をブレークダウンし、比較します。
# importing libraries
import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.sql import Row
import pandas as pd
import openai
import os
import requests
import json
import aiohttp
import asyncio
import nest_asyncio
import ssl
from concurrent.futures import ThreadPoolExecutor, as_completed
from threading import Lock
sql_statement = f"""SELECT * FROM {path_to_your_table}"""
certificate_path = 'your/path/to/self/signed/certificate'
# SSL context for self-signed certificate
## Please note that this is an optional parameter if you're not using a self-hosted endpoint
ssl_context = ssl.create_default_context(cafile=certificate_path)
# setting up API configs
api_base = "your_api_endpoint_url"
api_model = "gpt-4"
api_version = "2023-05-15"
api_url = f"{api_base}/openai/deployments/{api_model}/chat/completions?api-version={api_version}"
api_key = "your_api_key"
手法1: Spark UDFを用いた同期API呼び出し
## generic function to query OpenAI API using `requests` package
def query_gpt4(prompt):
# Headers for the API request
headers = {
"Content-Type": "application/json",
"api-key": f"{api_key}"
}
prompt = prompt
messages=[{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}]
data = {
"model": "gpt-4",
"messages": messages
}
# Make the API request
response = requests.post(api_url, headers=headers, json=data, verify=certificate_path)
# Check for successful response
if response.status_code == 200:
# Parse the response
response_json = response.json()
return response_json['choices'][0]['message']['content']
else:
print("Failed to fetch response: ", response.status_code, response.text)
raise Exception
# Register the UDF
udf_query_gpt4 = udf(query_gpt4, StringType())
# read the dataframe with prompts
responseDF = spark.sql(sql_statement)
# re-partioning
responseDF = responseDF.repartition(30)
responseDF = responseDF.withColumn('response', udf_query_gpt4(F.col('prompt')))
アプローチ
この手法では、Sparkデータフレームのそれぞれの行に対する同期API呼び出しを行うために、requests
ライブラリを用いたユーザー定義関数(UDF)を使用しています。
コンセプト
- ユーザー定義関数: Sparkにおけるユーザー定義関数、UDFによって、それぞれの行に対してカスタムのPythonコードを実行することができます。
- 同期I/O: 同期呼び出しは、次に進む前にレスポンスを待ちますので、APIリクエストのようなI/Oバウンドのタスクでは遅くなることがあります。
比較
- 実装、理解するのにシンプルです。
- 小規模データセットや、レスポンス時間が問題にならない場合には適しています。
- 同期の性質を保つため、大規模データセットでは効率的ではありません。Sparkジョブ全体は完了するまでそれぞれのAPI呼び出しを待たなくてはならないので、実行時間が長くなります。
手法2: asyncio
を用いたPandasに対する非同期バッチ
# Apply the necessary patch for asyncio to utilise exisiting event loop, if running in Jupyter
nest_asyncio.apply()
# Function to make asynchronous call to Azure OpenAI API
async def async_query_gpt4(session, api_key, prompt, ssl_context=ssl_context):
url = api_url # Adjust as needed
headers = {
"Content-Type": "application/json",
"api-key": f"{api_key}"
}
data = {
"model": "gpt-4", # Adjust the model as needed
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
}
async with session.post(url, json=data, headers=headers, ssl=ssl_context) as response:
return await response.json()
# Function to perform asynchronous tasks in batches
async def async_task(api_key, prompts, ssl_context=ssl_context, api_call_batch_size=128):
async with aiohttp.ClientSession() as session:
tasks = []
responses = [] # Store responses
for prompt in prompts:
task = asyncio.create_task(async_query_gpt4(session, api_key, prompt, ssl_context))
tasks.append(task)
if len(tasks) >= api_call_batch_size:
responses.extend(await asyncio.gather(*tasks))
tasks = []
responses.extend(await asyncio.gather(*tasks)) # Process the last batch
return responses
def process_prompts(DF, prompt_col, response_col='response', api_call_batch_size=128):
# create a prompt index and extract prompts into a list
DF = DF.withColumn("prompt_idx", F.monotonically_increasing_id())
df = DF[['prompt_idx', prompt_col]].toPandas()
prompts_idx = df['prompt_idx'].to_list()
prompts = df[prompt_col].to_list()
# Run async tasks and get results
loop = asyncio.get_event_loop()
prompt_responses = loop.run_until_complete(async_task(api_key, prompts, ssl_context, api_call_batch_size))
# extracting content from the API responses
responses = [response['choices'][0]['message']['content'] for response in prompt_responses]
# converting results into a
results = list(zip(prompts_idx, responses))
results_DF = spark.createDataFrame(results, ["prompt_idx", response_col])
# Join back with original DataFrame using the index
DF = DF.join(results_DF, on=['prompt_idx']).drop('prompt_idx')
return DF
# read the dataframe with prompts
responseDF = spark.sql(sql_statement)
# Using pandas converted dataframe to make asynchronous calls
responseDF = process_prompts(DF=responseDF, prompt_col='prompt', response_col="response")
アプローチ
- 非同期プログラミングのために
asyncio
、非ブロッキングのHTTPリクエストのためにaiohttp
を活用します。 - 非同期タスクを用いてデータフレームにあるプロンプトを処理します。
- API呼び出しをバッチにグルーピングし、それぞれのバッチを非同期で実行します。
- Jupyter環境で非同期タスクを実行するために
asyncio.get_event_loop()
を使います。
キーコンセプト
- 非同期I/O: シングルスレッドで同時に複数のI/Oオペレーションを取り扱うことができるので、I/Oバウンドのタスクにおける効率性を改善します。
- バッチ処理: 頻繁なコンテキストスイッチのオーバーヘッドを削減し、ネットワーク利用を最適化することができます。
比較
- 非ブロッキングの性質のため、I/Oバウンドのタスクにおいては同期の手法よりも高速になる可能性があります。
- それぞれのプロンプトの個々の呼び出しを行うのではなく、バッチでリクエストを送信することで、APIサーバーの負荷を削減します。
- 非同期プログラミングなので実装が複雑になります。
手法3: asyncio semaphore
を用いたPandasに対するレート制限を持つ非同期呼び出し
# Apply the necessary patch for asyncio to utilise exisiting event loop, if running in Jupyter
nest_asyncio.apply()
# SSL context for self-signed certificate
## Please note that this is an optional parameter if you're not using a self-hosted endpoint
ssl_context = ssl.create_default_context(cafile=certificate_path)
# Function to make asynchronous call to Azure OpenAI API
async def async_query_gpt4(session, api_key, prompt, ssl_context=ssl_context):
url = api_url # Adjust as needed
headers = {
"Content-Type": "application/json",
"api-key": f"{api_key}"
}
data = {
"model": "gpt-4", # Adjust the model as needed
"messages": [
{"role": "system", "content": "You are a helpful assistant."},
{"role": "user", "content": prompt}
]
}
async with session.post(url, json=data, headers=headers, ssl=ssl_context) as response:
return await response.json()
# Function to perform asynchronous tasks with Semaphore for rate limiting
async def async_task(api_key, prompts, ssl_context=ssl_context, max_concurrent_requests=MAX_THREADS):
async with aiohttp.ClientSession() as session:
semaphore = asyncio.Semaphore(max_concurrent_requests)
tasks = []
responses = []
async def fetch_and_append(prompt):
async with semaphore:
response = await async_query_gpt4(session, api_key, prompt, ssl_context)
responses.append(response)
for prompt in prompts:
task = asyncio.create_task(fetch_and_append(prompt))
tasks.append(task)
# You can add logic here to adjust batch sizes adaptively
await asyncio.gather(*tasks)
return responses
def process_prompts(DF, prompt_col, response_col='response', max_concurrent_requests=MAX_THREADS):
# create a prompt index and extract prompts into a list
DF = DF.withColumn("prompt_idx", F.monotonically_increasing_id())
df = DF[['prompt_idx', prompt_col]].toPandas()
prompts_idx = df['prompt_idx'].to_list()
prompts = df[prompt_col].to_list()
# Run async tasks and get results
loop = asyncio.get_event_loop()
prompt_responses = loop.run_until_complete(async_task(api_key, prompts, ssl_context, max_concurrent_requests))
# extracting content from the API responses
responses = [response['choices'][0]['message']['content'] for response in prompt_responses]
# converting results into a
results = list(zip(prompts_idx, responses))
results_DF = spark.createDataFrame(results, ["prompt_idx", response_col])
# Join back with original DataFrame using the index
DF = DF.join(results_DF, on=['prompt_idx']).drop('prompt_idx')
return DF
# read the dataframe with prompts
responseDF = spark.sql(sql_statement)
# Using pandas converted dataframe to make asynchronous calls
responseDF = process_prompts(DF=responseDF, prompt_col='prompt', response_col="response")
アプローチ
- 上の手法と似ていますが、同時実行リクエスト数を制限するために
asyncio.Semaphore
を導入しています。 - それぞれのプロンプトを個々に処理しますが、セマフォの制限までの同時実行数となります。
- API呼び出しの同時実行数とレート制限のバランスを保ちます。
キーコンセプト
- セマフォ: 同時実行システムにおける複数処理によって、共通リソースへのアクセスをコントロールするメカニズムです。
- レート制限: APIサーバーの過負荷を回避し、潜在的なAPI利用制限に準拠します。
比較
- 上記の手法よりも優れた同時実行数の制御を行い、レート制限やサーバーの過負荷に関する問題が発生する可能性を削減します。
- APIレート制限やサーバーのキャパシティが問題となるような状況では、より効率的になる場合があります。
- セマフォ管理のために若干複雑になります。
手法4: Pandasに対するThreadPoolExecutorによるマルチスレッド
## generic function to query OpenAI API using `requests` package
def query_gpt4(prompt):
# Headers for the API request
headers = {
"Content-Type": "application/json",
"api-key": f"{api_key}"
}
prompt = prompt
messages=[{"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt}]
data = {
"model": "gpt-4",
"messages": messages
}
# Make the API request
response = requests.post(api_url, headers=headers, json=data, verify=certificate_path)
# Check for successful response
if response.status_code == 200:
# Parse the response
response_json = response.json()
return response_json['choices'][0]['message']['content']
else:
print("Failed to fetch response: ", response.status_code, response.text)
raise Exception
def process_prompts(DF, prompt_col, response_col='response', num_workers=32):
# Add a prompt index and convert to Pandas DataFrame
DF = DF.withColumn("prompt_idx", F.monotonically_increasing_id())
df = DF.select('prompt_idx', prompt_col).toPandas()
# Initialize the lock
lock = Lock()
with ThreadPoolExecutor(max_workers=num_workers) as executor:
# Submit tasks for each prompt and watch out for buffering
futures = {executor.submit(query_gpt4, row[prompt_col]): index for index, row in df.iterrows()}
# Collect smaller DataFrames
smaller_dfs = []
# Wait for all tasks to complete and process results
for future in as_completed(futures):
index = futures[future]
try:
results = future.result()
# Create a small DataFrame with the result
small_df = pd.DataFrame({response_col: [results], 'prompt_idx': [index]})
with lock:
smaller_dfs.append(small_df)
except Exception as e:
print(f"Error processing row {index}: {e}")
# Concatenate all smaller DataFrames
results_df = pd.concat(smaller_dfs)
# Convert results back to Spark DataFrame and join with original DataFrame
results_DF = spark.createDataFrame(results_df)
DF = DF.join(results_DF, on=['prompt_idx']).drop('prompt_idx')
return DF
# read the dataframe with prompts
responseDF = spark.sql(sql_statement)
# Using pandas converted dataframe to make multi-threaded calls
responseDF = process_prompts(DF=responseDF, prompt_col='prompt', response_col="response")
アプローチ
- マルチスレッドのためにPythonの
concurrent.futures.ThreadPoolExecutor
を使います。 - 複数スレッドで並列に同期API呼び出しを実行します。
- HTTPリクエストには
requests
ライブラリを活用します。 - ロックを用いて結果を集約するためのスレッドセーフなアプローチを採用します。
キーコンセプト
- マルチスレッド: 処理実行のマルチスレッドを活用することで、コードの並列実行を可能にします。
-
スレッドの安全性: 共有されるデータ構造(
smaller_dfs
)が複数のスレッドによって安全にアクセス、変更されることを保証します。
比較
- プロンプト数が多く、API呼び出しがボトルネックになるシナリオでは理想的となります。
- 特定のワークロード、特にタスクに外部I/Oの待ちが含まれる場合には、非同期の手法よりも高速になる場合があります。
- 非同期の手法よりも実装はわかりやすいですが、CPUバウンドのタスクにおいてはPythonのGlobal Interpreter Lock (GIL)の制限に直面する場合があります。
サマリー
- 大量の同時実行リクエスト、特にスレッド管理のオーバーヘッド(コンテキストスイッチ)が課題となる場合のI/Oバウンドタスクにおいては、ayncioベースの手法が非常に効率的です。セマフォベースのレート制限によって、リクエストの同時実行数に対するコントロールを追加し、APIレート制限に準拠したり、リソース使用を管理する際には有用となります。
- asyncioのイベントループモデルが理想的でなかったり、特定のI/Oの挙動によってスレッディングのパフォーマンスが優れている状況では、マルチスレッドがよりシンプルな同時実行モデルを提供し、使い方が直感的なものとなります。
- どの手法を選択するのかは、タスクの特性、コードが実行される環境、APIレート制限、Pythonにおける同時実行モデルに対する個人的な習熟度のような要因に依存します。
- Spark UDFを用いた同期AIP呼び出し(手法1)は、小規模なデータセットやAPIのレスポンスタイムが重大ではない場合には適しています。
- Pandasを用いた同期API呼び出し(手法2と手法3)は、APIリクエストの効率的な取り扱いが必要な大規模データセットに対しては理想的となります。
- Pandasを用いたThreadPoolExecutorによるマルチスレッディング(手法4)は、シンプルさと効率性の間のバランスが必要な際、特に(I/Oバウンドのタスクのように)GILが大きなボトルネックにならないような状況では優れた選択肢となります。