はじめに
本記事では、MoA(ここでは複数のエージェントをオーケストレーションする仕組みを示しています)のサンプルコードを、初心者の方にもわかりやすいように解説していきます。
複数のエージェントが並行してタスクを実行し、最終的な結果を統合する流れは、よりスケーラブルなアプリケーション開発の可能性を広げるものです。
例えば、自然言語処理タスクやコード生成タスク、翻訳タスクなど、それぞれ得意分野を持ったエージェントが同時に走ることで、効率的に成果を得られます。
本記事では、そういった並列処理の流れを実現する仕組みを具体的なコード例とともに示し、初心者の方でもイメージしやすいようにステップバイステップで解説していきます。
OpenAIのAPIを使った要約・翻訳・コード生成など、さまざまなタスクを分割して処理させる方法を学ぶことで、今後のLLM活用やAIアプリケーション開発のヒントとして役立てていただければ幸いです。
全体の流れ
今回のコードは、大きく以下の流れで動作するようになっています。
- ユーザーからのリクエストを受け取る。
- Task Decomposer(LLMTaskDecomposer)がリクエストをタスクに分割する。
- 分割された各タスクを、そのタスクを処理できる**エージェント(Agent)**に割り当てる。
- エージェントがそれぞれのタスクを並列に処理し、結果を返す。
- Orchestratorが結果をまとめて、最終的な出力を返す。
ざっくりイメージ
以下は mermaid を用いたフロー図です。
- Orchestratorはリクエストを受け取ったら、まずLLMTaskDecomposerで「どのタスクが必要か?」を判定させます。
- 判定結果(JSON形式のタスクリスト)をもとに、該当するエージェントを探して並列で処理させます。
- その後、全ての結果を統合してユーザーへ返却します。
コード全文
import openai
import logging
from typing import List, Any, Tuple
import concurrent.futures
import json
# ---------------------------------------
# 設定
# ---------------------------------------
# 実際に動かす場合はOpenAIのAPIキーを設定してください
openai.api_key = "YOUR_OPENAI_API_KEY"
# ---------------------------------------
# LLMを使用したTaskDecomposer
# ---------------------------------------
class LLMTaskDecomposer:
"""
LLMを使ってユーザーのリクエストを解析し、
利用可能なエージェント情報と照らし合わせてタスクを生成する。
"""
def __init__(self, model_name: str = "gpt-3.5-turbo"):
self.model_name = model_name
def decompose(self, user_request: str, known_agent_types: List[str]) -> List[dict]:
"""
- user_request: ユーザーが入力した指示・文章
- known_agent_types: ["summarize", "code_generation", "translate"] などの対応可能エージェントのリスト
戻り値: [{"type": str, "data": str}, ...] のリスト
"""
prompt_text = f"""
あなたはユーザーのリクエストを見て、対応できるエージェントタイプを活用し、必要なタスクをJSON形式で出力するシステムです。
利用可能なエージェントタイプ: {", ".join(known_agent_types)}
ユーザーのリクエスト: {user_request}
手順:
1. ユーザーのリクエストを分析し、実行すべきタスクを決める。
2. タスクはオブジェクト形式で、"type"キーにエージェントタイプ、"data"キーに対象の文字列を含む。
3. タスクが複数ある場合、配列にまとめる。
4. エージェントタイプが何も当てはまらない場合は、"summarize" タスクとして扱う。
出力例:
[
{{ "type": "summarize", "data": "..." }},
{{ "type": "code_generation", "data": "..." }}
]
出力をJSON形式だけで返してください。他の説明文や前置きは一切不要です。
"""
try:
# 実際にはChatCompletionを呼び出す想定
# response = openai.ChatCompletion.create(
# model=self.model_name,
# messages=[
# {"role": "system", "content": "You are a helpful assistant."},
# {"role": "user", "content": prompt_text}
# ],
# temperature=0.0
# )
# llm_output = response["choices"][0]["message"]["content"]
# tasks = json.loads(llm_output)
# デモ用: 以前の文字列検索ロジック
simple_tasks = []
if "要約" in user_request:
simple_tasks.append({"type": "summarize", "data": user_request})
if "コード" in user_request or "プログラム" in user_request:
simple_tasks.append({"type": "code_generation", "data": user_request})
if "翻訳" in user_request:
simple_tasks.append({"type": "translate", "data": user_request})
if not simple_tasks:
simple_tasks.append({"type": "summarize", "data": user_request})
tasks = simple_tasks
return tasks
except Exception as e:
logging.error(f"LLMタスク分割エラー: {e}")
# エラー時はデフォルトでsummarize
return [{"type": "summarize", "data": user_request}]
# ---------------------------------------
# エージェントの共通インターフェース
# ---------------------------------------
class BaseAgent:
def name(self) -> str:
"""エージェントの名前を返す"""
raise NotImplementedError()
def can_handle(self, task_type: str) -> bool:
"""
このエージェントが特定のタスクタイプを処理できるか。
対応可能なら True, それ以外なら False を返す。
"""
raise NotImplementedError()
def handle(self, task_data: Any) -> str:
"""
タスクを実行し、結果を文字列として返す。
例外があれば適宜キャッチして呼び出し側に伝える。
"""
raise NotImplementedError()
# ---------------------------------------
# 各エージェントの実装例
# ---------------------------------------
class SummarizerAgent(BaseAgent):
def name(self) -> str:
return "SummarizerAgent"
def can_handle(self, task_type: str) -> bool:
return task_type == "summarize"
def handle(self, task_data: Any) -> str:
text_to_summarize = task_data
logging.info(f"[{self.name()}] 要約を実行中...")
try:
# 本来はOpenAI API等を呼び出して要約
return "【要約】" + text_to_summarize
except Exception as e:
logging.error(f"[{self.name()}] 要約エラー: {e}")
return "要約に失敗しました。"
class CodeGeneratorAgent(BaseAgent):
def name(self) -> str:
return "CodeGeneratorAgent"
def can_handle(self, task_type: str) -> bool:
return task_type == "code_generation"
def handle(self, task_data: Any) -> str:
prompt_for_code = task_data
logging.info(f"[{self.name()}] コード生成を実行中...")
try:
# 本来はOpenAI API等を呼び出してコード生成
return "# 生成したPythonコードのサンプル\nprint('Hello World')"
except Exception as e:
logging.error(f"[{self.name()}] コード生成エラー: {e}")
return "# コード生成に失敗しました。"
class TranslatorAgent(BaseAgent):
def name(self) -> str:
return "TranslatorAgent"
def can_handle(self, task_type: str) -> bool:
return task_type == "translate"
def handle(self, task_data: Any) -> str:
text_to_translate = task_data
logging.info(f"[{self.name()}] 翻訳を実行中...")
try:
# 本来はOpenAI API等を呼び出して翻訳
return "Translated text of: " + text_to_translate
except Exception as e:
logging.error(f"[{self.name()}] 翻訳エラー: {e}")
return "翻訳に失敗しました。"
# ---------------------------------------
# オーケストレーター
# ---------------------------------------
class Orchestrator:
"""
タスクデコンポーザ(LLM)とエージェントを管理し、
タスクを最適なエージェントに並列で振り分け、結果をまとめる。
"""
def __init__(self, agents: List[BaseAgent], task_decomposer: LLMTaskDecomposer):
self.agents = agents
self.task_decomposer = task_decomposer
def process_request(self, user_request: str) -> str:
"""
ユーザーのリクエスト文字列をもとにタスクを分解し、
適切なエージェントで並列処理して、最終的な結果をまとめて返す。
"""
# 1. LLMでタスク分割
agent_types = [agent.name().replace("Agent", "").lower() for agent in self.agents]
# 例: ["summarizer", "codegenerator", "translator"] など
# ただしコード上は "summarize", "code_generation", "translate" が本来のタスク名と対応するため注意
# ここでは簡単に一致させるように工夫
# より正確にはagentのcan_handle()メソッドから逆引きして紐づけるなどを検討
# 今回はサンプルとして単純化
# 実際には上の処理を工夫し、["summarize", "code_generation", "translate"] を作る
# ここではハードコードの簡易マッピング
known_agent_types = ["summarize", "code_generation", "translate"]
decomposed_tasks = self.task_decomposer.decompose(user_request, known_agent_types)
# 2. 並列実行によるタスク処理
max_workers = max(1, len(decomposed_tasks))
results: List[Tuple[str, str]] = []
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
futures_and_task_types = []
for task in decomposed_tasks:
task_type = task["type"]
task_data = task["data"]
# 処理可能なエージェントを探す
assigned_agent = None
for agent in self.agents:
if agent.can_handle(task_type):
assigned_agent = agent
break
if assigned_agent is not None:
future = executor.submit(assigned_agent.handle, task_data)
futures_and_task_types.append((task_type, future))
else:
logging.warning(f"対応可能なエージェントが見つかりませんでした: {task_type}")
results.append((task_type, f"エージェントが見つかりませんでした (task_type={task_type})"))
for task_type, future in futures_and_task_types:
try:
result = future.result() # 実行結果を取得
results.append((task_type, result))
except Exception as e:
logging.error(f"タスク実行中にエラーが発生: {e}")
results.append((task_type, "エラーが発生しました。"))
# 3. 結果統合
final_answer = self.aggregate_results(results)
return final_answer
def aggregate_results(self, results: List[Tuple[str, str]]) -> str:
"""
エージェントごとの結果をまとめて最終的な出力を作る。
"""
aggregated_texts = []
for task_type, result_text in results:
aggregated_texts.append(f"[{task_type.upper()} RESULT]:\n{result_text}")
return "\n\n".join(aggregated_texts)
# ---------------------------------------
# 実行例 (メイン)
# ---------------------------------------
def main():
# エージェントを登録
agents = [
SummarizerAgent(),
CodeGeneratorAgent(),
TranslatorAgent()
]
# LLMベースのタスクデコンポーザを用意
llm_decomposer = LLMTaskDecomposer()
# オーケストレーターを初期化
orchestrator = Orchestrator(agents, llm_decomposer)
# ユーザーからのリクエスト例
user_request = "この文章を要約して、サンプルのPythonコードを書いて、英語に翻訳してください。"
# 実行
final_result = orchestrator.process_request(user_request)
print("=== Final Result ===")
print(final_result)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
main()
コード構成
コードは大きく以下のブロックに分かれています。
- 設定
- TaskDecomposer(LLMTaskDecomposer)
- エージェントの共通インターフェース (BaseAgent)
- 各エージェントの実装 (SummarizerAgent, CodeGeneratorAgent, TranslatorAgent)
- オーケストレーター (Orchestrator)
- 実行例 (main関数)
順番に見ていきましょう。
1. 設定
import openai
import logging
from typing import List, Any, Tuple
import concurrent.futures
import json
openai.api_key = "YOUR_OPENAI_API_KEY"
-
openai.api_key
にAPIキーを設定すると、OpenAI API を実際に呼び出すコードに切り替えることができます。 - ここではデモのため、サンプルの処理に置き換えています。
2. TaskDecomposer(LLMTaskDecomposer)
class LLMTaskDecomposer:
def __init__(self, model_name: str = "gpt-3.5-turbo"):
self.model_name = model_name
def decompose(self, user_request: str, known_agent_types: List[str]) -> List[dict]:
...
- ユーザーのリクエストを見て、「要約」「コード生成」「翻訳」などどんなタスクが必要かを判断するクラスです。
- 実際には
openai.ChatCompletion.create()
を使ってLLMに投げる想定ですが、サンプルでは文字列検索ベースでタスクを判定しています。
仕組み
-
user_request
に「要約」などのキーワードが含まれていれば、対応するタスクをリストに追加。 - 何も当てはまらなければ、デフォルトで
"summarize"
を返す。
実行結果はリスト形式で、たとえば以下のように返ってきます。
[
{"type": "summarize", "data": "..."},
{"type": "code_generation", "data": "..."},
{"type": "translate", "data": "..."},
]
3. エージェントの共通インターフェース (BaseAgent)
class BaseAgent:
def name(self) -> str:
raise NotImplementedError()
def can_handle(self, task_type: str) -> bool:
raise NotImplementedError()
def handle(self, task_data: Any) -> str:
raise NotImplementedError()
- どのエージェントでも共通のインターフェースを備え、
name()
,can_handle()
,handle()
を実装するようにしています。 - これによって、個別のエージェントがどんな処理をしていようが、共通の方法で扱えるようになります。
4. 各エージェントの実装
class SummarizerAgent(BaseAgent):
def name(self) -> str:
return "SummarizerAgent"
def can_handle(self, task_type: str) -> bool:
return task_type == "summarize"
def handle(self, task_data: Any) -> str:
...
-
SummarizerAgent:
"summarize"
タスクを処理。 -
CodeGeneratorAgent:
"code_generation"
タスクを処理。 -
TranslatorAgent:
"translate"
タスクを処理。
実際は openai
のAPIを呼び出して要約/コード生成/翻訳をする想定ですが、サンプルとして固定の文字列を返すようになっています。
5. オーケストレーター (Orchestrator)
class Orchestrator:
def __init__(self, agents: List[BaseAgent], task_decomposer: LLMTaskDecomposer):
self.agents = agents
self.task_decomposer = task_decomposer
def process_request(self, user_request: str) -> str:
...
- 全体の司令塔のような役割を担うクラスです。
- ユーザーのリクエストを受け取り、LLMTaskDecomposerでタスクに分解してから、適切なエージェントに振り分ける処理を行います。
- 並列実行には
ThreadPoolExecutor
を使っているのがポイントで、複数のタスクを同時に処理できます。
process_request
の流れ
-
llm_decomposer.decompose(user_request, known_agent_types)
でタスク分解。 -
ThreadPoolExecutor
を使って複数のタスクを同時に実行。 - 各タスクに対応するエージェントを探して
handle()
メソッドを呼び出し。 - 全ての結果を
aggregate_results()
でまとめて返却。
def aggregate_results(self, results: List[Tuple[str, str]]) -> str:
aggregated_texts = []
for task_type, result_text in results:
aggregated_texts.append(f"[{task_type.upper()} RESULT]:\n{result_text}")
return "\n\n".join(aggregated_texts)
- 結果のまとめ方としては、単純に各タスクの結果を
[task_type.upper() RESULT]
という文字列でつないでいるだけです。 - 実際のユースケースに応じて、JSONで統合したりテンプレートに埋め込んだりといった拡張が考えられます。
6. 実行例 (main関数)
def main():
agents = [
SummarizerAgent(),
CodeGeneratorAgent(),
TranslatorAgent()
]
llm_decomposer = LLMTaskDecomposer()
orchestrator = Orchestrator(agents, llm_decomposer)
user_request = "この文章を要約して、サンプルのPythonコードを書いて、英語に翻訳してください。"
final_result = orchestrator.process_request(user_request)
print("=== Final Result ===")
print(final_result)
-
SummarizerAgent
,CodeGeneratorAgent
,TranslatorAgent
の3つのエージェントを登録して、Orchestrator
に渡しています。 -
user_request
には上記のような、要約・コード生成・翻訳が混ざった依頼を試しに投げてみます。 - 実行すると、
- SummarizerAgent での要約結果
- CodeGeneratorAgent でのサンプルコード
- TranslatorAgent での翻訳
が表示されます。
まとめ
このMoA(複数エージェントの並列オーケストレーション)サンプルコードでは、以下の点がポイントです。
- LLMTaskDecomposer でリクエストを多段階に分解し、複数のエージェントに処理を委ねる設計。
- BaseAgent という共通インターフェースによって、エージェントごとの違いを吸収。
- Orchestrator で各タスクを並列処理することにより、スケーラブルに対応。
実際の開発では、OpenAI の API などを本格的に使って、より高度な要約・コード生成・翻訳を行うことができます。
複数のエージェントを利用して、色々なタスクを同時に行いたいときに非常に便利なアーキテクチャとなるでしょう。