0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Pythonの並列処理でLLM APIを効率的に利用する方法

Posted at

Pythonの並列処理を使ったコードの例として、Azure OpenAIとLlamaIndexを用いて複数のテキストを並行して要約する方法について書いていきます。LLM APIを逐次実行すると時間がかかると思い、速く回せる方法を探していたら、ついでにいろいろ勉強になったので備忘録的に置いておきます。

1. APIクライアントの設定

Azure OpenAIのAPIキーやエンドポイント情報を使って、llmという変数でクライアントをセットアップしています。

llm = AzureOpenAI(
    model="gpt-4o-mini",
    max_tokens=4096,
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_version="2024-08-01-preview",
    deployment_name="gpt-4o-mini"
)

2. 要約する関数 asummarize

asummarizeはテキストを要約する非同期関数で、awaitでAPIレスポンスを待つ仕組み。

async def asummarize(text):
    res = await llm.achat(messages=SUMMARIZE_PROMPT_TEMPLATE.format_messages(context_str=text))
    return res.message.content

3. 並行処理 parallel_summarize

parallel_summarize関数で複数のテキストを一気に要約します

async def parallel_summarize(texts: list[str], max_concurrent: int = 4) -> list[str]:
    semaphore = Semaphore(max_concurrent)
    
    async def process_with_progress(text: str, index: int) -> tuple[int, str]:
        async with semaphore:
            summary = await asummarize(text)
            return index, summary
            
    tasks = [asyncio.create_task(process_with_progress(text, i)) for i, text in enumerate(texts)]
    results = [None] * len(texts)
    for completed_task in tqdm(asyncio.as_completed(tasks), total=len(texts), desc="Summarizing"):
        index, summary = await completed_task
        results[index] = summary
    return results

Semaphoreを使った同時実行数の制御と、結果の順番を保つための工夫がポイントです。

Semaphoreによる同時実行数の制御

Semaphoreは、同時に実行できる処理の数を制限する仕組みです。APIへの大量のリクエストを一度に送りすぎると、サーバーの負荷が増加してエラーが発生する可能性があります。そこで、Semaphoreを使って一度に実行するリクエスト数を制限し、システムに過剰な負担がかからないようにしています。

semaphore = Semaphore(max_concurrent)

ここでは、max_concurrentの値で同時実行数を指定します。例えばmax_concurrent=4にすると、同時に4つのリクエストまでしか実行しないようになります。

  • 具体的な流れ:
    • async with semaphoreという記述で、セマフォの枠内で処理を行います。
    • 4つのリクエストが同時に実行され、1つが終わると次のリクエストがスタートします。
async def process_with_progress(text: str, index: int) -> tuple[int, str]:
    async with semaphore:
        summary = await asummarize(text)
        return index, summary

結果の順番を保つ工夫

asyncio.as_completed(tasks)は、完了した順にタスクを取り出すため、順番が保証されていません。しかし、ここではリストresultsを用意し、タスクごとに対応するインデックスに結果を格納することで、最終的な順番を保っています。

results = [None] * len(texts)
for completed_task in tqdm(asyncio.as_completed(tasks), total=len(texts), desc="Summarizing"):
    index, summary = await completed_task
    results[index] = summary

この工夫により、並行処理で完了順が異なる場合でも、resultsリストには元のテキストの順番に沿った要約が格納されます。

コード全体

import os
import asyncio
from asyncio import Semaphore

from llama_index.llms.azure_openai import AzureOpenAI
from tqdm.auto import tqdm

llm = AzureOpenAI(
    temperature=0,
    model="gpt-4o-mini",
    max_tokens=4096,
    api_key=os.environ["AZURE_OPENAI_API_KEY"],
    azure_endpoint=os.environ["AZURE_OPENAI_ENDPOINT"],
    api_version="2024-08-01-preview",
    deployment_name="gpt-4o-mini"
)

async def asummarize(text):
    res = await llm.achat(messages=SUMMARIZE_PROMPT_TEMPLATE.format_messages(
        context_str=text
    ))
    return res.message.content


async def parallel_summarize(texts: list[str], max_concurrent: int = 4) -> list[str]:
    semaphore = Semaphore(max_concurrent)

    async def process_with_progress(text: str, index: int) -> tuple[int, str]:
        async with semaphore:
            summary = await asummarize(text)
            return index, summary

    tasks: list[asyncio.Task] = [
        asyncio.create_task(process_with_progress(text, i))
        for i, text in enumerate(texts)
    ]

    results = [None] * len(texts)  # 結果を格納するリストを事前に作成

    for completed_task in tqdm(asyncio.as_completed(tasks), total=len(texts), desc="Summarizing"):
        index, summary = await completed_task
        results[index] = summary

    return results
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?