2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Strands AgentsのGraphはストリーミングレスポンスに対応していない

Posted at

きっかけ

Strands Agents(以後Strands)を日常的に触っており、以下のようなアプリケーションを作ろうとしました。

  • バックエンド : 複数のエージェントでワークフローを構成
  • フロントエンド : チャットベースでエージェントとコミュニケーションできるように

ひとまずフロントエンドをStreamlitで、バックエンドをStrandsのGraphでワークフローを組むようにしました。

結論

まず結論ですが、現状StrandsのGraphではストリーミングレスポンスがサポートされていません。
そのため、Graphではストリーミング形式のUIを実装できません。なのでThe LLMのチャットのような実装ができません。

Issueもあがっていますが、まだ実装されていないようです。
https://github.com/strands-agents/sdk-python/issues/608

ちなみにGraphだけでなく、Swarmも対応していないようです。

Agentクラスならストリーミングレスポンスできる。

Agentクラスであれば、以下のようなに簡単にStreamlitでストリーミング対応のチャットアプリケーションが実装できます。

import streamlit as st
import asyncio
import nest_asyncio
from strands import Agent

# nest_asyncioを適用してStreamlit環境でasyncを使用可能にする
nest_asyncio.apply()

st.title("Simple Chat with Strands Agent")

# Agentを初期化(session_stateで保持)
if "agent" not in st.session_state:
    st.session_state.agent = Agent(
        system_prompt="You are a helpful assistant. Please respond concisely and clearly.",
        callback_handler=None
    )

# チャット履歴を初期化
if "messages" not in st.session_state:
    st.session_state.messages = []

# チャット履歴を表示
for message in st.session_state.messages:
    with st.chat_message(message["role"]):
        st.markdown(message["content"])

# ユーザー入力を受け取る
if prompt := st.chat_input("メッセージを入力してください"):
    # ユーザーメッセージを履歴に追加して表示
    st.session_state.messages.append({"role": "user", "content": prompt})
    with st.chat_message("user"):
        st.markdown(prompt)
    
    # アシスタントの応答を生成して表示
    with st.chat_message("assistant"):
        # ストリーミング応答用のプレースホルダー
        message_placeholder = st.empty()
        full_response = ""
        
        # Strands Agentからストリーミング応答を取得
        async def get_streaming_response():
            response_text = ""
            async for event in st.session_state.agent.stream_async(prompt):
                if "data" in event:
                    response_text += event["data"]
                    message_placeholder.markdown(response_text + "")
            message_placeholder.markdown(response_text)
            return response_text
        
        # asyncio.runでストリーミング応答を実行
        full_response = asyncio.run(get_streaming_response())
        
        # アシスタントの応答を履歴に追加
        st.session_state.messages.append({"role": "assistant", "content": full_response})

ソースコードを見てみる

StrandsのGraphの情報自体があまりないので、本当にストリーミングレスポンスができないのかソースコードを確認してみます。

Graphエージェントの実体はGraphクラスであり、このクラスはMultiAgentBaseクラスを継承しています。

class Graph(MultiAgentBase):
    """Directed Graph multi-agent orchestration with configurable revisit behavior."""

Graphクラスにてエージェント実行を行うメソッドは

  • __call__メソッド
  • invoke_asyncメソッド

であり、__call__メソッドは内部的にinvoke_asyncメソッドを呼び出しています。


def __call__(
	self, task: str | list[ContentBlock], invocation_state: dict[str, Any] | None = None, **kwargs: Any
) -> GraphResult:

	...

    def execute() -> GraphResult:
            return asyncio.run(self.invoke_async(task, invocation_state)) # invoke_asyncを呼び出している

そのため、invoke_asyncメソッドの実装を見ていけばわかりそうです。
実際にinvoke_asyncメソッドを見てみると、その中で_execute_graphメソッドを呼び出しており、さらにそこから_execute_nodeメソッドを呼び出しています。

async def invoke_async(

            ...

            await self._execute_graph(invocation_state)
async def _execute_graph(self, invocation_state: dict[str, Any]) -> None:

        ...
        
        tasks = [asyncio.create_task(self._execute_node(node, invocation_state)) for node in current_batch]

_execute_node_メソッドがGraph内のノード(エージェント)を実行する中核です。
この_execute_node内の実装を見ると、

async def _execute_node(self, node: GraphNode, invocation_state: dict[str, Any]) -> None:

	...

	if isinstance(node.executor, MultiAgentBase):
		...
		multi_agent_result = await node.executor.invoke_async(node_input, invocation_state)
		...
	elif isinstance(node.executor, Agent):
		if self.node_timeout is not None:
			agent_response = await asyncio.wait_for(
				node.executor.invoke_async(node_input, **invocation_state),
				timeout=self.node_timeout,
			) # ここがノード実行部分
		else:
			agent_response = await node.executor.invoke_async(node_input, **invocation_state)

node.executorはAgentクラス、もしくはMultiAgentBaseクラスです。

class GraphNode:

	...
	
	node_id: str
	executor: Agent | MultiAgentBase

Agentクラスにはストリーミング出力するstream_asyncメソッドがありますが、_execute_nodeでは利用されていません。MultiAgentBaseにはストリーミング出力する実装が見当たりません。

_execute_graphメソッドを見ると、グラフの実行は「バッチでノードを実行し、待つ」→「次の準備完了ノードを探す」の繰り返し、外部にイベントを流す仕組みは見当たりません。返り値がNoneなので、ノード毎に出力を得ることもできなさそうです。

async def _execute_graph(self, invocation_state: dict[str, Any]) -> None:

	...

        while ready_nodes:
            ...
            current_batch = ready_nodes.copy()
            ready_nodes.clear()
            tasks = [asyncio.create_task(self._execute_node(node, invocation_state)) for node in current_batch]
            for task in tasks:
                await task
            ready_nodes.extend(self._find_newly_ready_nodes(current_batch))

ソースコードを見ても、ストリーミングレスポンスに対応していないことがわかります。
というか、Graph全体が実行されるまで出力を得ることができなさそうです。

まとめ

  • Graphはストリーミングレスポンスに対応していない
  • というか、Graph全体が実行されるまで出力を得ることができない
  • 今のGraphの実装だと、まだフロントエンドをもつアプリケーションに組み込むのは難しそう
2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?