0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LLMのレート制限に引っかからない実装パターン — リトライ・バックオフ・モデル切り替えの実践

0
Posted at

はじめに

LLMを使ったアプリを本番運用すると、必ずぶつかる問題があります。

anthropic.RateLimitError: 429 Too Many Requests

開発中は問題なくても、ユーザーが増えたり、バッチ処理を走らせたりすると突然レート制限に引っかかります。本記事では、レート制限の仕組みを理解した上で、引っかかりにくくする実装パターンを解説します。


レート制限の種類

LLMプロバイダーのレート制限には主に3種類あります。

種別 内容 単位
RPM リクエスト数上限 リクエスト/分
TPM トークン数上限 トークン/分
TPD 日次トークン上限 トークン/日

ClaudeとOpenAIどちらも、これらを複数組み合わせて制限しています。RPMに余裕があってもTPMに引っかかることがあり、どちらかを超えた時点で429エラーが返ります。


基本:指数バックオフリトライ

レート制限に引っかかったとき、すぐにリトライするのは逆効果です。待ち時間を徐々に伸ばす「指数バックオフ」が基本パターンです。

import time
import random
import anthropic
from anthropic import RateLimitError, APIStatusError

def call_with_retry(
    client: anthropic.Anthropic,
    max_retries: int = 5,
    **kwargs
):
    """指数バックオフ付きリトライ"""
    for attempt in range(max_retries):
        try:
            return client.messages.create(**kwargs)

        except RateLimitError as e:
            if attempt == max_retries - 1:
                raise  # 最大リトライ数に達したら諦める

            # 指数バックオフ + ジッター(ランダム揺らぎ)
            wait_time = (2 ** attempt) + random.uniform(0, 1)
            print(f"レート制限。{wait_time:.1f}秒後にリトライ({attempt + 1}/{max_retries}")
            time.sleep(wait_time)

        except APIStatusError as e:
            if e.status_code == 529:  # Anthropicの過負荷エラー
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                time.sleep(wait_time)
            else:
                raise

# 使用例
client = anthropic.Anthropic(api_key="sk-...")
response = call_with_retry(
    client,
    model="claude-sonnet-4-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "こんにちは"}]
)

ジッター(ランダム揺らぎ)を加える理由: 複数のプロセスが同じタイミングでリトライすると、またすぐにレート制限に引っかかります。ランダムな揺らぎを加えることで、リトライのタイミングを分散させます。


実践:Retry-Afterヘッダーを活用する

レート制限レスポンスには retry-after ヘッダーが含まれていることがあります。このヘッダーを読んで、指定された時間だけ待つのが最も確実です。

import httpx
import anthropic

def call_with_smart_retry(client, max_retries=5, **kwargs):
    for attempt in range(max_retries):
        try:
            return client.messages.create(**kwargs)

        except anthropic.RateLimitError as e:
            if attempt == max_retries - 1:
                raise

            # Retry-Afterヘッダーを確認
            retry_after = None
            if hasattr(e, 'response') and e.response:
                retry_after = e.response.headers.get('retry-after')

            if retry_after:
                wait_time = float(retry_after)
                print(f"Retry-After指定: {wait_time}秒待機")
            else:
                wait_time = (2 ** attempt) + random.uniform(0, 1)
                print(f"デフォルトバックオフ: {wait_time:.1f}秒待機")

            time.sleep(wait_time)

並列処理でのレート制限対策

バッチ処理で大量のリクエストを並列実行すると、TPMを一気に使い切ってしまいます。セマフォとスリープを組み合わせてリクエストレートを制御します。

import asyncio
import anthropic
from asyncio import Semaphore

async def process_batch(items: list, max_concurrent: int = 5):
    """同時リクエスト数を制限した並列処理"""
    client = anthropic.AsyncAnthropic(api_key="sk-...")
    semaphore = Semaphore(max_concurrent)  # 同時5リクエストまで

    async def process_one(item):
        async with semaphore:
            try:
                response = await client.messages.create(
                    model="claude-sonnet-4-5",
                    max_tokens=512,
                    messages=[{"role": "user", "content": item}]
                )
                return response.content[0].text
            except anthropic.RateLimitError:
                await asyncio.sleep(10)  # レート制限時は10秒待機
                return None

    tasks = [process_one(item) for item in items]
    results = await asyncio.gather(*tasks, return_exceptions=True)
    return results

# 使用例
items = ["テキスト1", "テキスト2", ..., "テキスト100"]
results = asyncio.run(process_batch(items, max_concurrent=5))

トークン使用量を事前に見積もる

TPM制限に引っかからないように、リクエスト前にトークン数を見積もって、制限に近い場合はスリープを入れる方法もあります。

import anthropic
import time
from collections import deque

class TokenBucketLimiter:
    """トークンバケット方式のレート制御"""

    def __init__(self, tpm_limit: int = 40000):
        self.tpm_limit = tpm_limit
        self.token_usage = deque()  # (timestamp, tokens) のキュー

    def wait_if_needed(self, estimated_tokens: int):
        now = time.time()

        # 1分以上古いエントリを削除
        while self.token_usage and self.token_usage[0][0] < now - 60:
            self.token_usage.popleft()

        # 直近1分のトークン使用量を計算
        recent_tokens = sum(tokens for _, tokens in self.token_usage)

        # 制限に近い場合は待機
        if recent_tokens + estimated_tokens > self.tpm_limit * 0.9:
            oldest = self.token_usage[0][0] if self.token_usage else now
            wait_time = 60 - (now - oldest) + 1
            print(f"TPM制限接近。{wait_time:.1f}秒待機")
            time.sleep(wait_time)

        self.token_usage.append((time.time(), estimated_tokens))

limiter = TokenBucketLimiter(tpm_limit=40000)

def safe_api_call(prompt: str, client):
    # 大まかなトークン数を見積もる(文字数 / 3 程度)
    estimated_tokens = len(prompt) // 3 + 1000  # 出力分も加算
    limiter.wait_if_needed(estimated_tokens)

    return client.messages.create(
        model="claude-sonnet-4-5",
        max_tokens=1000,
        messages=[{"role": "user", "content": prompt}]
    )

モデル切り替えによるフォールバック

単一プロバイダーに依存していると、そのプロバイダーがレート制限に達したときに詰まります。複数のモデルをフォールバックとして用意しておくと、可用性が上がります。

from openai import OpenAI
import anthropic

# OpenAI互換ゲートウェイ経由で複数モデルを管理
client = OpenAI(
    api_key="sk-あなたのキー",
    base_url="https://router.flatkey.ai/v1"
)

FALLBACK_MODELS = [
    "anthropic/claude-sonnet-4-5",   # メイン
    "openai/gpt-4o-mini",             # フォールバック1(軽量)
    "deepseek/deepseek-chat",         # フォールバック2(低コスト)
]

def call_with_fallback(prompt: str) -> str:
    last_error = None

    for model in FALLBACK_MODELS:
        try:
            response = client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=1000
            )
            return response.choices[0].message.content

        except Exception as e:
            if "429" in str(e) or "rate" in str(e).lower():
                print(f"{model} レート制限。次のモデルへ")
                last_error = e
                continue
            raise  # レート制限以外のエラーはそのまま投げる

    raise last_error  # 全モデルがダメだった場合

# 使用例
result = call_with_fallback("Pythonでクイックソートを実装してください")

キューイングによる流量制御

ユーザーからのリクエストが急増したとき、すべてを即座にLLMに投げるのではなく、キューに溜めて一定レートで処理する設計も有効です。

import asyncio
from asyncio import Queue

class RateLimitedQueue:
    def __init__(self, requests_per_minute: int = 50):
        self.queue = Queue()
        self.rpm = requests_per_minute
        self.interval = 60 / requests_per_minute  # リクエスト間隔(秒)

    async def worker(self, client):
        """キューからリクエストを取り出して処理"""
        while True:
            prompt, future = await self.queue.get()

            try:
                response = await client.messages.create(
                    model="claude-sonnet-4-5",
                    max_tokens=512,
                    messages=[{"role": "user", "content": prompt}]
                )
                future.set_result(response.content[0].text)
            except Exception as e:
                future.set_exception(e)
            finally:
                self.queue.task_done()
                await asyncio.sleep(self.interval)  # レート制御

    async def submit(self, prompt: str):
        """リクエストをキューに追加して結果を待つ"""
        loop = asyncio.get_event_loop()
        future = loop.create_future()
        await self.queue.put((prompt, future))
        return await future

レート制限エラーのモニタリング

本番環境では、レート制限エラーの頻度をモニタリングして、設計を見直すサインとして使います。

import time
from dataclasses import dataclass, field
from collections import defaultdict

@dataclass
class RateLimitMetrics:
    total_requests: int = 0
    rate_limit_errors: int = 0
    retry_counts: list = field(default_factory=list)

    def error_rate(self) -> float:
        if self.total_requests == 0:
            return 0
        return self.rate_limit_errors / self.total_requests

metrics = RateLimitMetrics()

# エラーレートが5%を超えたら警告
if metrics.error_rate() > 0.05:
    print(f"警告: レート制限エラー率が{metrics.error_rate():.1%}です。TPM/RPM制限の見直しを検討してください")

まとめ

LLMのレート制限対策まとめ:

  1. 指数バックオフ + ジッター:基本のリトライ実装
  2. Retry-Afterヘッダーを活用:プロバイダーが指定した待機時間を使う
  3. セマフォで並列数を制限:バッチ処理でTPMを使い切らない
  4. トークンバケットで事前制御:制限に近づく前に待機を入れる
  5. モデルフォールバック:1つのモデルが制限に達したら別モデルへ
  6. キューイング:急増したリクエストを一定レートで処理

レート制限は「使いすぎのサイン」でもあります。エラー率が高い場合は、プロンプトキャッシュの活用・コンテキスト削減・モデルの使い分けなどで根本的なトークン使用量を減らすことも検討してみてください。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?