1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

並列実行 MoA(Mixture of Agents)完全解説 〜初心者でもわかる!クラス設計からコード解説まで〜

Posted at

はじめに

本記事では、MoA(ここでは複数のエージェントをオーケストレーションする仕組みを示しています)のサンプルコードを、初心者の方にもわかりやすいように解説していきます。

複数のエージェントが並行してタスクを実行し、最終的な結果を統合する流れは、よりスケーラブルなアプリケーション開発の可能性を広げるものです。
例えば、自然言語処理タスクやコード生成タスク、翻訳タスクなど、それぞれ得意分野を持ったエージェントが同時に走ることで、効率的に成果を得られます。

本記事では、そういった並列処理の流れを実現する仕組みを具体的なコード例とともに示し、初心者の方でもイメージしやすいようにステップバイステップで解説していきます。
OpenAIのAPIを使った要約・翻訳・コード生成など、さまざまなタスクを分割して処理させる方法を学ぶことで、今後のLLM活用やAIアプリケーション開発のヒントとして役立てていただければ幸いです。


全体の流れ

今回のコードは、大きく以下の流れで動作するようになっています。

  1. ユーザーからのリクエストを受け取る。
  2. Task Decomposer(LLMTaskDecomposer)がリクエストをタスクに分割する。
  3. 分割された各タスクを、そのタスクを処理できる**エージェント(Agent)**に割り当てる。
  4. エージェントがそれぞれのタスクを並列に処理し、結果を返す。
  5. Orchestratorが結果をまとめて、最終的な出力を返す。

ざっくりイメージ

以下は mermaid を用いたフロー図です。

  • Orchestratorはリクエストを受け取ったら、まずLLMTaskDecomposerで「どのタスクが必要か?」を判定させます。
  • 判定結果(JSON形式のタスクリスト)をもとに、該当するエージェントを探して並列で処理させます。
  • その後、全ての結果を統合してユーザーへ返却します。

コード全文

import openai
import logging
from typing import List, Any, Tuple
import concurrent.futures
import json

# ---------------------------------------
# 設定
# ---------------------------------------
# 実際に動かす場合はOpenAIのAPIキーを設定してください
openai.api_key = "YOUR_OPENAI_API_KEY"


# ---------------------------------------
# LLMを使用したTaskDecomposer
# ---------------------------------------
class LLMTaskDecomposer:
    """
    LLMを使ってユーザーのリクエストを解析し、
    利用可能なエージェント情報と照らし合わせてタスクを生成する。
    """
    def __init__(self, model_name: str = "gpt-3.5-turbo"):
        self.model_name = model_name
    
    def decompose(self, user_request: str, known_agent_types: List[str]) -> List[dict]:
        """
        - user_request: ユーザーが入力した指示・文章
        - known_agent_types: ["summarize", "code_generation", "translate"] などの対応可能エージェントのリスト
        戻り値: [{"type": str, "data": str}, ...] のリスト
        """
        prompt_text = f"""
あなたはユーザーのリクエストを見て、対応できるエージェントタイプを活用し、必要なタスクをJSON形式で出力するシステムです。
利用可能なエージェントタイプ: {", ".join(known_agent_types)} 
ユーザーのリクエスト: {user_request}

手順:
1. ユーザーのリクエストを分析し、実行すべきタスクを決める。
2. タスクはオブジェクト形式で、"type"キーにエージェントタイプ、"data"キーに対象の文字列を含む。
3. タスクが複数ある場合、配列にまとめる。
4. エージェントタイプが何も当てはまらない場合は、"summarize" タスクとして扱う。

出力例:
[
  {{ "type": "summarize", "data": "..." }},
  {{ "type": "code_generation", "data": "..." }}
]

出力をJSON形式だけで返してください。他の説明文や前置きは一切不要です。
"""
        try:
            # 実際にはChatCompletionを呼び出す想定
            # response = openai.ChatCompletion.create(
            #     model=self.model_name,
            #     messages=[
            #         {"role": "system", "content": "You are a helpful assistant."},
            #         {"role": "user", "content": prompt_text}
            #     ],
            #     temperature=0.0
            # )
            # llm_output = response["choices"][0]["message"]["content"]
            # tasks = json.loads(llm_output)

            # デモ用: 以前の文字列検索ロジック
            simple_tasks = []
            if "要約" in user_request:
                simple_tasks.append({"type": "summarize", "data": user_request})
            if "コード" in user_request or "プログラム" in user_request:
                simple_tasks.append({"type": "code_generation", "data": user_request})
            if "翻訳" in user_request:
                simple_tasks.append({"type": "translate", "data": user_request})

            if not simple_tasks:
                simple_tasks.append({"type": "summarize", "data": user_request})

            tasks = simple_tasks

            return tasks
        
        except Exception as e:
            logging.error(f"LLMタスク分割エラー: {e}")
            # エラー時はデフォルトでsummarize
            return [{"type": "summarize", "data": user_request}]


# ---------------------------------------
# エージェントの共通インターフェース
# ---------------------------------------
class BaseAgent:
    def name(self) -> str:
        """エージェントの名前を返す"""
        raise NotImplementedError()

    def can_handle(self, task_type: str) -> bool:
        """
        このエージェントが特定のタスクタイプを処理できるか。
        対応可能なら True, それ以外なら False を返す。
        """
        raise NotImplementedError()

    def handle(self, task_data: Any) -> str:
        """
        タスクを実行し、結果を文字列として返す。
        例外があれば適宜キャッチして呼び出し側に伝える。
        """
        raise NotImplementedError()


# ---------------------------------------
# 各エージェントの実装例
# ---------------------------------------
class SummarizerAgent(BaseAgent):
    def name(self) -> str:
        return "SummarizerAgent"
    
    def can_handle(self, task_type: str) -> bool:
        return task_type == "summarize"

    def handle(self, task_data: Any) -> str:
        text_to_summarize = task_data
        logging.info(f"[{self.name()}] 要約を実行中...")

        try:
            # 本来はOpenAI API等を呼び出して要約
            return "【要約】" + text_to_summarize
        except Exception as e:
            logging.error(f"[{self.name()}] 要約エラー: {e}")
            return "要約に失敗しました。"


class CodeGeneratorAgent(BaseAgent):
    def name(self) -> str:
        return "CodeGeneratorAgent"
    
    def can_handle(self, task_type: str) -> bool:
        return task_type == "code_generation"

    def handle(self, task_data: Any) -> str:
        prompt_for_code = task_data
        logging.info(f"[{self.name()}] コード生成を実行中...")

        try:
            # 本来はOpenAI API等を呼び出してコード生成
            return "# 生成したPythonコードのサンプル\nprint('Hello World')"
        except Exception as e:
            logging.error(f"[{self.name()}] コード生成エラー: {e}")
            return "# コード生成に失敗しました。"


class TranslatorAgent(BaseAgent):
    def name(self) -> str:
        return "TranslatorAgent"

    def can_handle(self, task_type: str) -> bool:
        return task_type == "translate"

    def handle(self, task_data: Any) -> str:
        text_to_translate = task_data
        logging.info(f"[{self.name()}] 翻訳を実行中...")

        try:
            # 本来はOpenAI API等を呼び出して翻訳
            return "Translated text of: " + text_to_translate
        except Exception as e:
            logging.error(f"[{self.name()}] 翻訳エラー: {e}")
            return "翻訳に失敗しました。"


# ---------------------------------------
# オーケストレーター
# ---------------------------------------
class Orchestrator:
    """
    タスクデコンポーザ(LLM)とエージェントを管理し、
    タスクを最適なエージェントに並列で振り分け、結果をまとめる。
    """
    def __init__(self, agents: List[BaseAgent], task_decomposer: LLMTaskDecomposer):
        self.agents = agents
        self.task_decomposer = task_decomposer

    def process_request(self, user_request: str) -> str:
        """
        ユーザーのリクエスト文字列をもとにタスクを分解し、
        適切なエージェントで並列処理して、最終的な結果をまとめて返す。
        """
        # 1. LLMでタスク分割
        agent_types = [agent.name().replace("Agent", "").lower() for agent in self.agents]
        # 例: ["summarizer", "codegenerator", "translator"] など
        # ただしコード上は "summarize", "code_generation", "translate" が本来のタスク名と対応するため注意
        # ここでは簡単に一致させるように工夫
        # より正確にはagentのcan_handle()メソッドから逆引きして紐づけるなどを検討
        # 今回はサンプルとして単純化

        # 実際には上の処理を工夫し、["summarize", "code_generation", "translate"] を作る
        # ここではハードコードの簡易マッピング
        known_agent_types = ["summarize", "code_generation", "translate"]

        decomposed_tasks = self.task_decomposer.decompose(user_request, known_agent_types)
        
        # 2. 並列実行によるタスク処理
        max_workers = max(1, len(decomposed_tasks))
        results: List[Tuple[str, str]] = []

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures_and_task_types = []
            
            for task in decomposed_tasks:
                task_type = task["type"]
                task_data = task["data"]
                
                # 処理可能なエージェントを探す
                assigned_agent = None
                for agent in self.agents:
                    if agent.can_handle(task_type):
                        assigned_agent = agent
                        break

                if assigned_agent is not None:
                    future = executor.submit(assigned_agent.handle, task_data)
                    futures_and_task_types.append((task_type, future))
                else:
                    logging.warning(f"対応可能なエージェントが見つかりませんでした: {task_type}")
                    results.append((task_type, f"エージェントが見つかりませんでした (task_type={task_type})"))
            
            for task_type, future in futures_and_task_types:
                try:
                    result = future.result()  # 実行結果を取得
                    results.append((task_type, result))
                except Exception as e:
                    logging.error(f"タスク実行中にエラーが発生: {e}")
                    results.append((task_type, "エラーが発生しました。"))

        # 3. 結果統合
        final_answer = self.aggregate_results(results)
        return final_answer

    def aggregate_results(self, results: List[Tuple[str, str]]) -> str:
        """
        エージェントごとの結果をまとめて最終的な出力を作る。
        """
        aggregated_texts = []
        for task_type, result_text in results:
            aggregated_texts.append(f"[{task_type.upper()} RESULT]:\n{result_text}")
        return "\n\n".join(aggregated_texts)


# ---------------------------------------
# 実行例 (メイン)
# ---------------------------------------
def main():
    # エージェントを登録
    agents = [
        SummarizerAgent(),
        CodeGeneratorAgent(),
        TranslatorAgent()
    ]

    # LLMベースのタスクデコンポーザを用意
    llm_decomposer = LLMTaskDecomposer()

    # オーケストレーターを初期化
    orchestrator = Orchestrator(agents, llm_decomposer)

    # ユーザーからのリクエスト例
    user_request = "この文章を要約して、サンプルのPythonコードを書いて、英語に翻訳してください。"

    # 実行
    final_result = orchestrator.process_request(user_request)
    
    print("=== Final Result ===")
    print(final_result)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    main()

コード構成

コードは大きく以下のブロックに分かれています。

  1. 設定
  2. TaskDecomposer(LLMTaskDecomposer)
  3. エージェントの共通インターフェース (BaseAgent)
  4. 各エージェントの実装 (SummarizerAgent, CodeGeneratorAgent, TranslatorAgent)
  5. オーケストレーター (Orchestrator)
  6. 実行例 (main関数)

順番に見ていきましょう。

1. 設定

import openai
import logging
from typing import List, Any, Tuple
import concurrent.futures
import json

openai.api_key = "YOUR_OPENAI_API_KEY"
  • openai.api_key にAPIキーを設定すると、OpenAI API を実際に呼び出すコードに切り替えることができます。
  • ここではデモのため、サンプルの処理に置き換えています。

2. TaskDecomposer(LLMTaskDecomposer)

class LLMTaskDecomposer:
    def __init__(self, model_name: str = "gpt-3.5-turbo"):
        self.model_name = model_name
    
    def decompose(self, user_request: str, known_agent_types: List[str]) -> List[dict]:
        ...
  • ユーザーのリクエストを見て、「要約」「コード生成」「翻訳」などどんなタスクが必要かを判断するクラスです。
  • 実際には openai.ChatCompletion.create() を使ってLLMに投げる想定ですが、サンプルでは文字列検索ベースでタスクを判定しています。

仕組み

  1. user_request に「要約」などのキーワードが含まれていれば、対応するタスクをリストに追加。
  2. 何も当てはまらなければ、デフォルトで "summarize" を返す。

実行結果はリスト形式で、たとえば以下のように返ってきます。

[
  {"type": "summarize", "data": "..."},
  {"type": "code_generation", "data": "..."},
  {"type": "translate", "data": "..."},
]

3. エージェントの共通インターフェース (BaseAgent)

class BaseAgent:
    def name(self) -> str:
        raise NotImplementedError()

    def can_handle(self, task_type: str) -> bool:
        raise NotImplementedError()

    def handle(self, task_data: Any) -> str:
        raise NotImplementedError()
  • どのエージェントでも共通のインターフェースを備え、name(), can_handle(), handle() を実装するようにしています。
  • これによって、個別のエージェントがどんな処理をしていようが、共通の方法で扱えるようになります。

4. 各エージェントの実装

class SummarizerAgent(BaseAgent):
    def name(self) -> str:
        return "SummarizerAgent"
    
    def can_handle(self, task_type: str) -> bool:
        return task_type == "summarize"

    def handle(self, task_data: Any) -> str:
        ...
  • SummarizerAgent: "summarize" タスクを処理。
  • CodeGeneratorAgent: "code_generation" タスクを処理。
  • TranslatorAgent: "translate" タスクを処理。

実際は openai のAPIを呼び出して要約/コード生成/翻訳をする想定ですが、サンプルとして固定の文字列を返すようになっています。

5. オーケストレーター (Orchestrator)

class Orchestrator:
    def __init__(self, agents: List[BaseAgent], task_decomposer: LLMTaskDecomposer):
        self.agents = agents
        self.task_decomposer = task_decomposer

    def process_request(self, user_request: str) -> str:
        ...
  • 全体の司令塔のような役割を担うクラスです。
  • ユーザーのリクエストを受け取り、LLMTaskDecomposerでタスクに分解してから、適切なエージェントに振り分ける処理を行います。
  • 並列実行には ThreadPoolExecutor を使っているのがポイントで、複数のタスクを同時に処理できます。

process_request の流れ

  1. llm_decomposer.decompose(user_request, known_agent_types) でタスク分解。
  2. ThreadPoolExecutor を使って複数のタスクを同時に実行。
  3. 各タスクに対応するエージェントを探して handle() メソッドを呼び出し。
  4. 全ての結果を aggregate_results() でまとめて返却。
def aggregate_results(self, results: List[Tuple[str, str]]) -> str:
    aggregated_texts = []
    for task_type, result_text in results:
        aggregated_texts.append(f"[{task_type.upper()} RESULT]:\n{result_text}")
    return "\n\n".join(aggregated_texts)
  • 結果のまとめ方としては、単純に各タスクの結果を [task_type.upper() RESULT] という文字列でつないでいるだけです。
  • 実際のユースケースに応じて、JSONで統合したりテンプレートに埋め込んだりといった拡張が考えられます。

6. 実行例 (main関数)

def main():
    agents = [
        SummarizerAgent(),
        CodeGeneratorAgent(),
        TranslatorAgent()
    ]

    llm_decomposer = LLMTaskDecomposer()
    orchestrator = Orchestrator(agents, llm_decomposer)

    user_request = "この文章を要約して、サンプルのPythonコードを書いて、英語に翻訳してください。"
    final_result = orchestrator.process_request(user_request)

    print("=== Final Result ===")
    print(final_result)
  • SummarizerAgent, CodeGeneratorAgent, TranslatorAgent の3つのエージェントを登録して、Orchestrator に渡しています。
  • user_request には上記のような、要約・コード生成・翻訳が混ざった依頼を試しに投げてみます。
  • 実行すると、
    • SummarizerAgent での要約結果
    • CodeGeneratorAgent でのサンプルコード
    • TranslatorAgent での翻訳
      が表示されます。

まとめ

このMoA(複数エージェントの並列オーケストレーション)サンプルコードでは、以下の点がポイントです。

  • LLMTaskDecomposer でリクエストを多段階に分解し、複数のエージェントに処理を委ねる設計。
  • BaseAgent という共通インターフェースによって、エージェントごとの違いを吸収。
  • Orchestrator で各タスクを並列処理することにより、スケーラブルに対応。

実際の開発では、OpenAI の API などを本格的に使って、より高度な要約・コード生成・翻訳を行うことができます。
複数のエージェントを利用して、色々なタスクを同時に行いたいときに非常に便利なアーキテクチャとなるでしょう。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?