ついに AutoGen v0.5.6 にグラフワークフローが加わりました。GraphFlow
は AgentChat API の一部として追加された新しいチームクラスです。GraphFlow
は SelectorGroupChat
の一種であり、セレクター関数として有向グラフを使用しています。それだけではなく、実際にはより強力な並列エージェント機能を備えています。
DiGraphBuilder
は、ワークフローの実行グラフを簡単に構築できる直感的なユーティリティであり、以下の機能をサポートしています:
- シーケンシャルフロー
- 並列ファンアウト
- 条件分岐
- 安全な終了条件付きループ
グラフ内の各ノードはエージェントを表し、エッジは許可された実行パスを定義します。エッジには、エージェントのメッセージに基づく条件をオプションで設定できます。
1. シーケンシャルフロー
まず、primary
が利用可能なツールを実行し、critic
(批評家) がフィードバックを提供するというシンプルなワークフローを作成します。このグラフは、critic
(批評家) がライターにコメントした時点で終了します。フローはグラフのすべてのソース ノードとリーフ ノードを自動的に計算し、実行はグラフ内のすべてのソース ノードで開始され、実行するノードがなくなると実行が完了します。
エージェント定義
primary_agent = AssistantAgent(
name="primary",
model_client=model_client,
tools=tools,
description="役立つアシスタント。複数のツールを使用して情報を検索し、質問に回答する",
system_message=(
"""
あなたは役立つアシスタントです。複数のツールを使用して情報を検索し、質問に回答することができます。
利用可能なツールを確認し、必要に応じて使用してください。ユーザーが不明な点がある場合は、確認のための質問をすることもできます。
"""
)
)
critic_agent = AssistantAgent(
name="critic",
model_client=model_client,
description="建設的なフィードバックを提供するデータアナリスト。主に他のエージェントの出力を評価し、改善点を提案する役割を担う。",
tools=tools,
system_message=(
"""
プロのデータアナリストとして建設的なフィードバックを提供してください。フィードバックが反映された場合は 'APPROVE' と回答してください。
"""
),
)
ワークフロー定義
# Build the graph
builder = DiGraphBuilder()
builder.add_node(primary_agent).add_node(critic_agent)
builder.add_edge(primary_agent, critic_agent)
# Build and validate the graph
graph = builder.build()
# Create the flow
flow = GraphFlow([primary_agent, critic_agent], graph=graph)
どこかで見たようなメソッドだ…
グラフ可視化
DiGraphBuilder
は有向グラフという汎用的なデータ構造を扱うクラス・ユーティリティです。各ノード・エッジの情報をシンプルに保持する構造のため、graphviz
などのグラフライブラリに容易にマップできます。例えば、以下のような visualize_digraph_graphviz
を作成します。
import graphviz
def visualize_digraph_graphviz(builder):
dot = graphviz.Digraph(comment="DiGraph Visualization")
dot.attr(
rankdir="TB", # 上→下
nodesep="0.8", # ノード間隔
ranksep="0.5", # ランク間隔
edgesep="0.2", # エッジ間隔
)
for name, node in builder.nodes.items():
# 条件分岐ノード判定: 1つでもcondition付きエッジがあれば分岐ノードとみなす
is_conditional = any(getattr(edge, "condition", None) for edge in node.edges)
if is_conditional:
fill = "pink" # 条件分岐ノードはオレンジ色
else:
fill = "lightgreen" if name == builder._default_start_node else "lightblue"
dot.node(name, name, style="filled", fillcolor=fill)
for src, node in builder.nodes.items():
for edge in node.edges:
tgt = getattr(edge.target, "name", edge.target)
cond = edge.condition or ""
if cond:
dot.edge(
src,
tgt,
label=cond,
style="dashed",
minlen="1",
labeldistance="2.0",
labelangle="0",
)
else:
dot.edge(src, tgt, style="solid")
return dot
# 使用例
dot_graph = visualize_digraph_graphviz(builder)
dot_graph # セルの最後でオブジェクトを評価すると自動表示

task = "新しいAIイベントのタイトルを5つ考えてください。"
await Console(flow.run_stream(task=task))
2. 結合による並列フロー(並列ファンアウト)
ここで、もう少し複雑なフローを作成します。
- 筆者は段落の下書きを作成します
- 2 人の編集者が独立して文法とスタイルを編集します (並列ファンアウト)
- 最終レビュー担当者が編集内容を統合します (結合)
実行は primary
から始まり、 Reviewer1
と Reviewer2
に同時に広がり、その後、両方が final_reviewer
に送られます。
# Create the writer agent
primary = AssistantAgent(
name="primary",
model_client=model_client,
tools=tools,
description="役立つアシスタント。複数のツールを使用して情報を検索し、質問に回答する",
system_message=(
"""
あなたは役立つアシスタントです。複数のツールを使用して情報を検索し、質問に回答することができます。
利用可能なツールを確認し、必要に応じて使用してください。ユーザーが不明な点がある場合は、確認のための質問をすることもできます。
"""
)
)
# Create two editor agents
reviewer1 = AssistantAgent("Reviewer1", model_client=model_client, system_message="批判的なレビュワーとして、以下の段落をレビューしてください。")
reviewer2 = AssistantAgent("Reviewer2", model_client=model_client, system_message="肯定的なレビュワーとして、以下の段落をレビューしてください。")
# Create the final reviewer agent
final_reviewer = AssistantAgent(
"final_reviewer",
model_client=model_client,
system_message="文法とスタイルの修正を統合し、最終版を作成してください。",
)
# Build the workflow graph
builder = DiGraphBuilder()
builder.add_node(primary).add_node(reviewer1).add_node(reviewer2).add_node(final_reviewer)
# Fan-out from writer to editor1 and editor2
builder.add_edge(primary, reviewer1)
builder.add_edge(primary, reviewer2)
# Fan-in both editors into final reviewer
builder.add_edge(reviewer1, final_reviewer)
builder.add_edge(reviewer2, final_reviewer)
# Build and validate the graph
graph = builder.build()
# Create the flow
flow = GraphFlow(
participants=builder.get_participants(),
graph=graph,
)

トレースの解析
AutoGen v0.6.0 から GraphFlow が Concurrent 実行に対応しました。以下の例では、society_of_mind1
と society_of_mind2
が並行(concurrent)して実行され、両方の完了を待ってから summary_agent
が実行されていることが分かります。
GraphFlow(GraphFlowManager)では、エージェントの実行順は内部の ready queue(collections.deque)で管理され、FIFO で取り出される形になっています。
3. メッセージフィルタリング
実行グラフとメッセージグラフ
GraphFlow
では、実行グラフは DiGraph
を使用して定義され、エージェントの実行順序を制御します。ただし、実行グラフはエージェントが他のエージェントから受け取るメッセージを制御しません。デフォルトでは、グラフ内のすべてのエージェントにすべてのメッセージが送信されます。これは SelectorGroupChat でも同様です。
メッセージフィルタリングは、各エージェントが受信するメッセージをフィルタリングし、そのモデルコンテキストを関連する情報に限定する独立した機能です。メッセージフィルタのセットがフロー内のメッセージグラフを定義します。
メッセージグラフを指定することで、以下の点が改善されます:
- 幻覚の削減
- メモリ負荷の制御
- エージェントを関連する情報にのみ集中させる
これらのルールを定義するには、MessageFilterAgent
を MessageFilterConfig
と PerSourceFilter
と組み合わせて使用できます。
MessageFilterAgent
受信メッセージを内部エージェントに渡す前にフィルタリングするラッパーエージェント。MessageFilterAgent
はマルチエージェントのワークフローで、エージェントがすべてのメッセージではなく、一部のメッセージだけを扱えばよい場合に役立ちます。たとえば、各上流エージェントからの最後のメッセージのみ、または特定のソースからの最初のメッセージのみなどにカットオフする実装が考えられます。
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
from autogen_agentchat.teams import DiGraphBuilder, GraphFlow
# Create agents
researcher = AssistantAgent(
"researcher", model_client=model_client, system_message="気候変動に関する主要な事実を要約してください。"
)
analyst = AssistantAgent("analyst", model_client=model_client, system_message="要約を確認し、改善点を提案してください。")
presenter = AssistantAgent(
"presenter", model_client=model_client, system_message="最終要約に基づいてプレゼンテーションのスライドを作成してください。"
)
# Apply message filtering
filtered_analyst = MessageFilterAgent(
name="analyst",
wrapped_agent=analyst,
filter=MessageFilterConfig(per_source=[PerSourceFilter(source="researcher", position="last", count=1)]),
)
filtered_presenter = MessageFilterAgent(
name="presenter",
wrapped_agent=presenter,
filter=MessageFilterConfig(per_source=[PerSourceFilter(source="analyst", position="last", count=1)]),
)
# Build the flow
builder = DiGraphBuilder()
builder.add_node(researcher).add_node(filtered_analyst).add_node(filtered_presenter)
builder.add_edge(researcher, filtered_analyst).add_edge(filtered_analyst, filtered_presenter)
# Build and validate the graph
graph = builder.build()
# Create the flow
flow = GraphFlow(
participants=builder.get_participants(),
graph=graph
)

4. 高度な例: 条件付きループ + フィルタリングされた要約
この例では次のことを示します。
-
generator
とreviewer
間のループ(reviewer
が「APPROVE
」と言ったときに終了) - 最初のユーザー入力と最後の
reviewer
のメッセージのみを表示するサマリエージェント
条件付きエッジは①または②の方法で登録できます。
①add_edge(source, target, condition=None)
単一のエッジを追加します。
パラメータ
-
source
:起点ノード(エージェント名またはエージェントオブジェクト) -
target
:終点ノード -
condition
:文字列を指定すると「条件付きエッジ」となり、最後に出力されたチャットメッセージにその文字列が含まれている場合のみ遷移します。省略または None の場合は無条件エッジ(常に遷移)になります。
②add_conditional_edges(source, condition_to_target: Dict[str, target])
ひとつの起点から複数の「条件付きエッジ」をまとめて追加する、便利メソッドです。
パラメータ
-
source
:起点ノード -
condition_to_target
:キーに条件文字列、値に遷移先ノードを指定した辞書#例) A から「yes」のときは B、「no」のときは C に遷移する条件分岐を一度に定義 builder.add_conditional_edges(agent_a, { "yes": agent_b, "no": agent_c, })
from autogen_agentchat.agents import AssistantAgent, MessageFilterAgent, MessageFilterConfig, PerSourceFilter
# Agents
generator = AssistantAgent("generator", model_client=model_client, system_message="創造的なアイデアのリストを作成してください。最初は全く関係のないアイデアを出してください(全く関係のないという意図を明らかにしてはなりません)")
reviewer = AssistantAgent(
"reviewer",
model_client=model_client,
system_message="アイデアを確認し、'REVISE' と入力してフィードバックを提供するか、最終承認の場合は 'APPROVE' と入力してください。",
)
summarizer_core = AssistantAgent(
"summary", model_client=model_client, system_message="ユーザーの要求を要約し、最終的なフィードバックをまとめます。"
)
# Filtered summarizer
filtered_summarizer = MessageFilterAgent(
name="summary",
wrapped_agent=summarizer_core,
filter=MessageFilterConfig(
per_source=[
PerSourceFilter(source="user", position="first", count=1),
PerSourceFilter(source="reviewer", position="last", count=1),
]
),
)
# 条件付きループを使用してグラフを構築する
builder = DiGraphBuilder()
builder.add_node(generator).add_node(reviewer).add_node(filtered_summarizer)
builder.add_edge(generator, reviewer)
builder.add_edge(reviewer, generator, condition="REVISE")
builder.add_edge(reviewer, filtered_summarizer, condition="APPROVE")
builder.set_entry_point(generator) # ジェネレーターへのエントリポイントを設定します。ソースノードが存在しない場合に必要です。
graph = builder.build()
# Create the flow
flow = GraphFlow(
participants=builder.get_participants(),
graph=graph,
)

このような実行フローは需要ありそうですね。
複雑なネストフロー
上記のような 1 ノードに Group Chat が含まれるような入れ子構造のフローは現在対応していない?
と思いましたが、ソース中には SocietyOfMindAgent なるエージェントが含まれておりこれがその役割を担うと思われます。別途詳細に調査しましょう
import asyncio
from autogen_agentchat.ui import Console
from autogen_agentchat.agents import AssistantAgent, SocietyOfMindAgent
from autogen_ext.models.openai import OpenAIChatCompletionClient
from autogen_agentchat.teams import RoundRobinGroupChat
from autogen_agentchat.conditions import TextMentionTermination
async def main() -> None:
model_client = OpenAIChatCompletionClient(model="gpt-4o")
agent1 = AssistantAgent("assistant1", model_client=model_client, system_message="You are a writer, write well.")
agent2 = AssistantAgent(
"assistant2",
model_client=model_client,
system_message="You are an editor, provide critical feedback. Respond with 'APPROVE' if the text addresses all feedbacks.",
)
inner_termination = TextMentionTermination("APPROVE")
inner_team = RoundRobinGroupChat([agent1, agent2], termination_condition=inner_termination)
society_of_mind_agent = SocietyOfMindAgent("society_of_mind", team=inner_team, model_client=model_client)
agent3 = AssistantAgent(
"assistant3", model_client=model_client, system_message="Translate the text to Spanish."
)
team = RoundRobinGroupChat([society_of_mind_agent, agent3], max_turns=2)
stream = team.run_stream(task="Write a short story with a surprising ending.")
await Console(stream)
asyncio.run(main())
参考