はじめに
現代のアプリケーションでは、AIの応答速度はユーザー体験に直接影響を与えます。特に、複数のリクエストを一度に処理する場合、効率的な方法を選ばなければいけません。本記事ではマルチスレッドによる応答の高速化という話で書き連ねます
マルチスレッド処理について
マルチスレッド処理は簡単に言うと並行処理です。通常プログラムは1つのスレッドで処理が行われますが、複数のスレッドを立てて処理を行うと並列処理が実現でき、結果として全体の処理速度が速くなります。
前準備
今回はAzureOpenAIの応答をマルチスレッドにて処理させてみました。
前準備として、request-responseの処理を、prompt→answerで受け取れるクラスにWrapします。
OpenAIResponder.py
#openai setting jsonm objects
import requests
class OpenAIResponder:
def __init__(self, API_KEY, ENDPOINT):
self.API_KEY = API_KEY
self.ENDPOINT = ENDPOINT
self.headers = {
"Content-Type": "application/json",
"api-key": API_KEY,
}
def _get_payload(self, comment, temperature=0.7, top_p=0.95, max_tokens=800):
'''
to get payload for the request to OpenAI api server
comment: str # input text
temperature: float # 0.0 - 1.0
top_p: float # 0.0 - 1.0
max_tokens: int # 1 - 2048
'''
return {
"messages": [
{
"role": "system",
"content": [
{
"type": "text",
"text": comment
}
]
}
],
"temperature": temperature,
"top_p": top_p,
"max_tokens": max_tokens
}
def answer(self, comment):
'''
to get response from OpenAI api server
comment: str # input text
'''
payload = self._get_payload(comment)
try:
response = requests.post(self.ENDPOINT, headers=self.headers, json=payload)
response.raise_for_status()
except requests.RequestException as err:
raise SystemExit(err)
return response.json()['choices'][0]['message']['content']
このようにして、以下のような処理が可能です
sample.py
openai_responder = OpenAIResponder(API_KEY, ENDPOINT)
answer = openai_responder.answer("今日はいい天気ですね")
print(answer)
性能比較
早速性能比較にうつりましょう、以下がコードです。
main.py
import threading
import time
from setting import API_KEY, ENDPOINT
from OpenAIResponder import OpenAIResponder
# インスタンス作成
openai_responder = OpenAIResponder(API_KEY, ENDPOINT)
# 4つの異なるコメント
prompts = [
"1990年代のおすすめの映画を教えてください",
"最新のAI技術のトレンドは何ですか?",
"おすすめのプログラミング言語は?",
"未来のエネルギー源について教えてください"
]
# 直列処理
def sequential_responses():
start_time = time.time()
for i, prompt in enumerate(prompts):
response = openai_responder.answer(prompt)
print(f"Sequential response {i + 1}: {response}")
end_time = time.time()
print(f"Sequential processing time: {end_time - start_time:.2f} seconds\n")
# 並列処理
def parallel_responses():
def get_response(index, prompt):
response = openai_responder.answer(prompt)
print(f"Parallel response {index + 1}: {response}")
start_time = time.time()
# 4つのスレッドを作成して実行
threads = []
for i, prompt in enumerate(prompts):
thread = threading.Thread(target=get_response, args=(i, prompt))
threads.append(thread)
thread.start()
# すべてのスレッドが完了するのを待機
for thread in threads:
thread.join()
end_time = time.time()
print(f"Parallel processing time: {end_time - start_time:.2f} seconds\n")
if __name__ == "__main__":
print("Running sequential processing...")
sequential_responses()
print("Running parallel processing...")
parallel_responses()
結果
以下のようになりました。
出力は長いので省略しますが、一部スクショと速度結果を示すとこんな感じです。
Sequential processing time: 34.89 seconds
Parallel processing time: 10.93 seconds
およそ4倍近くタスクの終了が速くなっていることがわかります。
おわりに
理論上はスレッドを立てまくることでループ処理でそれぞれAPI通信とかselenium処理とかが入っていれば、早くさせることができるはずですが、リクエスト制限に引っかかると一時的に使えなくなるなどのブロッキング仕様とかがあるはずです。そのあたりとの兼ね合いを考慮して使っていきたいものです。