3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Strands Agentsのグラフパターンのワークフローについて

Posted at

はじめに

Strands Agentsは、AWSが開発したAIエージェント構築用のオープンソースSDKです。本記事では、Strands Agentsのマルチエージェントパターンのひとつである「グラフパターン」について、具体的な実装例とともに詳しく解説していこうと思います。

今回はマルチエージェントオーケストレーションに焦点を当て、以下の内容を中心にしてきます。

  • グラフパターンの内部動作原理
  • 条件分岐を含む複雑なワークフローの実装
  • サイクリックグラフによる反復処理の実装
  • 本番環境を考慮したベストプラクティス

また、本記事はStrandsAgentsの基礎知識やグラフデータベース周辺の知識については理解している前提で書いているのでよろしくお願いします。

StrandsAgentsとは

StrandsAgentsは、AWSのオープンソース部門が発表した、モデル駆動型(model‑driven)アプローチをとるAIエージェント向けのSDKです。
現在はPythonに加え、TypeScript版も登場しています。

最小3行で書ける!?

StrandsAgentsは以下のように最小3行で動かすことができます。
デフォルトでの呼び出し時はClaude Sonnet4が指定され呼ばれるようになっています。
別のモデルを利用したい場合、コードないで指定することで利用することができます。
Strandsの基本的なお話はそこそこにして次から今回の本題となる、グラフパターンについて入っていきます。

agent.py
from strands import Agent
agent = Agent()
response = agent("こんにちは!")

グラフパターンとは

ではここから本題となるグラフパターンについてです。
グラフパターンは、エージェントや他のマルチエージェントシステムをノードとして持つ、決定論的な有向グラフベースのオーケストレーションシステムです。

主な特徴

  1. 決定論的な実行順序: グラフ構造に基づいた明確な実行フロー
  2. 出力の自動伝播: ノード間でのデータフロー
  3. 明確な依存関係管理: ノード間の依存関係を明示的に定義
  4. ネストされたパターンのサポート: グラフやSwarmを別のグラフのノードとして使用可能
  5. サイクリックグラフのサポート: フィードバックループや反復処理が可能

グラフパターンの動作原理

グラフは以下の原則で動作します:

  1. ノードはエージェント、カスタムノード、またはマルチエージェントシステムを表す
  2. エッジは依存関係と情報フローを定義
  3. 実行は依存関係を尊重しながらグラフ構造に従う
  4. あるノードの出力は、依存する次のノードの入力となる
  5. エントリーポイントは元のタスクを入力として受け取る

※イメージ図は以下

image.png

基本的な実装例

GraphBuilderによるグラフ構築

Gremlinなどではグラフを構築する際、エッジやノード、エッジの始発点と終着点を全部定義する必要がありますが、StrandsではGraphBuilderといったインターフェースを利用することで比較的容易にグラフを構築することができます。

  • add_node():ノードを追加する。この時追加するのはエージェントやマルチエージェントを丸ごと追加できたりする
  • add_edge():ノード間の依存関係(上図で言う矢印)を追加
  • set_entry_point():開始点となるエントリーポイントを指定する
  • set_execution_timeout():全体での最大実行時間を決める
  • build():実際にグラフを構築して検証したり実行する

シンプルな並列処理の実装例

以下はシンプルな研究レポート作成のワークフローになります。
これは複数の独立したタスクを並列実行し、その結果を集約するFan-out/Fan-inパターンです。これはデータ分析やリサーチタスクで頻繁に使用されます。

import logging
from strands import Agent
from strands.multiagent import GraphBuilder
from strands.multiagent.graph import GraphState
from strands.multiagent.base import Status

# デバッグログを有効化(本番環境では適切なログレベルに調整)
logging.getLogger("strands.multiagent").setLevel(logging.INFO)

# 専門分野ごとのリサーチエージェントを作成
medical_researcher = Agent(
    name="medical_researcher",
    system_prompt="""あなたは医療分野の研究専門家です。
    医学的な観点から、提供されたトピックについて最新の研究や
    臨床データを調査し、エビデンスベースの情報を提供してください。"""
)

tech_researcher = Agent(
    name="tech_researcher",
    system_prompt="""あなたは技術分野の研究専門家です。
    テクノロジーの観点から、提供されたトピックについて
    最新の技術動向やイノベーションを調査してください。"""
)

economic_researcher = Agent(
    name="economic_researcher",
    system_prompt="""あなたは経済分野の研究専門家です。
    経済的な影響や市場動向の観点から、提供されたトピックを
    分析してください。"""
)

# 集約エージェント
synthesizer = Agent(
    name="synthesizer",
    system_prompt="""あなたは情報統合の専門家です。
    複数の専門家から提供された調査結果を統合し、
    包括的で矛盾のない分析レポートを作成してください。
    各専門分野の知見を適切に関連付けてください。"""
)

# グラフを構築
builder = GraphBuilder()

# ノードを追加
builder.add_node(medical_researcher, "medical_research")
builder.add_node(tech_researcher, "tech_research")
builder.add_node(economic_researcher, "economic_research")
builder.add_node(synthesizer, "synthesis")

# 並列実行の設定(依存関係なし = 自動的に並列実行)
# 各リサーチエージェントは独立して実行される
builder.add_edge("medical_research", "synthesis")
builder.add_edge("tech_research", "synthesis")
builder.add_edge("economic_research", "synthesis")

# エントリーポイントを設定
# 設定しない場合、依存関係のないノードが自動的にエントリーポイントになる
builder.set_entry_point("medical_research")
builder.set_entry_point("tech_research")
builder.set_entry_point("economic_research")

# タイムアウトを設定
builder.set_execution_timeout(600)  # 全体で10分
builder.set_node_timeout(120)       # 各ノード2分

# グラフをビルド
graph = builder.build()

# 実行
result = graph("AIのヘルスケアへの影響を、医療・技術・経済の観点から分析してください")

# 結果の確認
print(f"ステータス: {result.status}")
print(f"実行時間: {result.execution_time}ms")

並列実行の仕組み

# これらのノードは依存関係がないため、自動的に並列実行される
builder.add_edge("medical_research", "synthesis")
builder.add_edge("tech_research", "synthesis")
builder.add_edge("economic_research", "synthesis")

Graphは依存関係を解析し、それぞれのノード、medical_researchtech_researcheconomic_researchが独立していることを検出します。これにより、これら3つのノードは同時に実行されます。

すべての依存関係を待つ集約ノード

デフォルトでは、synthesisノードは最初の依存関係(例:medical_research)が完了した時点で実行を開始します。すべてのリサーチ結果を待つには、条件付きエッジを使用します:

def all_dependencies_complete(required_nodes: list[str]):
    """すべての依存ノードが完了するまで待機する条件関数"""
    def check_all_complete(state: GraphState) -> bool:
        # すべてのノードが存在し、かつ完了状態であることを確認
        return all(
            node_id in state.results and 
            state.results[node_id].status == Status.COMPLETED
            for node_id in required_nodes
        )
    return check_all_complete

# 3つすべてのリサーチが完了してから集約を実行
required = ["medical_research", "tech_research", "economic_research"]
builder.add_edge("medical_research", "synthesis", 
                condition=all_dependencies_complete(required))
builder.add_edge("tech_research", "synthesis", 
                condition=all_dependencies_complete(required))
builder.add_edge("economic_research", "synthesis", 
                condition=all_dependencies_complete(required))

この条件関数により、synthesisノードは3つすべてのリサーチが完了するまで実行されません。これは集約処理で不完全なデータを扱うのを防ぎます。

入力の自動伝播

Graphは各ノードへの入力を自動的に構築します:

  • エントリーポイント(3つのリサーチャー): 元のタスクを受け取る
  • synthesisノード: 元のタスク + 3つのリサーチ結果を受け取る
# synthesisノードが受け取る入力の形式(自動生成)
"""
Original Task: AIのヘルスケアへの影響を、医療・技術・経済の観点から分析してください

Inputs from previous nodes:

From medical_research:
  - medical_researcher: [医療観点の調査結果]

From tech_research:
  - tech_researcher: [技術観点の調査結果]

From economic_research:
  - economic_researcher: [経済観点の調査結果]
"""

この自動伝播により、各エージェントは必要な情報を自動的に受け取り、手動でのデータ受け渡しが不要になります。

条件分岐による動的なルーティング

実際のユースケースでは、前のノードの結果に基づいて動的に処理を分岐させたい場合があります。
以下のように、依存関係を持つノードからの情報をもとに、行動を処理を変えることができ、条件付きエッジを使用することで、これを実現できます。

image.png

条件付きエッジを含むワークフローの実装例

from strands.multiagent.graph import GraphState
from strands.multiagent.base import Status

def only_if_research_successful(state: GraphState) -> bool:
    """研究が成功した場合にのみエッジをトラバース"""
    research_node = state.results.get("research")
    if not research_node:
        return False
    
    # 研究結果が成功を示しているかチェック
    result_text = str(research_node.result)
    return "成功" in result_text or "完了" in result_text

def needs_deep_analysis(state: GraphState) -> bool:
    """深い分析が必要かどうかを判定"""
    research_node = state.results.get("research")
    if not research_node:
        return False
    
    result_text = str(research_node.result).lower()
    # 複雑なキーワードが含まれている場合は深い分析が必要
    complex_keywords = ["複雑", "多面的", "詳細な分析"]
    return any(keyword in result_text for keyword in complex_keywords)

# 条件付きエッジを追加
builder.add_edge("research", "analysis", condition=only_if_research_successful)
builder.add_edge("analysis", "deep_analyst", condition=needs_deep_analysis)

条件分岐の設計パターン

条件関数はGraphStateを受け取り、boolを返します。条件関数の実装には以下のような処理が必要です:

def needs_deep_analysis(state: GraphState) -> bool:
    """深い分析が必要かどうかを判定"""
    research_node = state.results.get("research")
    if not research_node:
        return False
    
    result_text = str(research_node.result).lower()
    # 複雑なキーワードが含まれている場合は深い分析が必要
    complex_keywords = ["複雑", "多面的", "詳細な分析"]
    return any(keyword in result_text for keyword in complex_keywords)

フィードバックループを利用した反復処理

次は、フィードバックループを使った反復的な改善プロセスです。これを使うと、一定の品質基準を満たすまで処理を繰り返すワークフローを構築できます。

処理イメージ

image.png

実装例

def needs_revision(state: GraphState) -> bool:
    """レビュー結果に基づいて修正が必要かを判定"""
    review_result = state.results.get("reviewer")
    if not review_result:
        return False
    
    result_text = str(review_result.result)
    return "修正が必要" in result_text or "改善の余地" in result_text

def is_approved(state: GraphState) -> bool:
    """レビュー結果が承認されたかを判定"""
    review_result = state.results.get("reviewer")
    if not review_result:
        return False
    
    result_text = str(review_result.result)
    return "承認" in result_text or "問題なし" in result_text

# エージェントの作成
draft_writer = Agent(
    name="draft_writer",
    system_prompt="""あなたは技術文書の下書き作成者です。
    フィードバックがあれば、それを反映して改善してください。"""
)

reviewer = Agent(
    name="reviewer",
    system_prompt="""あなたは技術レビュアーです。
    文書の品質を評価し、修正が必要な場合は具体的なフィードバックを提供してください。
    問題がなければ「承認」と明記してください。"""
)

publisher = Agent(
    name="publisher",
    system_prompt="""あなたは最終出版担当者です。
    承認された文書を最終形式に整えてください。"""
)

# サイクリックグラフの構築
builder = GraphBuilder()
builder.add_node(draft_writer, "draft_writer")
builder.add_node(reviewer, "reviewer")
builder.add_node(publisher, "publisher")

# フィードバックループの作成
builder.add_edge("draft_writer", "reviewer")
builder.add_edge("reviewer", "draft_writer", condition=needs_revision)  # サイクル
builder.add_edge("reviewer", "publisher", condition=is_approved)

# 無限ループを防ぐための制限を設定
builder.set_max_node_executions(10)  # 最大10回のノード実行
builder.set_execution_timeout(300)   # 5分のタイムアウト
builder.reset_on_revisit(True)       # 再訪時にノードの状態をリセット

graph = builder.build()

# 実行
result = graph("Pythonの非同期プログラミングに関する技術記事を作成してください")

print(f"\nステータス: {result.status}")
print(f"実行されたノード数: {result.completed_nodes}")
print(f"実行時間: {result.execution_time}ms")

無限ループの防止

# 安全機構1: 最大実行回数
builder.set_max_node_executions(15)  # グラフ全体で最大15回のノード実行

# 安全機構2: タイムアウト
builder.set_execution_timeout(600)  # 10分で強制終了

# 安全機構3: 条件関数内でのカウンター
def needs_revision_with_limit(state: GraphState) -> bool:
    revision_count = state.metadata.get("revision_count", 0)
    if revision_count >= 3:  # 最大3回の改訂
        return False
    return "修正が必要" in str(state.results.get("reviewer").result)

これにより、以下のような状況を防ぐことができます。

  • エージェントが常に「修正が必要」と判断する
  • 条件関数のバグによる無限ループ
  • リソース枯渇

明確な条件の設計

効果的なフィードバックループのために、明確な反復条件を追加しています。

def is_approved_comprehensive(state: GraphState) -> bool:
    """複数の基準で承認を判定"""
    reviewer_result = state.results.get("reviewer")
    if not reviewer_result:
        return False
    
    result_text = str(reviewer_result.result)
    
    # 基準1: 明示的な承認
    has_approval = "承認" in result_text
    
    # 基準2: スコアチェック
    score_match = re.search(r'総合スコア:\s*(\d+)', result_text)
    score_threshold = score_match and int(score_match.group(1)) >= 35
    
    # 基準3: 改訂回数の上限
    revision_count = state.metadata.get("revision_count", 0)
    max_revisions_reached = revision_count >= 3
    
    # いずれかの条件で承認
    return has_approval or score_threshold or max_revisions_reached

この設計により、以下を保証します:

  • 品質基準の達成: スコアが基準を満たす
  • タイムリーな完了: 最大回数で強制的に進む
  • 明示的な承認: レビュアーの判断を尊重

ベストプラクティス

ここまでグラフワークフローの解説や、具体的な実装例などの解説をしてきましたが、実装時に気をつけたいベスプラをドキュメントを参考にまとめておきます。

1. 意味のあるノードIDを使用

実装や実行結果の分析か容易になるためです。
あれ?この名前のノードってなんだっけ..という事象が発生しうるので、当たり前ですがわかりやすい名前をつけておくのは必要です。

# 良い例
builder.add_node(researcher, "market_research")
builder.add_node(analyst, "competitive_analysis")

# 避けるべき例
builder.add_node(researcher, "node1")
builder.add_node(analyst, "node2")

2. エラーハンドリングの実装

ある程度のワークフローだけでなく、処理しきれなかった場合のエクセプションを用意しておくのも大事です。

def safe_condition(state: GraphState) -> bool:
    """安全な条件チェック"""
    try:
        node_result = state.results.get("previous_node")
        if not node_result or node_result.status != Status.COMPLETED:
            return False
        
        # 実際の条件チェック
        return "success" in str(node_result.result).lower()
    
    except Exception as e:
        logging.error(f"条件評価エラー: {e}")
        return False

3. タイムアウトと実行制限の設定

タイムアウト含め、処理の上限を決めておきます。
処理や推論を重ねれば重なるほど、成果物のクオリティは高くなりますが、その分コストが高くなったりリソースの枯渇につながったりします。

# サイクリックグラフには必ず制限を設定
builder.set_max_node_executions(15)  # 無限ループ防止
builder.set_execution_timeout(600)    # 全体のタイムアウト
builder.set_node_timeout(120)         # 個別ノードのタイムアウト

4. 監視とロギング

どういった処理順でフローを流れたのかや、どういった処理の中身になっているのかも確認したいと思います。
Cloud WatchやOpen Telemtryにログを送信することも一つかとお思います。

import logging

# 詳細なロギングを有効化
logging.getLogger("strands").setLevel(logging.DEBUG)
logging.getLogger("strands.multiagent").setLevel(logging.DEBUG)

# カスタムハンドラーの追加
handler = logging.FileHandler("graph_execution.log")
handler.setFormatter(logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
))
logging.getLogger("strands.multiagent").addHandler(handler)

まとめ

今回はStrandsを利用した複雑なマルチエージェントワークフローを色々検証してみました。
ワークフローとの違いもあり思想としては非常に面白いです。

グラフパターンを活用することで、以下のような複雑なシナリオに対応できます

  • 多段階のデータ処理パイプライン
  • 条件分岐を含む意思決定フロー
  • フィードバックループによる反復的改善
  • 並列処理と集約を組み合わせた効率的なワークフロー

今後もワークフローやグラフ中心にあれこれいじって書いていこうとおもいます。

参考

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?