複雑なタスクを自動化するDAG・エージェントチェーン・動的ルーティングの完全実装ガイド
はじめに
AIエージェントに複雑なタスクを任せる際、単に「これをやって」と言うだけでは思うような成果は得られません。
**「どのように分解するか」「どの順序で実行するか」「どのように連携するか」**を設計する必要があります。
これがAIワークフロー設計です。
本記事では、タスク分解、エージェントチェーン、DAG(有向非巡回グラフ)など、AIエージェントの高度なワークフローを設計するための考え方と実装パターンを解説します。
目次
ワークフロー設計の基礎
ワークフローとは
定義: 複数のエージェントが協力して複雑なタスクを完了するための、タスクの依存関係、実行順序、データの流れを定義したものです。
目的:
- タスクの適切な分解
- 並列実行の最適化
- エージェント間の連携の明確化
- エラーハンドリングの統一
簡単なワークフローと複雑なワークフロー
タスク分解のパターン
1. 階層的分解
概念: 大きなタスクを段階的に細かく分解します。
# 階層的分解の例
class TaskDecomposer:
def decompose_hierarchically(self, root_task):
"""階層的分解"""
tasks = {
"root": root_task,
"level_1": [],
"level_2": [],
"level_3": []
}
# レベル1: 主要なフェーズ
tasks["level_1"] = self._split_into_phases(root_task)
# レベル2: 各フェーズをサブタスクに分解
for phase in tasks["level_1"]:
tasks["level_2"].extend(self._split_into_subtasks(phase))
# レベル3: 各サブタスクをさらに細かく分解
for subtask in tasks["level_2"]:
tasks["level_3"].extend(self._split_into_atomic_tasks(subtask))
return tasks
def _split_into_phases(self, task):
"""主要なフェーズに分割"""
return ["調査", "設計", "実装", "テスト", "デプロイ"]
def _split_into_subtasks(self, phase):
"""フェーズをサブタスクに分解"""
if phase == "設計":
return ["アーキテクチャ設計", "データベース設計", "API設計"]
elif phase == "実装":
return ["フロントエンド実装", "バックエンド実装", "データベース構築"]
return []
def _split_into_atomic_tasks(self, subtask):
"""アトミック(分割不可)なタスクに分解"""
if subtask == "API設計":
return [
"エンドポイント定義",
"リクエスト形式設計",
"レスポンス形式設計",
"エラーハンドリング設計"
]
return []
# 使用例
decomposer = TaskDecomposer()
hierarchy = decomposer.decompose_hierarchically("記事作成システムの構築")
print(f"Task hierarchy: {hierarchy}")
2. 依存関係の明確化
概念: タスク間の依存関係を明確にします。
# 依存関係の表現
from dataclasses import dataclass
from enum import Enum
class TaskStatus(Enum):
PENDING = "pending"
READY = "ready"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class Task:
id: str
name: str
dependencies: list[str] # 依存タスクのIDリスト
status: TaskStatus
result: any = None
class TaskGraph:
def __init__(self):
self.tasks = {}
def add_task(self, task):
self.tasks[task.id] = task
def get_ready_tasks(self):
"""実行可能なタスクを取得"""
ready_tasks = []
for task_id, task in self.tasks.items():
if task.status != TaskStatus.PENDING:
continue
# 依存タスクが全て完了しているか確認
dependencies_completed = all(
self.tasks[dep_id].status == TaskStatus.COMPLETED
for dep_id in task.dependencies
if dep_id in self.tasks
)
if dependencies_completed:
ready_tasks.append(task)
task.status = TaskStatus.READY
return ready_tasks
# 使用例
graph = TaskGraph()
# タスク定義(依存関係付き)
task_a = Task(id="a", name="調査", dependencies=[], status=TaskStatus.PENDING)
task_b = Task(id="b", name="設計", dependencies=["a"], status=TaskStatus.PENDING)
task_c = Task(id="c", name="実装", dependencies=["b"], status=TaskStatus.PENDING)
task_d = Task(id="d", name="テスト", dependencies=["c"], status=TaskStatus.PENDING)
graph.add_task(task_a)
graph.add_task(task_b)
graph.add_task(task_c)
graph.add_task(task_d)
ready_tasks = graph.get_ready_tasks()
print(f"Ready tasks: {[t.name for t in ready_tasks]}") # ["調査"]
エージェントチェーン
定義と概念
エージェントチェーン: 複数のエージェントが順番に実行されるパターン
エージェントチェーンの実装
# エージェントチェーン
class AgentChain:
def __init__(self, agents):
self.agents = agents
self.chain = self._define_chain()
def _define_chain(self):
"""チェーンを定義"""
return [
{"agent": "researcher", "next": "writer", "action": "research"},
{"agent": "writer", "next": "editor", "action": "write_draft"},
{"agent": "editor", "next": "publisher", "action": "review"},
{"agent": "publisher", "next": "analyst", "action": "publish"},
{"agent": "analyst", "next": None, "action": "analyze"}
]
def execute_chain(self, initial_input):
"""チェーンを実行"""
current_input = initial_input
results = {}
for step in self.chain:
agent = self.agents[step["agent"]]
action = step["action"]
print(f"Executing: {step['agent']}.{action}")
try:
result = agent.execute(action, current_input)
results[step["agent"]] = result
# 次のエージェントに結果を渡す
current_input = {
"previous_results": results,
"current_result": result
}
# 成功なら次へ、失敗なら中断
if result.get("success", True) == False:
print(f"Agent {step['agent']} failed. Stopping chain.")
break
except Exception as e:
print(f"Agent {step['agent']} errored: {e}")
break
return results
# 使用例
class Researcher:
def execute(self, action, input_data):
if action == "research":
return {"success": True, "data": "調査結果"}
class Writer:
def execute(self, action, input_data):
if action == "write_draft":
return {"success": True, "data": "ドラフト"}
# ... 他のエージェントも同様に実装
agents = {
"researcher": Researcher(),
"writer": Writer(),
"editor": Editor(),
"publisher": Publisher(),
"analyst": Analyst()
}
chain = AgentChain(agents)
results = chain.execute_chain({"topic": "AIワークフロー"})
print(f"Chain execution results: {results}")
DAG(有向非巡回グラフ)
DAGとは
DAG: Directed Acyclic Graph(有向非巡回グラフ)
- 有向: エッジに方向がある
- 非巡回: 循環していない(自己ループがない)
メリット:
- 依存関係を可視化できる
- 並列実行を最適化できる
- ボトルネックを特定できる
DAGの表現
# DAGの実装
from collections import defaultdict, deque
class DAG:
def __init__(self):
self.graph = defaultdict(list) # {task: [dependencies]}
self.reverse_graph = defaultdict(list) # {task: [dependents]}
self.in_degree = defaultdict(int) # {task: number of dependencies}
def add_task(self, task_id, dependencies=None):
"""タスクと依存関係を追加"""
if dependencies is None:
dependencies = []
self.graph[task_id] = dependencies
for dep in dependencies:
self.graph[dep].append(task_id)
self.in_degree[task_id] += 1
def topological_sort(self):
"""トポロジカルソート(実行順序の決定)"""
in_degree = self.in_degree.copy()
queue = deque([task for task, degree in in_degree.items() if degree == 0])
order = []
while queue:
task = queue.popleft()
order.append(task)
# このタスクに依存しているタスクの入次数を減らす
for dependent in self.reverse_graph[task]:
in_degree[dependent] -= 1
if in_degree[dependent] == 0:
queue.append(dependent)
return order
def get_parallel_tasks(self, order, available_agents=3):
"""並列実行可能なタスクを取得"""
current_level = []
available_tasks = []
processed = set()
for task in order:
# このタスクの依存が全て処理済みか確認
dependencies_processed = all(
dep in processed
for dep in self.graph.get(task, [])
)
if dependencies_processed:
if len(current_level) < available_agents:
current_level.append(task)
processed.add(task)
else:
available_tasks.append(task)
# 現在レベルのタスクを全て処理したら次のレベルへ
if len(current_level) == available_agents and task not in processed:
if dependencies_processed:
current_level = []
current_level.append(task)
processed.add(task)
return current_level, available_tasks
# 使用例
dag = DAG()
# タスクと依存関係を追加
dag.add_task("a", []) # 依存なし
dag.add_task("b", ["a"]) # aに依存
dag.add_task("c", ["a"]) # aに依存
dag.add_task("d", ["b", "c"]) # bとcに依存
dag.add_task("e", ["d"]) # dに依存
# 実行順序を計算
order = dag.topological_sort()
print(f"Execution order: {order}")
# 並列実行可能なタスクを取得
level_1, remaining = dag.get_parallel_tasks(order, available_agents=2)
print(f"Level 1 (parallel): {level_1}")
print(f"Remaining tasks: {remaining}")
DAGの可視化
動的ワークフロー
定義
動的ワークフロー: 実行時の状況に応じて、実行パスやエージェントの割り当てを動的に変化させるワークフローです。
動的ルーティング
# 動的ルーティング
class DynamicRouter:
def __init__(self, agents):
self.agents = agents
self.agent_capacities = {
"researcher": {"queue": [], "max_concurrent": 2},
"writer": {"queue": [], "max_concurrent": 1},
"editor": {"queue": [], "max_concurrent": 1}
}
def route_task(self, task):
"""タスクを適切なエージェントにルーティング"""
task_type = task.get("type")
# タスクタイプに応じたエージェントを選択
if task_type == "research":
return self._route_to_least_loaded("researcher", task)
elif task_type == "writing":
return self._route_to_least_loaded("writer", task)
elif task_type == "editing":
return self._route_to_least_loaded("editor", task)
else:
# デフォルト: Coordinaterにルーティング
return self.agents["coordinator"]
def _route_to_least_loaded(self, agent_type, task):
"""最も負荷が低いエージェントにルーティング"""
capacity = self.agent_capacities[agent_type]
if len(capacity["queue"]) < capacity["max_concurrent"]:
capacity["queue"].append(task)
return self.agents[agent_type]
else:
# 負荷が高い場合はCoordinaterにフォールバック
print(f"Agent {agent_type} at capacity. Routing to coordinator")
return self.agents["coordinator"]
# 使用例
router = DynamicRouter(agents)
tasks = [
{"id": "1", "type": "research", "topic": "AIエージェント"},
{"id": "2", "type": "research", "topic": "ワークフロー設計"},
{"id": "3", "type": "writing", "topic": "記事作成"}
]
for task in tasks:
agent = router.route_task(task)
print(f"Task {task['id']} routed to {agent.__class__.__name__}")
条件分岐
# 条件分岐
class ConditionalWorkflow:
def __init__(self):
self.conditions = []
def add_condition(self, condition, true_branch, false_branch=None):
self.conditions.append({
"condition": condition,
"true_branch": true_branch,
"false_branch": false_branch
})
def execute(self, context):
"""条件分岐を実行"""
for cond in self.conditions:
if cond["condition"](context):
print(f"Condition met: {cond['condition'].__name__}")
cond["true_branch"](context)
elif cond["false_branch"]:
print(f"Condition not met: {cond['condition'].__name__}")
cond["false_branch"](context)
# 使用例
workflow = ConditionalWorkflow()
def is_api_available(context):
"""APIが利用可能か確認"""
return context.get("api_status") == "available"
def is_cache_valid(context):
"""キャッシュが有効か確認"""
cache_time = context.get("cache_timestamp", 0)
return time.time() - cache_time < 3600 # 1時間以内
workflow.add_condition(
condition=is_api_available,
true_branch=lambda ctx: {"use": "api"},
false_branch=lambda ctx: {"use": "cache"}
)
workflow.add_condition(
condition=is_cache_valid,
true_branch=lambda ctx: print("Using cached data"),
false_branch=lambda ctx: print("Cache expired, fetching new data")
)
context = {"api_status": "available", "cache_timestamp": time.time() - 7200}
workflow.execute(context)
OpenClawでの実装
タスク管理システム
# OpenClawでのタスク管理
class TaskManager:
def __init__(self):
self.tasks = []
self.dependencies = {}
def create_task(self, name, dependencies=None):
"""タスクを作成"""
task = {
"id": f"task_{len(self.tasks)}",
"name": name,
"status": "pending",
"dependencies": dependencies or [],
"created_at": time.time()
}
self.tasks.append(task)
return task
def get_ready_tasks(self):
"""実行可能なタスクを取得"""
ready_tasks = []
for task in self.tasks:
if task["status"] != "pending":
continue
# 依存タスクが全て完了しているか確認
deps_completed = all(
self._is_task_completed(dep_id)
for dep_id in task["dependencies"]
)
if deps_completed:
ready_tasks.append(task)
task["status"] = "ready"
return ready_tasks
def _is_task_completed(self, task_id):
"""タスクが完了しているか確認"""
for task in self.tasks:
if task["id"] == task_id:
return task["status"] == "completed"
return False
# 使用例
manager = TaskManager()
# タスク定義(依存関係付き)
task1 = manager.create_task("調査", dependencies=[])
task2 = manager.create_task("設計", dependencies=[task1["id"]])
task3 = manager.create_task("実装", dependencies=[task2["id"]])
task4 = manager.create_task("テスト", dependencies=[task3["id"]])
ready_tasks = manager.get_ready_tasks()
print(f"Ready tasks: {[t['name'] for t in ready_tasks]}")
サブエージェントの起動
# サブエージェントの起動
import subprocess
import json
class SubAgentManager:
def __init__(self):
self.running_agents = {}
def start_agent(self, agent_name, task):
"""サブエージェントを起動"""
if agent_name in self.running_agents:
print(f"Agent {agent_name} is already running")
return
print(f"Starting agent: {agent_name}")
# サブエージェントを起動(擬似)
process = subprocess.Popen(
["python", "-m", "openclaw", "agent", agent_name],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
text=True
)
self.running_agents[agent_name] = {
"process": process,
"task": task,
"started_at": time.time(),
"status": "running"
}
return process
def check_agent_status(self, agent_name):
"""エージェントの状態を確認"""
if agent_name not in self.running_agents:
return None
agent_info = self.running_agents[agent_name]
process = agent_info["process"]
if process.poll() is not None:
# プロセスが終了
agent_info["status"] = "completed"
agent_info["ended_at"] = time.time()
agent_info["exit_code"] = process.returncode
return agent_info
else:
# プロセスが実行中
elapsed = time.time() - agent_info["started_at"]
agent_info["elapsed"] = elapsed
return agent_info
# 使用例
manager = SubAgentManager()
# Researcherエージェントを起動
manager.start_agent("researcher", {"topic": "AIワークフロー"})
# 5秒後に状態確認
time.sleep(5)
status = manager.check_agent_status("researcher")
print(f"Researcher status: {status}")
# Writerエージェントを起動(Researcherの完了を待た後)
manager.start_agent("writer", {"research_result": "調査結果"})
トラブルシューティング
よくある問題と解決策
問題1: デッドロックの発生
症状:
- 複数のタスクが互いに相手を待って進まない
- システム全体が停止する
原因:
- 循環依存(A→B→C→A)
- リソースの競合
解決策:
# デッドロック検出
def detect_deadlock(dag):
"""循環依存を検出"""
visited = set()
rec_stack = set()
def visit(task):
if task in rec_stack:
return True # 循環を検出
if task in visited:
return False
visited.add(task)
rec_stack.add(task)
for dependent in dag.reverse_graph[task]:
if visit(dependent):
return True
rec_stack.remove(task)
return False
for task in dag.graph:
if visit(task):
raise ValueError(f"Deadlock detected involving task: {task}")
return False # デッドロックなし
# 使用例
dag = DAG()
dag.add_task("a", ["b"]) # aはbに依存
dag.add_task("b", ["c"]) # bはcに依存
dag.add_task("c", ["a"]) # cはaに依存
try:
detect_deadlock(dag)
except ValueError as e:
print(f"⚠️ Deadlock detected: {e}")
問題2: パフォーマンスの低下
症状:
- タスクの実行速度が遅くなる
- エージェントのアイドル時間が長くなる
原因:
- タスク分解が不十分
- 並列度が低い
- ボトルネックがある
解決策:
# パフォーマンスの最適化
class PerformanceOptimizer:
def __init__(self):
self.task_history = []
def analyze_bottlenecks(self):
"""ボトルネックを分析"""
if len(self.task_history) < 10:
return None
# タスク実行時間を分析
durations = [t["duration"] for t in self.task_history]
# 平均以上の時間がかかるタスクを特定
avg_duration = sum(durations) / len(durations)
slow_tasks = [
t for t in self.task_history
if t["duration"] > avg_duration * 1.5
]
return slow_tasks
def suggest_parallelization(self, task):
"""並列化を提案"""
if task.get("parallelizable", False):
return {
"suggestion": "このタスクは並列実行可能です",
"potential_speedup": f"{task['subtasks']}x"
}
return None
# 使用例
optimizer = PerformanceOptimizer()
for i in range(20):
optimizer.task_history.append({
"name": f"task_{i}",
"duration": 10 + (i % 10) * 30, # 10秒〜290秒
"parallelizable": i < 10 # 最初の10個は並列可能
})
bottlenecks = optimizer.analyze_bottlenecks()
if bottlenecks:
print(f"Bottlenecks found: {bottlenecks}")
まとめ
ワークフロー設計の重要性
- 効率性: タスクの並列実行による高速化
- 信頼性: 依存関係の明確化によるエラー削減
- 拡張性: 新しいタスクやエージェントの追加が容易
- 可視化: DAGやフローチャートによる全体像の把握
実装のポイント
- タスク分解: 階層的分解、依存関係の明確化
- エージェントチェーン: 結果を渡すシンプルなチェーン
- DAG: トポロジカルソートによる実行順序の決定
- 動的ルーティング: 負荷に応じた柔軟なタスク割り当て
- 監視: 進捗管理、パフォーマンス分析
次のステップ
- 現状分析: 現在のワークフローを分析
- 改善点の特定: ボトルネック、非効率な箇所
- 実装: 新しいワークフローの導入
- テスト: 小規模なタスクでの試行
- 本番導入: 全体的な適用
作成日: 2026-03-11
更新日: 2026-03-11
作者: @autoopslab
