31
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PythonでLLM APIを並列実行する方法

31
Posted at

株式会社シンシアでは、実務未経験のエンジニアの方や学生エンジニアインターンを採用し一緒に働いています。
※ シンシアにおける働き方の様子はこちら

シンシアでは、年間100人程度の実務未経験の方が応募し技術面接を受けます。
その経験を通し、実務未経験者の方にぜひ身につけて欲しい技術力(文法)をここでは紹介していきます。

概要

LLM APIの呼び出しは通常数秒から数十秒かかるため、複数のリクエストを順次処理すると非効率です。

: 100件のプロンプトを処理する場合

  • 順次処理: 1件3秒 × 100件 = 300秒(5分)
  • 並列処理: 10件ずつ並列 → 3秒 × 10バッチ = 30秒

並列処理により、処理時間を10分の1に短縮できます。

推奨方式

LLM API呼び出しには asyncio を使った非同期並列処理が最適です。


基本的な実装パターン

パターン1: 最もシンプルな実装(少量データ向け)

import asyncio
from openai import AsyncOpenAI

# グローバルにクライアントを初期化
client = AsyncOpenAI(api_key="your-api-key")

async def call_llm(prompt: str) -> str:
    """LLM APIを非同期で呼び出す"""
    response = await client.chat.completions.create(
        model="gpt-4o",
        messages=[{"role": "user", "content": prompt}],
    )
    return response.choices[0].message.content

async def main():
    prompts = [
        "What is Python?",
        "What is JavaScript?",
        "What is TypeScript?",
    ]
    
    # 並列実行
    tasks = [call_llm(prompt) for prompt in prompts]
    results = await asyncio.gather(*tasks)
    
    # 結果を表示
    for prompt, result in zip(prompts, results):
        print(f"Prompt: {prompt}")
        print(f"Result: {result}\n")

# 実行
asyncio.run(main())

実行ログ例:

Prompt: What is Python?
Result: Python is a high-level programming language known for its simplicity...

Prompt: What is JavaScript?
Result: JavaScript is a programming language primarily used for web development...

Prompt: TypeScript?
Result: TypeScript is a strongly typed programming language that builds on JavaScript...

パターン2: セマフォで同時実行数を制限(実用的)

大量のリクエストを処理する場合、無制限に並列実行するとレート制限に引っかかります。
セマフォ(Semaphore) を使って同時実行数を制限しましょう。

import asyncio
from openai import AsyncOpenAI

client = AsyncOpenAI(api_key="your-api-key")

async def call_llm_with_semaphore(
    prompt: str,
    semaphore: asyncio.Semaphore,
    index: int,
    total: int
) -> str:
    """
    セマフォで同時実行数を制限しながらLLM APIを呼び出す
    
    Args:
        prompt: プロンプト
        semaphore: 同時実行数を制限するセマフォ
        index: インデックス(進捗表示用)
        total: 総数(進捗表示用)
    
    Returns:
        LLMの応答
    """
    async with semaphore:
        if (index % 10) == 0:
            print(f"  [{index}/{total}] 処理中...")
        
        response = await client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": prompt}],
        )
        return response.choices[0].message.content

async def main():
    prompts = [f"Question {i}" for i in range(50)]
    
    print(f"総数: {len(prompts)}")
    print("処理開始...")
    
    # 最大10件まで同時実行
    semaphore = asyncio.Semaphore(10)
    
    tasks = [
        call_llm_with_semaphore(prompt, semaphore, i, len(prompts))
        for i, prompt in enumerate(prompts)
    ]
    
    results = await asyncio.gather(*tasks)
    
    print(f"完了! {len(results)}件処理しました")

asyncio.run(main())

実行ログ例:

総数: 50件
処理開始...
  [0/50] 処理中...
  [10/50] 処理中...
  [20/50] 処理中...
  [30/50] 処理中...
  [40/50] 処理中...
完了! 50件処理しました

推奨同時実行数:

  • OpenAI: 5-10
  • Anthropic (Claude): 5-10
  • Google (Gemini): 10-20

パターン3: エラーハンドリング付き(本番環境向け)

本番環境では、API呼び出しが失敗する可能性を考慮する必要があります。

import asyncio
from typing import Optional
from openai import AsyncOpenAI, APIError, RateLimitError

client = AsyncOpenAI(api_key="your-api-key")

async def call_llm_with_retry(
    prompt: str,
    semaphore: asyncio.Semaphore,
    index: int,
    total: int,
    max_retries: int = 3
) -> Optional[str]:
    """
    エラーハンドリングとリトライ機能付きでLLM APIを呼び出す
    
    Args:
        prompt: プロンプト
        semaphore: 同時実行数を制限するセマフォ
        index: インデックス(進捗表示用)
        total: 総数(進捗表示用)
        max_retries: 最大リトライ回数
    
    Returns:
        LLMの応答(エラー時はNone)
    """
    async with semaphore:
        delay = 1.0  # 初回の待機時間
        
        for attempt in range(max_retries):
            try:
                if (index % 10) == 0:
                    print(f"  [{index}/{total}] 処理中...")
                
                response = await client.chat.completions.create(
                    model="gpt-4o",
                    messages=[{"role": "user", "content": prompt}],
                )
                
                return response.choices[0].message.content
            
            except RateLimitError as e:
                if attempt < max_retries - 1:
                    print(f"  [{index}/{total}] レート制限。{delay}秒後にリトライ... ({attempt + 1}/{max_retries})")
                    await asyncio.sleep(delay)
                    delay *= 2  # 指数バックオフ
                else:
                    print(f"  [{index}/{total}] エラー: レート制限超過")
                    return None
            
            except APIError as e:
                if attempt < max_retries - 1:
                    print(f"  [{index}/{total}] APIエラー。{delay}秒後にリトライ... ({attempt + 1}/{max_retries})")
                    await asyncio.sleep(delay)
                    delay *= 2
                else:
                    print(f"  [{index}/{total}] エラー: {e}")
                    return None
            
            except Exception as e:
                print(f"  [{index}/{total}] 予期しないエラー: {e}")
                return None
        
        return None

async def main():
    prompts = [f"Question {i}" for i in range(50)]
    
    print(f"総数: {len(prompts)}")
    print("処理開始...")
    
    semaphore = asyncio.Semaphore(10)
    
    tasks = [
        call_llm_with_retry(prompt, semaphore, i, len(prompts))
        for i, prompt in enumerate(prompts)
    ]
    
    results = await asyncio.gather(*tasks)
    
    # 統計情報
    successful = sum(1 for r in results if r is not None)
    failed = len(results) - successful
    
    print(f"\n=== 処理完了 ===")
    print(f"総数: {len(results)}")
    print(f"成功: {successful}")
    print(f"失敗: {failed}")

asyncio.run(main())

実行ログ例:

総数: 50件
処理開始...
  [0/50] 処理中...
  [10/50] 処理中...
  [15/50] レート制限。1秒後にリトライ... (1/3)
  [20/50] 処理中...
  [30/50] 処理中...
  [40/50] 処理中...

=== 処理完了 ===
総数: 50件
成功: 48件
失敗: 2件

よくある質問

Q1. 何件まで並列実行できますか?

A. プロバイダーのレート制限によります。

  • OpenAI: 5-10件が安全(Tier 1の場合)
  • Anthropic: 5-10件が安全
  • Google Gemini: 10-20件が安全

レート制限に引っかかる場合は、同時実行数を減らすか、リトライロジックを実装してください。

Q2. 処理時間はどのくらい短縮されますか?

A. 同時実行数によります。

  • 10件を順次処理: 3秒 × 10 = 30秒
  • 10件を並列処理(5件ずつ): 3秒 × 2 = 6秒80%短縮
  • 100件を並列処理(10件ずつ): 3秒 × 10 = 30秒90%短縮

Q3. エラーが発生した場合はどうすればいいですか?

A. リトライロジックを実装してください(パターン3参照)。

  • レート制限エラー: 待機してからリトライ(指数バックオフ)
  • APIエラー: 3回までリトライ
  • タイムアウト: タイムアウト時間を延長

Q4. コストはどのくらいかかりますか?

A. 使用するモデルとトークン数によります。

: GPT-4o で100件のプロンプトを処理(各1,000トークン)

  • 入力トークン: 50,000トークン → $0.125
  • 出力トークン: 50,000トークン → $0.50
  • 合計: $0.625

コスト削減のコツ:

  • gpt-4o-miniやgpt-5-miniなどのmini系を使用(ハイグレードモデルの約1/10の価格)
  • プロンプトを簡潔にする
  • 不要なリトライを避ける

まとめ

基本パターンの選び方

ケース 推奨パターン
10件以下の少量処理 パターン1(シンプルな並列実行)
100件以上の大量処理 パターン2(セマフォで同時実行数制限)
本番環境・信頼性が重要 パターン3(エラーハンドリング付き)

重要なポイント

  1. asyncio.Semaphore で同時実行数を制限する(5-10が安全)
  2. エラーハンドリング を実装する(RateLimitError, APIError)
  3. 進捗表示 を追加してユーザーエクスペリエンスを向上
  4. リトライロジック で一時的なエラーに対応
  5. ログ を出力して問題を追跡しやすくする

このドキュメントの実装例を参考に、プロジェクトの要件に合わせてカスタマイズしてください。

31
18
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
31
18

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?