生成 AI の宿命 : スロットリング
みんなしてますよね。みんなが一斉にリクエストすりゃそりゃフルスロットルですわ。
とはいえ、レスポンスは遅くてもいいから Agent を完走させたい、そんな思いで書いてます。
AgentCore の仕様(ただの確認)
AgentCore は 8 時間まで実行できるんですよー、という情報だけだとハマる(私がハマった)ので、念の為確認。
- 同期は 15 分
- ストリーミングは 1 時間
- 非同期が 8 時間
ってだけです。
1 時間超えるようなら非同期実行しような!
uvicorn の設定
AgentCore を動かす時は starter toolkit を使うと簡単です。
starter toolkit では http サーバーとして uvicorn (FastAPI)を使うのですが、uvivorn の --timeout-keep-alive
及び --timeout-graceful-shutdown
の設定をいじらないと Failed to invoke agent endpoint: HTTPConnectionPool(host='0.0.0.0', port=8080): Read timed out.
とよく勝手に接続を閉じられてしまいます。
おそらく graceful shutdown のほうだけでよいと思いますが、一応以下のような設定をすると呼び出しあと、勝手に切られることがなくなりました。たぶん。
import os
os.environ['UVICORN_TIMEOUT_KEEP_ALIVE'] = '28800'
os.environ['UVICORN_TIMEOUT_GRACEFUL_SHUTDOWN'] = '28800'
28800 秒は AgentCore Runtime の最長時間 8 時間です。
BedrockModel の設定
strands では BedrockModel
クラスを使うことでモデル呼び出しの細かい設定をできます。
BedrockModel の実態はもちろん boto3 であり、boto3 の設定を渡すこともできます。
リトライ回数やタイムアウトの設定をいじるといいでしょう。これはワークロードによりますが、私はこんな設定にしています。
def create_bedrock_model(
model_id="us.anthropic.claude-sonnet-4-20250514-v1:0",
region_name="us-west-2"
):
return BedrockModel(
model_id=model_id,
region_name=region_name,
boto_client_config = Config(
retries={
'total_max_attempts': 100, # 100 回やれ、って設定にしているけど、100回やっているようには見えない
'mode': 'standard'
},
connect_timeout = 10, # 接続に 10 秒もかかることあるんか?
read_timeout = 300, # こっちは大幅に上げたら改善しました。レスポンスが返り始めるまでまぁまぁ時間かかるからだと思う
),
temperature=0,
top_p=0,
)
設定値の詳細はこちらを見ると良いね。
Strands の Event Loop の設定
ここらへんからこんな実装やめろwwwwという話。
Agent の出力をストリーミングしたい場合、
async def generate():
agent = Agent(
tools=[calculator, http_request],
callback_handler=None
)
try:
async for event in agent.stream_async(request.prompt):
if "data" in event:
# Only stream text chunks to the client
yield event["data"]
みたいに書きますが、これ、Agent が途中で止まってしまった時どうやってリトライすんねん!!!
という話が残ります。(解決策は後述)
Bedrock の Converse だったら最新のユーザーリクエストと過去の会話履歴をつなげて渡せばいいだけなのですが、Strands Agents の場合は agent インスタンスに過去履歴を持っているけど、そこに改めて request.prompt
(つまり最初のユーザーリクエスト)なんて入れたら会話履歴がぐっちゃぐっちゃになりません?というお話。
特に会話が長くなると input tokens が長くなって容易に input tokens で too much tokens
します。
resume()
みたいなメソッドがあったらいいのですがそんなものはありません。
とはいえ、Strands 側の event_loop.py
にもリトライ機構があるのですが、
for attempt in range(MAX_ATTEMPTS):
model_id = agent.model.config.get("model_id") if hasattr(agent.model, "config") else None
model_invoke_span = tracer.start_model_invoke_span(
messages=agent.messages,
parent_span=cycle_span,
model_id=model_i
その MAX_ATTEMPTS
とかどうなってんのや、と見てみると
MAX_ATTEMPTS = 6
INITIAL_DELAY = 4
MAX_DELAY = 240 # 4 minutes
まさかのハードコーディングでござる。
特に INITIAL_DELAY = 4
が致命的で 4 秒なんかでトークンが回復するはずありません。
uv や venv で Strands Agents を使っている場合は直接書き換えられますが、AgentCore の場合は コンテナなので Dockerfile を書き換えなければいけません・・・。
ということでオラオラ実装
# 依存パッケージをインストールした後、event_loop.pyファイルを修正
RUN sed -i 's/MAX_ATTEMPTS = 60/MAX_ATTEMPTS = 60/' /usr/local/lib/python3.13/site-packages/strands/event_loop/event_loop.py && \
sed -i 's/INITIAL_DELAY = 4/INITIAL_DELAY = 60/' /usr/local/lib/python3.13/site-packages/strands/event_loop/event_loop.py && \
sed -i 's/MAX_DELAY = 240 # 4 minutes/MAX_DELAY = 600 # 10 minutes/' /usr/local/lib/python3.13/site-packages/strands/event_loop/event_loop.py
ガハハハ、Strands Agents 側に修正入ったら終わるZeeeeeee
とはいえ、sdk-python 側でもこの問題は提起されており、Issue も PR も出ています。
改善を待ちたいところです(お前がやれ、というまさかりが飛んできそう)。
Strands の処理の resume
とはいえ、どれだけコードを書いてもスロットリングやタイムアウトはするものです。
全部やり直しはきついので途中から resume させたいです。
特に strands 側で agent.resume()
みたいなことはできないので(PR作ればええやん、という話は置いておいて)別途実装する必要があります。
agent=Agent()
の場合
上記のような agent インスタンスを生成した場合、会話履歴は agent.messages というプロパティで管理されています。どこか途中で止まったら最後の user input まで巻き戻して再度 user input を投入すれば resume できます。例えば以下のようにやれば 120 秒間隔で 20 回 resume できます。Exponential Backoff を入れたければどうぞご自由に(ただ、一回でも成功したら Exponential Backoff の時間をリセットしたいので意外と実装はむずい)
last_user_content = "XXX"
agent = Agent()
for i in range(20):
try:
agent(last_user_content)
except Exception as e:
for _ in range(2):
if agent.messages[-1].get("role") == "assistant":
del agent.messages[-1]
elif agent.messages[-1].get("role") == "user":
last_user_content = agent.messages.pop().get("content")
break
else:
raise e
time.sleep(20)
Swarm を使っていた場合(まだ怪しい)
マルチエージェントで Swarm を使う場合もあるでしょう。そんなときの実装例は以下
agents = [agent1, agent2, agent3, ...]
from time import sleep
from strands.multiagent.base import Status
import copy
# 全体の状態を保存する変数
saved_node_states = {}
saved_shared_context = None
for i in range(20):
print(f"試行 {i+1}/20 開始")
# 初回以外は状態を復元
if i > 0 and saved_node_states:
for node_id, node in swarm.nodes.items():
if node_id in saved_node_states:
node.executor.messages = copy.deepcopy(saved_node_states[node_id]['messages'])
if saved_shared_context:
swarm.shared_context = copy.deepcopy(saved_shared_context)
result = swarm(last_user_content)
if result.status == Status.COMPLETED:
print("成功しました")
break
else:
print(f"失敗しました (試行 {i+1}/20): ステータス={result.status}")
# 失敗時に全ノードの状態を保存
saved_node_states = {}
for node_id, node in swarm.nodes.items():
messages_copy = copy.deepcopy(node.executor.messages)
# 不完全なassistantメッセージを削除
if messages_copy and messages_copy[-1].get("role") == "assistant":
messages_copy.pop()
saved_node_states[node_id] = {
'messages': messages_copy
}
# SharedContextも保存
saved_shared_context = copy.deepcopy(swarm.shared_context)
current_node = swarm.state.current_node
if current_node and current_node.executor.messages:
for msg in reversed(current_node.executor.messages):
if msg.get("role") == "user":
last_user_content = msg.get("content", frame_input_prompt)
break
if i == 19:
raise Exception("最大リトライ回数に達しました")
sleep(120)
else:
print("20回すべて失敗しました")
呼び出し側のタイムアウト
agentcore starter toolkit を使うと agentcore invoke 'xxxx'
とか agentcore invoke 'xxxx' --local
などで Agent を呼び出せるが、呼び出し側のタイムアウトが 15 分で設定されており、mcp などで時間がかかっているとタイムアウトしてしまう。
response = requests.post(
url,
params={"qualifier": endpoint_name},
headers=headers,
json=body,
timeout=900,
stream=True,
)
config = Config(
read_timeout=900,
connect_timeout=60,
retries={"max_attempts": 3},
)
残念ながらここもいじれません。ので、自作しました。
#!/usr/bin/env python3
"""Custom invoke script with configurable timeout for Bedrock AgentCore."""
import json
import os
import sys
import urllib.parse
from pathlib import Path
from typing import Optional
import boto3
import requests
import yaml
from botocore.config import Config
def load_config(config_path: Path):
"""Load .bedrock_agentcore.yaml configuration."""
with open(config_path, 'r') as f:
return yaml.safe_load(f)
def get_agent_config(config, agent_name: Optional[str] = None):
"""Get agent configuration."""
if agent_name:
return config['agents'][agent_name]
return config['agents'][config['default_agent']]
def invoke_local(payload: dict, session_id: str, timeout: int = 3600):
"""Invoke local endpoint."""
url = "http://0.0.0.0:8080/invocations"
headers = {
"Content-Type": "application/json",
"X-Amzn-Bedrock-AgentCore-Runtime-Session-Id": session_id,
}
response = requests.post(
url,
headers=headers,
json=payload,
timeout=timeout,
stream=True
)
if "text/event-stream" in response.headers.get("content-type", ""):
for line in response.iter_lines(chunk_size=1):
if line:
line = line.decode("utf-8")
if line.startswith("data: "):
json_chunk = line[6:]
try:
parsed_chunk = json.loads(json_chunk)
if isinstance(parsed_chunk, str):
print(parsed_chunk, end="")
else:
print(json.dumps(parsed_chunk, ensure_ascii=False), end="")
except json.JSONDecodeError:
print(json_chunk, end="")
else:
print(response.text)
def invoke_cloud(agent_arn: str, payload: dict, session_id: str, region: str, timeout: int = 3600):
"""Invoke cloud endpoint."""
config = Config(
read_timeout=timeout,
connect_timeout=60,
retries={"max_attempts": 3},
)
client = boto3.client(
"bedrock-agentcore",
region_name=region,
config=config
)
req = {
"agentRuntimeArn": agent_arn,
"qualifier": "DEFAULT",
"runtimeSessionId": session_id,
"payload": json.dumps(payload, ensure_ascii=False),
}
response = client.invoke_agent_runtime(**req)
if "text/event-stream" in response.get("contentType", ""):
for event in response.get("response", []):
print(event, end="")
else:
events = []
for event in response.get("response", []):
events.append(event)
print(json.dumps(events, indent=2, ensure_ascii=False))
def main():
import argparse
parser = argparse.ArgumentParser(description="Invoke Bedrock AgentCore with custom timeout")
parser.add_argument("payload", help="JSON payload to send")
parser.add_argument("--agent", "-a", help="Agent name")
parser.add_argument("--session-id", "-s", help="Session ID")
parser.add_argument("--local", "-l", action="store_true", help="Use local mode")
parser.add_argument("--timeout", "-t", type=int, default=3600, help="Timeout in seconds (default: 3600)")
args = parser.parse_args()
# Load configuration
config_path = Path.cwd() / ".bedrock_agentcore.yaml"
if not config_path.exists():
print("Error: .bedrock_agentcore.yaml not found")
sys.exit(1)
config = load_config(config_path)
agent_config = get_agent_config(config, args.agent)
# Parse payload
try:
payload_data = json.loads(args.payload)
except json.JSONDecodeError:
payload_data = {"message": args.payload}
# Generate session ID if not provided (must be at least 33 chars)
session_id = args.session_id or f"session-{os.urandom(16).hex()}-{os.urandom(8).hex()}"
print(f"Payload: {json.dumps(payload_data, indent=2, ensure_ascii=False)}")
print(f"Session ID: {session_id}")
print(f"Timeout: {args.timeout}s")
print(f"Mode: {'Local' if args.local else 'Cloud'}")
print("-" * 50)
try:
if args.local:
invoke_local(payload_data, session_id, args.timeout)
else:
agent_arn = agent_config['bedrock_agentcore']['agent_arn']
if not agent_arn:
print("Error: Agent not deployed. Run 'agentcore launch' first")
sys.exit(1)
region = agent_config['aws']['region']
invoke_cloud(agent_arn, payload_data, session_id, region, args.timeout)
except Exception as e:
print(f"Error: {e}")
sys.exit(1)
if __name__ == "__main__":
main()
こんな感じで実行できます
uv run invoke.py '{"prompt": "Hello"}' --local --timeout 1800
BedrockModel の署名のタイムアウト問題(たぶん未解決)
Agent を作るときは以下のようなコードで↑で紹介したような create_bedrock_model
関数を用意すると簡単です。
agent = Agent(
model=create_bedrock_model(),
tools=tools,
system_prompt=SYSTEM_PROMPT
)
Agent の処理が長引いてくると、AWS の API を叩くための Sigv4 の有効期限が切れてエラーになることがあります。
Agent のレジュームの仕方に確信をもっていないのでなんともですが、こんな感じに実装すれば Agent 実行時に再度署名してくれてうまく動いてくれるっぽいですが、いまいち再現もしないし怪しいです。
class RetryableBedrockModel:
"""BedrockModel wrapper with retry capability"""
def __init__(self, model_id, boto_client_config, temperature=0, top_p=0, max_retries=5, base_delay=30):
self.model_id = model_id
self.boto_client_config = boto_client_config
self.temperature = temperature
self.top_p = top_p
self.max_retries = max_retries
self.base_delay = base_delay
self._model = None
self._create_model()
def _create_model(self):
self._model = BedrockModel(
model_id=self.model_id,
boto_client_config=self.boto_client_config,
temperature=self.temperature,
top_p=self.top_p,
)
retryable_model = RetryableBedrockModel(
model_id="us.anthropic.claude-sonnet-4-20250514-v1:0",
boto_client_config=Config(
retries={
'total_max_attempts': 100,
'mode': 'standard'
},
read_timeout=3600,
connect_timeout=3600
),
temperature=0,
top_p=0,
)
agent = Agent(
model=retryable_model,
tools=tools
system_prompt=SYSTEM_PROMPT
)