0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【完全版】OpenClawでワークフロー設計:タスク分解からエージェントチェーンによる高度な自動化まで

0
Posted at

09-workflow_design_hero.png

複雑なタスクを自動化するDAG・エージェントチェーン・動的ルーティングの完全実装ガイド

はじめに

AIエージェントに複雑なタスクを任せる際、単に「これをやって」と言うだけでは思うような成果は得られません。

**「どのように分解するか」「どの順序で実行するか」「どのように連携するか」**を設計する必要があります。

これがAIワークフロー設計です。

本記事では、タスク分解、エージェントチェーン、DAG(有向非巡回グラフ)など、AIエージェントの高度なワークフローを設計するための考え方と実装パターンを解説します。

目次

  1. ワークフロー設計の基礎
  2. タスク分解のパターン
  3. エージェントチェーン
  4. DAG(有向非巡回グラフ)
  5. 動的ワークフロー
  6. OpenClawでの実装
  7. トラブルシューティング
  8. まとめ

ワークフロー設計の基礎

ワークフローとは

定義: 複数のエージェントが協力して複雑なタスクを完了するための、タスクの依存関係、実行順序、データの流れを定義したものです。

目的:

  • タスクの適切な分解
  • 並列実行の最適化
  • エージェント間の連携の明確化
  • エラーハンドリングの統一

簡単なワークフローと複雑なワークフロー


タスク分解のパターン

1. 階層的分解

概念: 大きなタスクを段階的に細かく分解します。

# 階層的分解の例
class TaskDecomposer:
    def decompose_hierarchically(self, root_task):
        """階層的分解"""
        tasks = {
            "root": root_task,
            "level_1": [],
            "level_2": [],
            "level_3": []
        }
        
        # レベル1: 主要なフェーズ
        tasks["level_1"] = self._split_into_phases(root_task)
        
        # レベル2: 各フェーズをサブタスクに分解
        for phase in tasks["level_1"]:
            tasks["level_2"].extend(self._split_into_subtasks(phase))
        
        # レベル3: 各サブタスクをさらに細かく分解
        for subtask in tasks["level_2"]:
            tasks["level_3"].extend(self._split_into_atomic_tasks(subtask))
        
        return tasks
    
    def _split_into_phases(self, task):
        """主要なフェーズに分割"""
        return ["調査", "設計", "実装", "テスト", "デプロイ"]
    
    def _split_into_subtasks(self, phase):
        """フェーズをサブタスクに分解"""
        if phase == "設計":
            return ["アーキテクチャ設計", "データベース設計", "API設計"]
        elif phase == "実装":
            return ["フロントエンド実装", "バックエンド実装", "データベース構築"]
        return []
    
    def _split_into_atomic_tasks(self, subtask):
        """アトミック(分割不可)なタスクに分解"""
        if subtask == "API設計":
            return [
                "エンドポイント定義",
                "リクエスト形式設計",
                "レスポンス形式設計",
                "エラーハンドリング設計"
            ]
        return []

# 使用例
decomposer = TaskDecomposer()
hierarchy = decomposer.decompose_hierarchically("記事作成システムの構築")
print(f"Task hierarchy: {hierarchy}")

2. 依存関係の明確化

概念: タスク間の依存関係を明確にします。

# 依存関係の表現
from dataclasses import dataclass
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    READY = "ready"
    IN_PROGRESS = "in_progress"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class Task:
    id: str
    name: str
    dependencies: list[str]  # 依存タスクのIDリスト
    status: TaskStatus
    result: any = None

class TaskGraph:
    def __init__(self):
        self.tasks = {}
    
    def add_task(self, task):
        self.tasks[task.id] = task
    
    def get_ready_tasks(self):
        """実行可能なタスクを取得"""
        ready_tasks = []
        
        for task_id, task in self.tasks.items():
            if task.status != TaskStatus.PENDING:
                continue
            
            # 依存タスクが全て完了しているか確認
            dependencies_completed = all(
                self.tasks[dep_id].status == TaskStatus.COMPLETED
                for dep_id in task.dependencies
                if dep_id in self.tasks
            )
            
            if dependencies_completed:
                ready_tasks.append(task)
                task.status = TaskStatus.READY
        
        return ready_tasks

# 使用例
graph = TaskGraph()

# タスク定義(依存関係付き)
task_a = Task(id="a", name="調査", dependencies=[], status=TaskStatus.PENDING)
task_b = Task(id="b", name="設計", dependencies=["a"], status=TaskStatus.PENDING)
task_c = Task(id="c", name="実装", dependencies=["b"], status=TaskStatus.PENDING)
task_d = Task(id="d", name="テスト", dependencies=["c"], status=TaskStatus.PENDING)

graph.add_task(task_a)
graph.add_task(task_b)
graph.add_task(task_c)
graph.add_task(task_d)

ready_tasks = graph.get_ready_tasks()
print(f"Ready tasks: {[t.name for t in ready_tasks]}")  # ["調査"]

エージェントチェーン

定義と概念

エージェントチェーン: 複数のエージェントが順番に実行されるパターン

エージェントチェーンの実装

# エージェントチェーン
class AgentChain:
    def __init__(self, agents):
        self.agents = agents
        self.chain = self._define_chain()
    
    def _define_chain(self):
        """チェーンを定義"""
        return [
            {"agent": "researcher", "next": "writer", "action": "research"},
            {"agent": "writer", "next": "editor", "action": "write_draft"},
            {"agent": "editor", "next": "publisher", "action": "review"},
            {"agent": "publisher", "next": "analyst", "action": "publish"},
            {"agent": "analyst", "next": None, "action": "analyze"}
        ]
    
    def execute_chain(self, initial_input):
        """チェーンを実行"""
        current_input = initial_input
        results = {}
        
        for step in self.chain:
            agent = self.agents[step["agent"]]
            action = step["action"]
            
            print(f"Executing: {step['agent']}.{action}")
            
            try:
                result = agent.execute(action, current_input)
                results[step["agent"]] = result
                
                # 次のエージェントに結果を渡す
                current_input = {
                    "previous_results": results,
                    "current_result": result
                }
                
                # 成功なら次へ、失敗なら中断
                if result.get("success", True) == False:
                    print(f"Agent {step['agent']} failed. Stopping chain.")
                    break
                    
            except Exception as e:
                print(f"Agent {step['agent']} errored: {e}")
                break
        
        return results

# 使用例
class Researcher:
    def execute(self, action, input_data):
        if action == "research":
            return {"success": True, "data": "調査結果"}

class Writer:
    def execute(self, action, input_data):
        if action == "write_draft":
            return {"success": True, "data": "ドラフト"}

# ... 他のエージェントも同様に実装

agents = {
    "researcher": Researcher(),
    "writer": Writer(),
    "editor": Editor(),
    "publisher": Publisher(),
    "analyst": Analyst()
}

chain = AgentChain(agents)
results = chain.execute_chain({"topic": "AIワークフロー"})
print(f"Chain execution results: {results}")

DAG(有向非巡回グラフ)

DAGとは

DAG: Directed Acyclic Graph(有向非巡回グラフ)

  • 有向: エッジに方向がある
  • 非巡回: 循環していない(自己ループがない)

メリット:

  • 依存関係を可視化できる
  • 並列実行を最適化できる
  • ボトルネックを特定できる

DAGの表現

# DAGの実装
from collections import defaultdict, deque

class DAG:
    def __init__(self):
        self.graph = defaultdict(list)  # {task: [dependencies]}
        self.reverse_graph = defaultdict(list)  # {task: [dependents]}
        self.in_degree = defaultdict(int)  # {task: number of dependencies}
    
    def add_task(self, task_id, dependencies=None):
        """タスクと依存関係を追加"""
        if dependencies is None:
            dependencies = []
        
        self.graph[task_id] = dependencies
        
        for dep in dependencies:
            self.graph[dep].append(task_id)
            self.in_degree[task_id] += 1
    
    def topological_sort(self):
        """トポロジカルソート(実行順序の決定)"""
        in_degree = self.in_degree.copy()
        queue = deque([task for task, degree in in_degree.items() if degree == 0])
        order = []
        
        while queue:
            task = queue.popleft()
            order.append(task)
            
            # このタスクに依存しているタスクの入次数を減らす
            for dependent in self.reverse_graph[task]:
                in_degree[dependent] -= 1
                
                if in_degree[dependent] == 0:
                    queue.append(dependent)
        
        return order
    
    def get_parallel_tasks(self, order, available_agents=3):
        """並列実行可能なタスクを取得"""
        current_level = []
        available_tasks = []
        processed = set()
        
        for task in order:
            # このタスクの依存が全て処理済みか確認
            dependencies_processed = all(
                dep in processed
                for dep in self.graph.get(task, [])
            )
            
            if dependencies_processed:
                if len(current_level) < available_agents:
                    current_level.append(task)
                    processed.add(task)
                else:
                    available_tasks.append(task)
            
            # 現在レベルのタスクを全て処理したら次のレベルへ
            if len(current_level) == available_agents and task not in processed:
                if dependencies_processed:
                    current_level = []
                    current_level.append(task)
                    processed.add(task)
        
        return current_level, available_tasks

# 使用例
dag = DAG()

# タスクと依存関係を追加
dag.add_task("a", [])  # 依存なし
dag.add_task("b", ["a"])  # aに依存
dag.add_task("c", ["a"])  # aに依存
dag.add_task("d", ["b", "c"])  # bとcに依存
dag.add_task("e", ["d"])  # dに依存

# 実行順序を計算
order = dag.topological_sort()
print(f"Execution order: {order}")

# 並列実行可能なタスクを取得
level_1, remaining = dag.get_parallel_tasks(order, available_agents=2)
print(f"Level 1 (parallel): {level_1}")
print(f"Remaining tasks: {remaining}")

DAGの可視化


動的ワークフロー

定義

動的ワークフロー: 実行時の状況に応じて、実行パスやエージェントの割り当てを動的に変化させるワークフローです。

動的ルーティング

# 動的ルーティング
class DynamicRouter:
    def __init__(self, agents):
        self.agents = agents
        self.agent_capacities = {
            "researcher": {"queue": [], "max_concurrent": 2},
            "writer": {"queue": [], "max_concurrent": 1},
            "editor": {"queue": [], "max_concurrent": 1}
        }
    
    def route_task(self, task):
        """タスクを適切なエージェントにルーティング"""
        task_type = task.get("type")
        
        # タスクタイプに応じたエージェントを選択
        if task_type == "research":
            return self._route_to_least_loaded("researcher", task)
        elif task_type == "writing":
            return self._route_to_least_loaded("writer", task)
        elif task_type == "editing":
            return self._route_to_least_loaded("editor", task)
        else:
            # デフォルト: Coordinaterにルーティング
            return self.agents["coordinator"]
    
    def _route_to_least_loaded(self, agent_type, task):
        """最も負荷が低いエージェントにルーティング"""
        capacity = self.agent_capacities[agent_type]
        
        if len(capacity["queue"]) < capacity["max_concurrent"]:
            capacity["queue"].append(task)
            return self.agents[agent_type]
        else:
            # 負荷が高い場合はCoordinaterにフォールバック
            print(f"Agent {agent_type} at capacity. Routing to coordinator")
            return self.agents["coordinator"]

# 使用例
router = DynamicRouter(agents)

tasks = [
    {"id": "1", "type": "research", "topic": "AIエージェント"},
    {"id": "2", "type": "research", "topic": "ワークフロー設計"},
    {"id": "3", "type": "writing", "topic": "記事作成"}
]

for task in tasks:
    agent = router.route_task(task)
    print(f"Task {task['id']} routed to {agent.__class__.__name__}")

条件分岐

# 条件分岐
class ConditionalWorkflow:
    def __init__(self):
        self.conditions = []
    
    def add_condition(self, condition, true_branch, false_branch=None):
        self.conditions.append({
            "condition": condition,
            "true_branch": true_branch,
            "false_branch": false_branch
        })
    
    def execute(self, context):
        """条件分岐を実行"""
        for cond in self.conditions:
            if cond["condition"](context):
                print(f"Condition met: {cond['condition'].__name__}")
                cond["true_branch"](context)
            elif cond["false_branch"]:
                print(f"Condition not met: {cond['condition'].__name__}")
                cond["false_branch"](context)

# 使用例
workflow = ConditionalWorkflow()

def is_api_available(context):
    """APIが利用可能か確認"""
    return context.get("api_status") == "available"

def is_cache_valid(context):
    """キャッシュが有効か確認"""
    cache_time = context.get("cache_timestamp", 0)
    return time.time() - cache_time < 3600  # 1時間以内

workflow.add_condition(
    condition=is_api_available,
    true_branch=lambda ctx: {"use": "api"},
    false_branch=lambda ctx: {"use": "cache"}
)

workflow.add_condition(
    condition=is_cache_valid,
    true_branch=lambda ctx: print("Using cached data"),
    false_branch=lambda ctx: print("Cache expired, fetching new data")
)

context = {"api_status": "available", "cache_timestamp": time.time() - 7200}
workflow.execute(context)

OpenClawでの実装

タスク管理システム

# OpenClawでのタスク管理
class TaskManager:
    def __init__(self):
        self.tasks = []
        self.dependencies = {}
    
    def create_task(self, name, dependencies=None):
        """タスクを作成"""
        task = {
            "id": f"task_{len(self.tasks)}",
            "name": name,
            "status": "pending",
            "dependencies": dependencies or [],
            "created_at": time.time()
        }
        self.tasks.append(task)
        return task
    
    def get_ready_tasks(self):
        """実行可能なタスクを取得"""
        ready_tasks = []
        
        for task in self.tasks:
            if task["status"] != "pending":
                continue
            
            # 依存タスクが全て完了しているか確認
            deps_completed = all(
                self._is_task_completed(dep_id)
                for dep_id in task["dependencies"]
            )
            
            if deps_completed:
                ready_tasks.append(task)
                task["status"] = "ready"
        
        return ready_tasks
    
    def _is_task_completed(self, task_id):
        """タスクが完了しているか確認"""
        for task in self.tasks:
            if task["id"] == task_id:
                return task["status"] == "completed"
        return False

# 使用例
manager = TaskManager()

# タスク定義(依存関係付き)
task1 = manager.create_task("調査", dependencies=[])
task2 = manager.create_task("設計", dependencies=[task1["id"]])
task3 = manager.create_task("実装", dependencies=[task2["id"]])
task4 = manager.create_task("テスト", dependencies=[task3["id"]])

ready_tasks = manager.get_ready_tasks()
print(f"Ready tasks: {[t['name'] for t in ready_tasks]}")

サブエージェントの起動

# サブエージェントの起動
import subprocess
import json

class SubAgentManager:
    def __init__(self):
        self.running_agents = {}
    
    def start_agent(self, agent_name, task):
        """サブエージェントを起動"""
        if agent_name in self.running_agents:
            print(f"Agent {agent_name} is already running")
            return
        
        print(f"Starting agent: {agent_name}")
        
        # サブエージェントを起動(擬似)
        process = subprocess.Popen(
            ["python", "-m", "openclaw", "agent", agent_name],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True
        )
        
        self.running_agents[agent_name] = {
            "process": process,
            "task": task,
            "started_at": time.time(),
            "status": "running"
        }
        
        return process
    
    def check_agent_status(self, agent_name):
        """エージェントの状態を確認"""
        if agent_name not in self.running_agents:
            return None
        
        agent_info = self.running_agents[agent_name]
        process = agent_info["process"]
        
        if process.poll() is not None:
            # プロセスが終了
            agent_info["status"] = "completed"
            agent_info["ended_at"] = time.time()
            agent_info["exit_code"] = process.returncode
            return agent_info
        else:
            # プロセスが実行中
            elapsed = time.time() - agent_info["started_at"]
            agent_info["elapsed"] = elapsed
            return agent_info

# 使用例
manager = SubAgentManager()

# Researcherエージェントを起動
manager.start_agent("researcher", {"topic": "AIワークフロー"})

# 5秒後に状態確認
time.sleep(5)
status = manager.check_agent_status("researcher")
print(f"Researcher status: {status}")

# Writerエージェントを起動(Researcherの完了を待た後)
manager.start_agent("writer", {"research_result": "調査結果"})

トラブルシューティング

よくある問題と解決策

問題1: デッドロックの発生

症状:

  • 複数のタスクが互いに相手を待って進まない
  • システム全体が停止する

原因:

  • 循環依存(A→B→C→A)
  • リソースの競合

解決策:

# デッドロック検出
def detect_deadlock(dag):
    """循環依存を検出"""
    visited = set()
    rec_stack = set()
    
    def visit(task):
        if task in rec_stack:
            return True  # 循環を検出
        
        if task in visited:
            return False
        
        visited.add(task)
        rec_stack.add(task)
        
        for dependent in dag.reverse_graph[task]:
            if visit(dependent):
                return True
        
        rec_stack.remove(task)
        return False
    
    for task in dag.graph:
        if visit(task):
            raise ValueError(f"Deadlock detected involving task: {task}")
    
    return False  # デッドロックなし

# 使用例
dag = DAG()
dag.add_task("a", ["b"])  # aはbに依存
dag.add_task("b", ["c"])  # bはcに依存
dag.add_task("c", ["a"])  # cはaに依存

try:
    detect_deadlock(dag)
except ValueError as e:
    print(f"⚠️  Deadlock detected: {e}")

問題2: パフォーマンスの低下

症状:

  • タスクの実行速度が遅くなる
  • エージェントのアイドル時間が長くなる

原因:

  • タスク分解が不十分
  • 並列度が低い
  • ボトルネックがある

解決策:

# パフォーマンスの最適化
class PerformanceOptimizer:
    def __init__(self):
        self.task_history = []
    
    def analyze_bottlenecks(self):
        """ボトルネックを分析"""
        if len(self.task_history) < 10:
            return None
        
        # タスク実行時間を分析
        durations = [t["duration"] for t in self.task_history]
        
        # 平均以上の時間がかかるタスクを特定
        avg_duration = sum(durations) / len(durations)
        slow_tasks = [
            t for t in self.task_history
            if t["duration"] > avg_duration * 1.5
        ]
        
        return slow_tasks
    
    def suggest_parallelization(self, task):
        """並列化を提案"""
        if task.get("parallelizable", False):
            return {
                "suggestion": "このタスクは並列実行可能です",
                "potential_speedup": f"{task['subtasks']}x"
            }
        return None

# 使用例
optimizer = PerformanceOptimizer()

for i in range(20):
    optimizer.task_history.append({
        "name": f"task_{i}",
        "duration": 10 + (i % 10) * 30,  # 10秒〜290秒
        "parallelizable": i < 10  # 最初の10個は並列可能
    })

bottlenecks = optimizer.analyze_bottlenecks()
if bottlenecks:
    print(f"Bottlenecks found: {bottlenecks}")

まとめ

ワークフロー設計の重要性

  • 効率性: タスクの並列実行による高速化
  • 信頼性: 依存関係の明確化によるエラー削減
  • 拡張性: 新しいタスクやエージェントの追加が容易
  • 可視化: DAGやフローチャートによる全体像の把握

実装のポイント

  1. タスク分解: 階層的分解、依存関係の明確化
  2. エージェントチェーン: 結果を渡すシンプルなチェーン
  3. DAG: トポロジカルソートによる実行順序の決定
  4. 動的ルーティング: 負荷に応じた柔軟なタスク割り当て
  5. 監視: 進捗管理、パフォーマンス分析

次のステップ

  1. 現状分析: 現在のワークフローを分析
  2. 改善点の特定: ボトルネック、非効率な箇所
  3. 実装: 新しいワークフローの導入
  4. テスト: 小規模なタスクでの試行
  5. 本番導入: 全体的な適用

作成日: 2026-03-11
更新日: 2026-03-11
作者: @autoopslab


0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?