Pythonの並列処理を使ったコードの例として、Azure OpenAIとLlamaIndexを用いて複数のテキストを並行して要約する方法について書いていきます。LLM APIを逐次実行すると時間がかかると思い、速く回せる方法を探していたら、ついでにいろいろ勉強になったので備忘録的に置いておきます。
1. APIクライアントの設定
Azure OpenAIのAPIキーやエンドポイント情報を使って、llmという変数でクライアントをセットアップしています。
llm = AzureOpenAI(
model="gpt-4o-mini",
max_tokens=4096,
api_key=os.environ["AZURE_OPENAI_API_KEY"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_version="2024-08-01-preview",
deployment_name="gpt-4o-mini"
)
2. 要約する関数 asummarize
asummarizeはテキストを要約する非同期関数で、awaitでAPIレスポンスを待つ仕組み。
async def asummarize(text):
res = await llm.achat(messages=SUMMARIZE_PROMPT_TEMPLATE.format_messages(context_str=text))
return res.message.content
3. 並行処理 parallel_summarize
parallel_summarize関数で複数のテキストを一気に要約します
async def parallel_summarize(texts: list[str], max_concurrent: int = 4) -> list[str]:
semaphore = Semaphore(max_concurrent)
async def process_with_progress(text: str, index: int) -> tuple[int, str]:
async with semaphore:
summary = await asummarize(text)
return index, summary
tasks = [asyncio.create_task(process_with_progress(text, i)) for i, text in enumerate(texts)]
results = [None] * len(texts)
for completed_task in tqdm(asyncio.as_completed(tasks), total=len(texts), desc="Summarizing"):
index, summary = await completed_task
results[index] = summary
return results
Semaphoreを使った同時実行数の制御と、結果の順番を保つための工夫がポイントです。
Semaphoreによる同時実行数の制御
Semaphoreは、同時に実行できる処理の数を制限する仕組みです。APIへの大量のリクエストを一度に送りすぎると、サーバーの負荷が増加してエラーが発生する可能性があります。そこで、Semaphoreを使って一度に実行するリクエスト数を制限し、システムに過剰な負担がかからないようにしています。
semaphore = Semaphore(max_concurrent)
ここでは、max_concurrentの値で同時実行数を指定します。例えばmax_concurrent=4にすると、同時に4つのリクエストまでしか実行しないようになります。
- 具体的な流れ:
- async with semaphoreという記述で、セマフォの枠内で処理を行います。
- 4つのリクエストが同時に実行され、1つが終わると次のリクエストがスタートします。
async def process_with_progress(text: str, index: int) -> tuple[int, str]:
async with semaphore:
summary = await asummarize(text)
return index, summary
結果の順番を保つ工夫
asyncio.as_completed(tasks)
は、完了した順にタスクを取り出すため、順番が保証されていません。しかし、ここではリストresultsを用意し、タスクごとに対応するインデックスに結果を格納することで、最終的な順番を保っています。
results = [None] * len(texts)
for completed_task in tqdm(asyncio.as_completed(tasks), total=len(texts), desc="Summarizing"):
index, summary = await completed_task
results[index] = summary
この工夫により、並行処理で完了順が異なる場合でも、resultsリストには元のテキストの順番に沿った要約が格納されます。
コード全体
import os
import asyncio
from asyncio import Semaphore
from llama_index.llms.azure_openai import AzureOpenAI
from tqdm.auto import tqdm
llm = AzureOpenAI(
temperature=0,
model="gpt-4o-mini",
max_tokens=4096,
api_key=os.environ["AZURE_OPENAI_API_KEY"],
azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
api_version="2024-08-01-preview",
deployment_name="gpt-4o-mini"
)
async def asummarize(text):
res = await llm.achat(messages=SUMMARIZE_PROMPT_TEMPLATE.format_messages(
context_str=text
))
return res.message.content
async def parallel_summarize(texts: list[str], max_concurrent: int = 4) -> list[str]:
semaphore = Semaphore(max_concurrent)
async def process_with_progress(text: str, index: int) -> tuple[int, str]:
async with semaphore:
summary = await asummarize(text)
return index, summary
tasks: list[asyncio.Task] = [
asyncio.create_task(process_with_progress(text, i))
for i, text in enumerate(texts)
]
results = [None] * len(texts) # 結果を格納するリストを事前に作成
for completed_task in tqdm(asyncio.as_completed(tasks), total=len(texts), desc="Summarizing"):
index, summary = await completed_task
results[index] = summary
return results