1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

トークンバケット方式でblastengineのRate Limitに対応する方法

Posted at

blastengineのAPIを利用する際には、安定した配信を行うために「Rate Limit(レート制限)」の理解が欠かせません。短時間に大量のリクエストを送信すると、サーバーが処理しきれずHTTP 429(Too Many Requests)を返す場合があります。この記事では、blastengineが定めるレート制限の仕様や、開発者が実装時に考慮すべき制御アルゴリズム(トークンバケット方式)、およびリトライ処理の実装例をわかりやすく解説します。

Rate Limitの基本仕様

blastengine APIには1分あたり500リクエストのRate Limitが設定されています。
この制限を超えてリクエストを送信すると、APIサーバーはHTTP 429 (Too Many Requests) ステータスコードを返し、リクエストは拒否されます。

Rate Limitを超えないようにするには、以下のような対策が考えられます:

  • リクエスト送信間隔を調整する:1分間に500リクエスト以内に収まるよう、適切な待機時間を設ける
  • トークンバケット方式を導入する:リクエストレートを平滑化し、バースト的な送信を制御する
  • 429エラー時のリトライロジック:エラー発生時に再試行タイミングを制御する

blastengineのRate Limit関連ヘッダ

blastengineでは、Rate制限に関する情報が以下のレスポンスヘッダとして返却されます:

ヘッダ名 内容
X-Rate-Limit-Remaining 現在の1分間ウィンドウで、残りのリクエスト可能回数
X-Rate-Limit-Retry-After-Seconds Rate Limitに達した場合、次にリクエスト可能になるまでの秒数(429時のみ返却)

429エラー時には、これらのヘッダ値を利用して適切な待機時間を設定するのが推奨されます。

トークンバケット方式とは

トークンバケット(Token Bucket)は、Rate Limitを超えないようにリクエストレートを制御するアルゴリズムの一つです。

基本的な仕組み

  1. バケツ(Bucket) に一定数のトークン(=リクエスト可能数)が入っている
  2. 一定の速度で新しいトークンが補充される
  3. リクエストを送るたびにトークンを1つ消費
  4. トークンがなければ補充されるまで待機

blastengineの制限(1分500リクエスト)を基にすると:

  • 60秒 ÷ 500リクエスト = 0.12秒ごとに1トークン補充
  • 8.33リクエスト/秒 のペースで送信可能

この方式の利点:

  • バースト対応:短時間に複数リクエストを送信可能
  • 平滑化:長期的には設定したレートを超えない
  • シンプル:実装が容易で理解しやすい

Pythonでの実装例

同期処理版

import time
import threading
import requests

class TokenBucket:
    def __init__(self, rate, capacity):
        """
        rate: 1秒あたりのトークン補充数
        capacity: バケツの最大容量
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = threading.Lock()

    def consume(self, tokens=1):
        """トークンを消費する。なければ補充されるまで待機。"""
        with self.lock:
            while True:
                now = time.time()
                elapsed = now - self.last_update
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.last_update = now

                if self.tokens >= tokens:
                    self.tokens -= tokens
                    break
                else:
                    sleep_time = (tokens - self.tokens) / self.rate
                    time.sleep(sleep_time)

# blastengine用設定(500req/min ≒ 8.33req/sec)
bucket = TokenBucket(rate=8.33, capacity=100)

def send_request(url, data):
    """Rate制限を考慮してリクエスト送信"""
    bucket.consume(1)
    response = requests.post(url, json=data)
    return response

# 使用例
api_url = "https://app.engn.jp/api/v1/deliveries"
for i in range(600):  # 500を超えるテストケース
    try:
        res = send_request(api_url, {"data": f"message_{i}"})
        print(f"Request {i}: {res.status_code}")
    except Exception as e:
        print(f"Request {i}: Error {e}")

非同期処理版(aiohttp)

import asyncio
import time
import aiohttp

class AsyncTokenBucket:
    def __init__(self, rate, capacity):
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_update = time.time()
        self.lock = asyncio.Lock()

    async def consume(self, tokens=1):
        async with self.lock:
            while True:
                now = time.time()
                elapsed = now - self.last_update
                self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
                self.last_update = now

                if self.tokens >= tokens:
                    self.tokens -= tokens
                    break
                else:
                    sleep_time = (tokens - self.tokens) / self.rate
                    await asyncio.sleep(sleep_time)

bucket = AsyncTokenBucket(rate=8.33, capacity=100)

async def send_request_async(session, url, data, i):
    await bucket.consume(1)
    try:
        async with session.post(url, json=data) as res:
            print(f"Request {i}: Status {res.status}")
    except Exception as e:
        print(f"Request {i}: Error {e}")

async def main():
    api_url = "https://app.engn.jp/api/v1/deliveries"
    async with aiohttp.ClientSession() as session:
        tasks = [send_request_async(session, api_url, {"data": f"message_{i}"}, i) for i in range(600)]
        await asyncio.gather(*tasks)

if __name__ == "__main__":
    asyncio.run(main())

429エラー時のリトライ例

blastengineでは、Rate Limit超過時に以下のようなレスポンスが返されます:

HTTP/1.1 429 Too Many Requests
X-Rate-Limit-Remaining: 0
X-Rate-Limit-Retry-After-Seconds: 60

この場合、クライアント側で以下のようにリトライ制御を行うのが推奨されます。

import requests
import time

def safe_request(url, data, token_bucket):
    token_bucket.consume(1)
    response = requests.post(url, json=data)

    if response.status_code == 429:
        retry_after = int(response.headers.get("X-Rate-Limit-Retry-After-Seconds", "60"))
        print(f"Rate limit exceeded. Retrying after {retry_after} seconds.")
        time.sleep(retry_after)
        return safe_request(url, data, token_bucket)

    return response

動作確認(例)

テストケース 設定 結果
トークンバケット未使用 約600リクエストを一気に送信 500件以降でHTTP 429発生
トークンバケット使用 8.33req/s, capacity=100 600件全てHTTP 200、実行時間約72秒

まとめ

  • blastengine APIのRate Limitは1分あたり500リクエスト
  • 超過時はHTTP 429とともにX-Rate-Limit-Retry-After-Secondsが返却される
  • トークンバケット方式を導入することで、安定した送信と429回避が可能
  • エンドポイントは https://app.engn.jp/api/v1/ を使用する

トークンバケットによるレート制御を導入することで、API利用を安全かつ効率的に行うことができます。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?