きっかけ
Strands Agents(以後Strands)を日常的に触っており、以下のようなアプリケーションを作ろうとしました。
- バックエンド : 複数のエージェントでワークフローを構成
- フロントエンド : チャットベースでエージェントとコミュニケーションできるように
ひとまずフロントエンドをStreamlitで、バックエンドをStrandsのGraphでワークフローを組むようにしました。
結論
まず結論ですが、現状StrandsのGraphではストリーミングレスポンスがサポートされていません。
そのため、Graphではストリーミング形式のUIを実装できません。なのでThe LLMのチャットのような実装ができません。
Issueもあがっていますが、まだ実装されていないようです。
https://github.com/strands-agents/sdk-python/issues/608
ちなみにGraphだけでなく、Swarmも対応していないようです。
Agentクラスならストリーミングレスポンスできる。
Agentクラスであれば、以下のようなに簡単にStreamlitでストリーミング対応のチャットアプリケーションが実装できます。
import streamlit as st
import asyncio
import nest_asyncio
from strands import Agent
# nest_asyncioを適用してStreamlit環境でasyncを使用可能にする
nest_asyncio.apply()
st.title("Simple Chat with Strands Agent")
# Agentを初期化(session_stateで保持)
if "agent" not in st.session_state:
st.session_state.agent = Agent(
system_prompt="You are a helpful assistant. Please respond concisely and clearly.",
callback_handler=None
)
# チャット履歴を初期化
if "messages" not in st.session_state:
st.session_state.messages = []
# チャット履歴を表示
for message in st.session_state.messages:
with st.chat_message(message["role"]):
st.markdown(message["content"])
# ユーザー入力を受け取る
if prompt := st.chat_input("メッセージを入力してください"):
# ユーザーメッセージを履歴に追加して表示
st.session_state.messages.append({"role": "user", "content": prompt})
with st.chat_message("user"):
st.markdown(prompt)
# アシスタントの応答を生成して表示
with st.chat_message("assistant"):
# ストリーミング応答用のプレースホルダー
message_placeholder = st.empty()
full_response = ""
# Strands Agentからストリーミング応答を取得
async def get_streaming_response():
response_text = ""
async for event in st.session_state.agent.stream_async(prompt):
if "data" in event:
response_text += event["data"]
message_placeholder.markdown(response_text + "▌")
message_placeholder.markdown(response_text)
return response_text
# asyncio.runでストリーミング応答を実行
full_response = asyncio.run(get_streaming_response())
# アシスタントの応答を履歴に追加
st.session_state.messages.append({"role": "assistant", "content": full_response})
ソースコードを見てみる
StrandsのGraphの情報自体があまりないので、本当にストリーミングレスポンスができないのかソースコードを確認してみます。
Graphエージェントの実体はGraphクラスであり、このクラスはMultiAgentBaseクラスを継承しています。
class Graph(MultiAgentBase):
"""Directed Graph multi-agent orchestration with configurable revisit behavior."""
Graphクラスにてエージェント実行を行うメソッドは
- __call__メソッド
- invoke_asyncメソッド
であり、__call__メソッドは内部的にinvoke_asyncメソッドを呼び出しています。
def __call__(
self, task: str | list[ContentBlock], invocation_state: dict[str, Any] | None = None, **kwargs: Any
) -> GraphResult:
...
def execute() -> GraphResult:
return asyncio.run(self.invoke_async(task, invocation_state)) # invoke_asyncを呼び出している
そのため、invoke_asyncメソッドの実装を見ていけばわかりそうです。
実際にinvoke_asyncメソッドを見てみると、その中で_execute_graph
メソッドを呼び出しており、さらにそこから_execute_node
メソッドを呼び出しています。
async def invoke_async(
...
await self._execute_graph(invocation_state)
async def _execute_graph(self, invocation_state: dict[str, Any]) -> None:
...
tasks = [asyncio.create_task(self._execute_node(node, invocation_state)) for node in current_batch]
_execute_node_メソッドがGraph内のノード(エージェント)を実行する中核です。
この_execute_node内の実装を見ると、
async def _execute_node(self, node: GraphNode, invocation_state: dict[str, Any]) -> None:
...
if isinstance(node.executor, MultiAgentBase):
...
multi_agent_result = await node.executor.invoke_async(node_input, invocation_state)
...
elif isinstance(node.executor, Agent):
if self.node_timeout is not None:
agent_response = await asyncio.wait_for(
node.executor.invoke_async(node_input, **invocation_state),
timeout=self.node_timeout,
) # ここがノード実行部分
else:
agent_response = await node.executor.invoke_async(node_input, **invocation_state)
node.executorはAgentクラス、もしくはMultiAgentBaseクラスです。
class GraphNode:
...
node_id: str
executor: Agent | MultiAgentBase
Agentクラスにはストリーミング出力するstream_asyncメソッドがありますが、_execute_nodeでは利用されていません。MultiAgentBaseにはストリーミング出力する実装が見当たりません。
_execute_graphメソッドを見ると、グラフの実行は「バッチでノードを実行し、待つ」→「次の準備完了ノードを探す」の繰り返し、外部にイベントを流す仕組みは見当たりません。返り値がNoneなので、ノード毎に出力を得ることもできなさそうです。
async def _execute_graph(self, invocation_state: dict[str, Any]) -> None:
...
while ready_nodes:
...
current_batch = ready_nodes.copy()
ready_nodes.clear()
tasks = [asyncio.create_task(self._execute_node(node, invocation_state)) for node in current_batch]
for task in tasks:
await task
ready_nodes.extend(self._find_newly_ready_nodes(current_batch))
ソースコードを見ても、ストリーミングレスポンスに対応していないことがわかります。
というか、Graph全体が実行されるまで出力を得ることができなさそうです。
まとめ
- Graphはストリーミングレスポンスに対応していない
- というか、Graph全体が実行されるまで出力を得ることができない
- 今のGraphの実装だと、まだフロントエンドをもつアプリケーションに組み込むのは難しそう