はじめに
LLMを使ったアプリを本番運用すると、必ずぶつかる問題があります。
anthropic.RateLimitError: 429 Too Many Requests
開発中は問題なくても、ユーザーが増えたり、バッチ処理を走らせたりすると突然レート制限に引っかかります。本記事では、レート制限の仕組みを理解した上で、引っかかりにくくする実装パターンを解説します。
レート制限の種類
LLMプロバイダーのレート制限には主に3種類あります。
| 種別 | 内容 | 単位 |
|---|---|---|
| RPM | リクエスト数上限 | リクエスト/分 |
| TPM | トークン数上限 | トークン/分 |
| TPD | 日次トークン上限 | トークン/日 |
ClaudeとOpenAIどちらも、これらを複数組み合わせて制限しています。RPMに余裕があってもTPMに引っかかることがあり、どちらかを超えた時点で429エラーが返ります。
基本:指数バックオフリトライ
レート制限に引っかかったとき、すぐにリトライするのは逆効果です。待ち時間を徐々に伸ばす「指数バックオフ」が基本パターンです。
import time
import random
import anthropic
from anthropic import RateLimitError, APIStatusError
def call_with_retry(
client: anthropic.Anthropic,
max_retries: int = 5,
**kwargs
):
"""指数バックオフ付きリトライ"""
for attempt in range(max_retries):
try:
return client.messages.create(**kwargs)
except RateLimitError as e:
if attempt == max_retries - 1:
raise # 最大リトライ数に達したら諦める
# 指数バックオフ + ジッター(ランダム揺らぎ)
wait_time = (2 ** attempt) + random.uniform(0, 1)
print(f"レート制限。{wait_time:.1f}秒後にリトライ({attempt + 1}/{max_retries})")
time.sleep(wait_time)
except APIStatusError as e:
if e.status_code == 529: # Anthropicの過負荷エラー
wait_time = (2 ** attempt) + random.uniform(0, 1)
time.sleep(wait_time)
else:
raise
# 使用例
client = anthropic.Anthropic(api_key="sk-...")
response = call_with_retry(
client,
model="claude-sonnet-4-5",
max_tokens=1024,
messages=[{"role": "user", "content": "こんにちは"}]
)
ジッター(ランダム揺らぎ)を加える理由: 複数のプロセスが同じタイミングでリトライすると、またすぐにレート制限に引っかかります。ランダムな揺らぎを加えることで、リトライのタイミングを分散させます。
実践:Retry-Afterヘッダーを活用する
レート制限レスポンスには retry-after ヘッダーが含まれていることがあります。このヘッダーを読んで、指定された時間だけ待つのが最も確実です。
import httpx
import anthropic
def call_with_smart_retry(client, max_retries=5, **kwargs):
for attempt in range(max_retries):
try:
return client.messages.create(**kwargs)
except anthropic.RateLimitError as e:
if attempt == max_retries - 1:
raise
# Retry-Afterヘッダーを確認
retry_after = None
if hasattr(e, 'response') and e.response:
retry_after = e.response.headers.get('retry-after')
if retry_after:
wait_time = float(retry_after)
print(f"Retry-After指定: {wait_time}秒待機")
else:
wait_time = (2 ** attempt) + random.uniform(0, 1)
print(f"デフォルトバックオフ: {wait_time:.1f}秒待機")
time.sleep(wait_time)
並列処理でのレート制限対策
バッチ処理で大量のリクエストを並列実行すると、TPMを一気に使い切ってしまいます。セマフォとスリープを組み合わせてリクエストレートを制御します。
import asyncio
import anthropic
from asyncio import Semaphore
async def process_batch(items: list, max_concurrent: int = 5):
"""同時リクエスト数を制限した並列処理"""
client = anthropic.AsyncAnthropic(api_key="sk-...")
semaphore = Semaphore(max_concurrent) # 同時5リクエストまで
async def process_one(item):
async with semaphore:
try:
response = await client.messages.create(
model="claude-sonnet-4-5",
max_tokens=512,
messages=[{"role": "user", "content": item}]
)
return response.content[0].text
except anthropic.RateLimitError:
await asyncio.sleep(10) # レート制限時は10秒待機
return None
tasks = [process_one(item) for item in items]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
# 使用例
items = ["テキスト1", "テキスト2", ..., "テキスト100"]
results = asyncio.run(process_batch(items, max_concurrent=5))
トークン使用量を事前に見積もる
TPM制限に引っかからないように、リクエスト前にトークン数を見積もって、制限に近い場合はスリープを入れる方法もあります。
import anthropic
import time
from collections import deque
class TokenBucketLimiter:
"""トークンバケット方式のレート制御"""
def __init__(self, tpm_limit: int = 40000):
self.tpm_limit = tpm_limit
self.token_usage = deque() # (timestamp, tokens) のキュー
def wait_if_needed(self, estimated_tokens: int):
now = time.time()
# 1分以上古いエントリを削除
while self.token_usage and self.token_usage[0][0] < now - 60:
self.token_usage.popleft()
# 直近1分のトークン使用量を計算
recent_tokens = sum(tokens for _, tokens in self.token_usage)
# 制限に近い場合は待機
if recent_tokens + estimated_tokens > self.tpm_limit * 0.9:
oldest = self.token_usage[0][0] if self.token_usage else now
wait_time = 60 - (now - oldest) + 1
print(f"TPM制限接近。{wait_time:.1f}秒待機")
time.sleep(wait_time)
self.token_usage.append((time.time(), estimated_tokens))
limiter = TokenBucketLimiter(tpm_limit=40000)
def safe_api_call(prompt: str, client):
# 大まかなトークン数を見積もる(文字数 / 3 程度)
estimated_tokens = len(prompt) // 3 + 1000 # 出力分も加算
limiter.wait_if_needed(estimated_tokens)
return client.messages.create(
model="claude-sonnet-4-5",
max_tokens=1000,
messages=[{"role": "user", "content": prompt}]
)
モデル切り替えによるフォールバック
単一プロバイダーに依存していると、そのプロバイダーがレート制限に達したときに詰まります。複数のモデルをフォールバックとして用意しておくと、可用性が上がります。
from openai import OpenAI
import anthropic
# OpenAI互換ゲートウェイ経由で複数モデルを管理
client = OpenAI(
api_key="sk-あなたのキー",
base_url="https://router.flatkey.ai/v1"
)
FALLBACK_MODELS = [
"anthropic/claude-sonnet-4-5", # メイン
"openai/gpt-4o-mini", # フォールバック1(軽量)
"deepseek/deepseek-chat", # フォールバック2(低コスト)
]
def call_with_fallback(prompt: str) -> str:
last_error = None
for model in FALLBACK_MODELS:
try:
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
max_tokens=1000
)
return response.choices[0].message.content
except Exception as e:
if "429" in str(e) or "rate" in str(e).lower():
print(f"{model} レート制限。次のモデルへ")
last_error = e
continue
raise # レート制限以外のエラーはそのまま投げる
raise last_error # 全モデルがダメだった場合
# 使用例
result = call_with_fallback("Pythonでクイックソートを実装してください")
キューイングによる流量制御
ユーザーからのリクエストが急増したとき、すべてを即座にLLMに投げるのではなく、キューに溜めて一定レートで処理する設計も有効です。
import asyncio
from asyncio import Queue
class RateLimitedQueue:
def __init__(self, requests_per_minute: int = 50):
self.queue = Queue()
self.rpm = requests_per_minute
self.interval = 60 / requests_per_minute # リクエスト間隔(秒)
async def worker(self, client):
"""キューからリクエストを取り出して処理"""
while True:
prompt, future = await self.queue.get()
try:
response = await client.messages.create(
model="claude-sonnet-4-5",
max_tokens=512,
messages=[{"role": "user", "content": prompt}]
)
future.set_result(response.content[0].text)
except Exception as e:
future.set_exception(e)
finally:
self.queue.task_done()
await asyncio.sleep(self.interval) # レート制御
async def submit(self, prompt: str):
"""リクエストをキューに追加して結果を待つ"""
loop = asyncio.get_event_loop()
future = loop.create_future()
await self.queue.put((prompt, future))
return await future
レート制限エラーのモニタリング
本番環境では、レート制限エラーの頻度をモニタリングして、設計を見直すサインとして使います。
import time
from dataclasses import dataclass, field
from collections import defaultdict
@dataclass
class RateLimitMetrics:
total_requests: int = 0
rate_limit_errors: int = 0
retry_counts: list = field(default_factory=list)
def error_rate(self) -> float:
if self.total_requests == 0:
return 0
return self.rate_limit_errors / self.total_requests
metrics = RateLimitMetrics()
# エラーレートが5%を超えたら警告
if metrics.error_rate() > 0.05:
print(f"警告: レート制限エラー率が{metrics.error_rate():.1%}です。TPM/RPM制限の見直しを検討してください")
まとめ
LLMのレート制限対策まとめ:
- 指数バックオフ + ジッター:基本のリトライ実装
- Retry-Afterヘッダーを活用:プロバイダーが指定した待機時間を使う
- セマフォで並列数を制限:バッチ処理でTPMを使い切らない
- トークンバケットで事前制御:制限に近づく前に待機を入れる
- モデルフォールバック:1つのモデルが制限に達したら別モデルへ
- キューイング:急増したリクエストを一定レートで処理
レート制限は「使いすぎのサイン」でもあります。エラー率が高い場合は、プロンプトキャッシュの活用・コンテキスト削減・モデルの使い分けなどで根本的なトークン使用量を減らすことも検討してみてください。