Agent の大敵スロットリングを打ち負かす
Agent が動いている途中にスロットリングすると、一般的にはやり直しになっちゃいます。
つらい。
やり直してまた、スロットリングすると無限ループです。
さてどうしようか、というお話です。
スロットリングさせないために、スロットリングしても処理を継続させるためにはどうすればいいですか?
用法・用量を守って正しくお使いください。それしかありません。
ここでいう用法・用量は推論の回数及び使用するトークンの量です。ただし、一般に Agent では推論の回数は同時に使わない限りそんなにハネません。
開発を前提にトークン使用料は独り占めできること状態でどうやって防ぐか、というお話です。トークンの時間単位での使用量を減らす方法を紹介します。
といっても、結論モデル呼び出しごとに sleep
させるしかないんですが、そのいい実装は?というお話。
それとは別にそれでも万一スロットリングしたときに処理を resume させる方法も紹介します。
Strands Agents では様々なタイミングで処理を追加できるので sleep を
callback や hook を使うことでなにかしらのイベント時に処理を入れることができます。
今回は使用したトークン量に応じてスリープを入れたかったので、使用したトークン量が取りやすい callback を利用しました。
from time import sleep
class TokenLimitHandler:
def __init__(self):
pass
def __call__(self, **kwargs):
if "event" in kwargs:
if "metadata" in kwargs["event"]:
if "usage" in kwargs["event"]["metadata"]:
usage = kwargs["event"]["metadata"]["usage"]
total_tokens = usage.get("totalTokens", 0)
input_tokens = usage.get("inputTokens", 0)
output_tokens = usage.get("outputTokens", 0)
sleep_time = (total_tokens / 4000) * 60
message = f"""[TokenLimitHandler] input token を {input_tokens}, output token を {output_tokens}, 合計 {total_tokens} 使用しました。
[TokenLimitHandler] Quota 回復のため {sleep_time} 秒休みます"""
print(message)
sleep(sleep_time)
こんな Callback Handler を用意しました。
Bedrock ではモデル呼び出しごとに usage
というキーに使用したトークン数が入るのでそのデータがあったら sleep する、という仕様にしちえます。
sleep する時間は sleep_time = (total_tokens / 4000) * 60
で設定しており 4000 というマジックナンバーが入っています。実際にはモデルのクオータ限界の値でいいでしょう。例えば Bedrock のサービスクオータのこの値などを設定してもいいと思います。(Claude Sonnet 4 の場合: デフォルト 200000)
さて、あとは Agent を生成するときに Callback を指定してあげるだけです。
from strands import Agent
agent = Agent(
model="us.anthropic.claude-sonnet-4-20250514-v1:0",
system_prompt="SYSTEM_PROMPT",
callback_handler=TokenLimitHandler(),
)
agent("YOUR_PROMPT")
すると、以下のようにモデル呼び出しごとに処理を一時的に止めるのでわりと確実に処理を終えられます。
[TokenLimitHandler] input token を 4809, output token を 119, 合計 4928 使用しました。
[TokenLimitHandler] Quota 回復のため 73.92 秒休みます
[TokenLimitHandler] input token を 1490, output token を 134, 合計 1624 使用しました。
[TokenLimitHandler] Quota 回復のため 24.360000000000003 秒休みます
[TokenLimitHandler] input token を 10938, output token を 8192, 合計 19130 使用しました。
[TokenLimitHandler] Quota 回復のため 286.95 秒休みます
Resume させる
strands では messages というプロパティで会話履歴を管理しているので、直近の失敗した会話を削除して、最後のユーザー入力を改めて入れてあげればよい。再実行するときも sleep を入れてあげるといいでしょう。
こちらでも紹介した通り。
また、エラーハンドリングが雑なのでスロットリングやタイムアウトのときのみ動くようにするとなおよい。
last_user_content = "YOUR_PROMPT"
agent = Agent()
for i in range(20):
try:
agent(last_user_content)
except Exception as e:
for _ in range(2):
if agent.messages[-1].get("role") == "assistant":
del agent.messages[-1]
elif agent.messages[-1].get("role") == "user":
last_user_content = agent.messages.pop().get("content")
break
else:
raise e
time.sleep(20)
Hook の場合は?
User Guide
API Reference
こちらにある通りやればよい。
各イベントのタイミングで hook をかけられるので、この場合 AfterModelInvocationEvent が良いと思います。
が、AfgerModelInvocationEvent がまだ Experimental だったので今回は使用しませんでした。
こんな感じで使う
import time
from strands import Agent
from strands.hooks import HookProvider, HookRegistry
from strands.experimental.hooks import AfterModelInvocationEvent
class ModelThrottleHook(HookProvider):
def __init__(self, sleep_duration: float = 1.0):
self.sleep_duration = sleep_duration
def register_hooks(self, registry: HookRegistry) -> None:
registry.add_callback(AfterModelInvocationEvent, self.sleep_after_model)
def sleep_after_model(self, event: AfterModelInvocationEvent) -> None:
print(f"Model invocation completed. Sleeping for {self.sleep_duration} seconds...")
time.sleep(self.sleep_duration)
def main():
agent = Agent(hooks=[ModelThrottleHook(sleep_duration=1.0)])
result = agent("2+2は?")
print(f"Result: {result}")
if __name__ == "__main__":
main()