5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangChain Open Deep Research V2 徹底解説 ― 処理フローとコードから読み解くDeepResearchマルチエージェント

Posted at

目次

  1. はじめに
  2. アーキテクチャ全体像
  3. 処理詳細
  4. まとめ

1. はじめに

本記事では、LangChain チームが公開するDeepResearch実装 langchain-ai/open_deep_research (MIT License) の V2 実装 を、処理フローとコード断片を交えながら解説します。

open_deep_research は 2025 年初頭に初版が公開され、その後 2025 年 7 月のコミット でアーキテクチャが抜本的に再構成されました。本記事ではこのV2実装を取り扱います。

また本実装は Hugging Face の DeepResearch Bench Leaderboard で第 5 位を獲得しており、定量的に性能面が評価されています。(2025 年 12 月時点)

2. アーキテクチャ全体像

ノード役割概要

ノード 役割 サブグラフ
Clarify With User 質問明確化
Write Research Brief リサーチブリーフ生成
Supervisor & Supervisor Tools 戦略立案+委任 Supervisor Subgraph
Researcher & Researcher tools 個別トピック調査 Researcher Subgraph
Compress Research 調査結果圧縮 Researcher Subgraph
Final Report Generation 総合レポート生成

3. 処理詳細

本章では、各ノードの処理内容を、実際のモジュール open_deep_research / src / open_deep_research / deep_researcher.py をベースに解説します。

Clarify With User ノード

目的

ユーザーの質問を分析し、追加情報が必要かを判断

処理フロー

  1. 質問明確化オプションが無効ならスキップ
  2. 有効ならユーザーメッセージを分析
  3. 不明確な点があれば質問を生成
  4. 十分な情報があればリサーチブリーフ作成に進む

実装解説

async def clarify_with_user(state: AgentState, config: RunnableConfig) -> Command[Literal["write_research_brief", "__end__"]]:
    # 1. 設定チェック: 明確化が無効ならスキップ
    configurable = Configuration.from_runnable_config(config)
    if not configurable.allow_clarification:
        return Command(goto="write_research_brief")

...

    # 2. ユーザーメッセージを分析
    prompt_content = clarify_with_user_instructions.format(
        messages=get_buffer_string(messages),
        date=get_today_str()
    )
    response = await clarification_model.ainvoke([HumanMessage(content=prompt_content)])

    # 分岐処理
    if response.need_clarification:
        # 3. 明確化が必要 → 質問を返してユーザー入力待ち
        return Command(
            goto=END, 
            update={"messages": [AIMessage(content=response.question)]}
        )
    else:
        # 4. 明確化不要 → 次のステップへ
        return Command(
            goto="write_research_brief", 
            update={"messages": [AIMessage(content=response.verification)]}
        )

Write Research Brief ノード

目的

ユーザーメッセージから構造化されたリサーチブリーフを生成し、Supervisor の初期コンテキストを設定

処理フロー

  1. ユーザーメッセージを分析してリサーチブリーフを生成
  2. Supervisor 用のシステムプロンプトを作成
  3. Supervisor にブリーフを渡して調査フェーズを開始

実装解説

async def write_research_brief(state: AgentState, config: RunnableConfig) -> Command[Literal["research_supervisor"]]:

...

    # 1. ユーザーメッセージを分析してリサーチブリーフを生成
    prompt_content = transform_messages_into_research_topic_prompt.format(
        messages=get_buffer_string(state.get("messages", [])),
        date=get_today_str()
    )
    response = await research_model.ainvoke([HumanMessage(content=prompt_content)])

    # 2. Supervisor 用のシステムプロンプトを作成
    supervisor_system_prompt = lead_researcher_prompt.format(
        date=get_today_str(),
        max_concurrent_research_units=configurable.max_concurrent_research_units,
        max_researcher_iterations=configurable.max_researcher_iterations
    )

    # 3. Supervisor にブリーフを渡して調査フェーズを開始
    return Command(
        goto="research_supervisor",
        update={
            "research_brief": response.research_brief,
            "supervisor_messages": {
                "type": "override",
                "value": [
                    SystemMessage(content=supervisor_system_prompt),
                    HumanMessage(content=response.research_brief)
                ]
            }
        }
    )

Supervisor ノード + Supervisor Tools ノード

目的

全体戦略を立案し、複数の Researcher に調査を委任して結果を集約

処理フロー(Supervisor ノード)

  1. 利用可能なツールで Supervisor モデルを設定
  2. 現在の調査状況に基づいて次のアクションを決定
  3. イテレーション数を更新してツール実行へ遷移

実装解説(Supervisor ノード)

async def supervisor(state: SupervisorState, config: RunnableConfig) -> Command[Literal["supervisor_tools"]]:

...

    # 1. 利用可能なツールで Supervisor モデルを設定
    lead_researcher_tools = [ConductResearch, ResearchComplete, think_tool]

    research_model = (
        configurable_model
        .bind_tools(lead_researcher_tools)
        .with_retry(stop_after_attempt=configurable.max_structured_output_retries)
        .with_config(research_model_config)
    )

    # 2. 現在の調査状況に基づいて次のアクションを決定
    supervisor_messages = state.get("supervisor_messages", [])
    response = await research_model.ainvoke(supervisor_messages)

    # 3. イテレーション数を更新してツール実行へ遷移
    return Command(
        goto="supervisor_tools",
        update={
            "supervisor_messages": [response],
            "research_iterations": state.get("research_iterations", 0) + 1
        }
    )

処理フロー(Supervisor Tools ノード)

  1. 終了条件を評価して、必要に応じて調査を終了。終了条件は以下の通り
    • 最大イテレーション数超過
    • or ツールコールが存在しない
    • or ResearchComplete ツールコールが存在する
  2. 戦略的思考ツールの結果を記録
  3. 調査タスクを並列実行して結果を収集
  4. すべての結果を統合して Supervisor に返却

実装解説(Supervisor Tools ノード)

async def supervisor_tools(state: SupervisorState, config: RunnableConfig) -> Command[Literal["supervisor", "__end__"]]:

...

    # 1. 終了条件を評価して、必要に応じて調査を終了
    exceeded_allowed_iterations = research_iterations > configurable.max_researcher_iterations
    no_tool_calls = not most_recent_message.tool_calls
    research_complete_tool_call = any(
        tool_call["name"] == "ResearchComplete"
        for tool_call in most_recent_message.tool_calls
    )

    if exceeded_allowed_iterations or no_tool_calls or research_complete_tool_call:
        return Command(
            goto=END,
            update={
                "notes": get_notes_from_tool_calls(supervisor_messages),
                "research_brief": state.get("research_brief", "")
            }
        )

...

    # 2. 戦略的思考ツールの結果を記録
    for tool_call in think_tool_calls:
        reflection_content = tool_call["args"]["reflection"]
        all_tool_messages.append(ToolMessage(
            content=f"Reflection recorded: {reflection_content}",
            name="think_tool",
            tool_call_id=tool_call["id"]
        ))

...

    if conduct_research_calls:

        ...

            # 3. 調査タスクを並列実行して結果を収集
            research_tasks = [
                researcher_subgraph.ainvoke({
                    "researcher_messages": [
                        HumanMessage(content=tool_call["args"]["research_topic"])
                    ],
                    "research_topic": tool_call["args"]["research_topic"]
                }, config) 
                for tool_call in allowed_conduct_research_calls
            ]

            tool_results = await asyncio.gather(*research_tasks)

            for observation, tool_call in zip(tool_results, allowed_conduct_research_calls):
                all_tool_messages.append(ToolMessage(
                    content=observation.get("compressed_research", "Error synthesizing research report: Maximum retries exceeded"),
                    name=tool_call["name"],
                    tool_call_id=tool_call["id"]
                ))

...

    # 4. すべての結果を統合して Supervisor に返却
    update_payload["supervisor_messages"] = all_tool_messages
    return Command(
        goto="supervisor",
        update=update_payload
    )

Researcher ノード + Researcher Tools ノード

目的

割り当てられたトピックを、検索や外部ツールを用いて ReAct パターンで調査

処理フロー(Researcher ノード)

  1. 利用可能なツールを取得(検索ツール、思考ツール、MCP ツールなど)
  2. 取得したツールで Researcher モデルを設定
  3. システムプロンプトを含めて応答を生成
  4. イテレーション数を更新してツール実行へ遷移

実装解説(Researcher ノード)

async def researcher(state: ResearcherState, config: RunnableConfig) -> Command[Literal["researcher_tools"]]:

...

    # 1. 利用可能なツールを取得(検索ツール、思考ツール、MCP ツールなど)
    tools = await get_all_tools(config)
    if len(tools) == 0:
        raise ValueError(
            "No tools found to conduct research: Please configure either your "
            "search API or add MCP tools to your configuration."
        )

...

    # 2. 取得したツールで Researcher モデルを設定
    research_model = (
        configurable_model
        .bind_tools(tools)
        .with_retry(stop_after_attempt=configurable.max_structured_output_retries)
        .with_config(research_model_config)
    )

    # 3. システムプロンプトを含めて応答を生成
    messages = [SystemMessage(content=researcher_prompt)] + researcher_messages
    response = await research_model.ainvoke(messages)

    # 4. イテレーション数を更新してツール実行へ遷移
    return Command(
        goto="researcher_tools",
        update={
            "researcher_messages": [response],
            "tool_call_iterations": state.get("tool_call_iterations", 0) + 1
        }
    )

処理フロー(Researcher Tools ノード)

  1. 早期終了条件を確認して必要に応じて圧縮へ遷移。終了条件は以下の通り
    • ツールコールが存在しない
    • and ネイティブ検索ツールが呼び出されていない
  2. すべてのツールを並列実行して結果を収集
  3. 終了条件を評価して圧縮または継続を決定

実装解説(Researcher Tools ノード)

async def researcher_tools(state: ResearcherState, config: RunnableConfig) -> Command[Literal["researcher", "compress_research"]]:

...

    # 1. 早期終了条件を確認して必要に応じて圧縮へ遷移
    has_tool_calls = bool(most_recent_message.tool_calls)
    has_native_search = (
        openai_websearch_called(most_recent_message) or
        anthropic_websearch_called(most_recent_message)
    )

    if not has_tool_calls and not has_native_search:
        return Command(goto="compress_research")

...

    # 2. すべてのツールを並列実行して結果を収集
    tool_calls = most_recent_message.tool_calls
    tool_execution_tasks = [
        execute_tool_safely(tools_by_name[tool_call["name"]], tool_call["args"], config)
        for tool_call in tool_calls
    ]
    observations = await asyncio.gather(*tool_execution_tasks)

    tool_outputs = [
        ToolMessage(
            content=observation,
            name=tool_call["name"],
            tool_call_id=tool_call["id"]
        )
        for observation, tool_call in zip(observations, tool_calls)
    ]

    # 3. 終了条件を評価して圧縮または継続を決定
    exceeded_iterations = state.get("tool_call_iterations", 0) >= configurable.max_react_tool_calls
    research_complete_called = any(
        tool_call["name"] == "ResearchComplete"
        for tool_call in most_recent_message.tool_calls
    )

    if exceeded_iterations or research_complete_called:
        return Command(
            goto="compress_research",
            update={"researcher_messages": tool_outputs}
        )

    return Command(
        goto="researcher",
        update={"researcher_messages": tool_outputs}
    )

Compress Research ノード

目的

収集した調査結果を要約・整形し、レポート生成に適した形式に圧縮

処理フロー

  1. 圧縮指示を追加
  2. 圧縮を実行(トークン制限に対応したリトライ付き)
  3. 圧縮結果と圧縮前情報を返却

実装解説

async def compress_research(state: ResearcherState, config: RunnableConfig):

...

    # 1. 圧縮指示を追加
    researcher_messages = state.get("researcher_messages", [])

    researcher_messages.append(HumanMessage(content=compress_research_simple_human_message))

    # 2. 圧縮を実行(トークン制限に対応したリトライ付き)
    synthesis_attempts = 0
    max_attempts = 3

    while synthesis_attempts < max_attempts:
        try:
            compression_prompt = compress_research_system_prompt.format(date=get_today_str())
            messages = [SystemMessage(content=compression_prompt)] + researcher_messages

            response = await synthesizer_model.ainvoke(messages)

            raw_notes_content = "\n".join([
                str(message.content)
                for message in filter_messages(researcher_messages, include_types=["tool", "ai"])
            ])

            # 3. 圧縮結果と圧縮前情報を返却
            return {
                "compressed_research": str(response.content),
                "raw_notes": [raw_notes_content]
            }

        except Exception as e:
            synthesis_attempts += 1

            if is_token_limit_exceeded(e, configurable.research_model):
                researcher_messages = remove_up_to_last_ai_message(researcher_messages)
                continue

            continue

...

Final Report Generation ノード

目的

収集した全調査結果を統合し、包括的な最終レポートを生成

処理フロー

  1. 調査結果を取得して結合
  2. 全コンテキストを含むプロンプトでレポートを生成
  3. トークン制限時は段階的に切り詰めて再試行

実装解説

async def final_report_generation(state: AgentState, config: RunnableConfig):
    # 1. 調査結果を取得して結合
    notes = state.get("notes", [])
    cleared_state = {"notes": {"type": "override", "value": []}}
    findings = "\n".join(notes)

...

    while current_retry <= max_retries:
        try:
            # 2. 全コンテキストを含むプロンプトでレポートを生成
            final_report_prompt = final_report_generation_prompt.format(
                research_brief=state.get("research_brief", ""),
                messages=get_buffer_string(state.get("messages", [])),
                findings=findings,
                date=get_today_str()
            )

            final_report = await configurable_model.with_config(writer_model_config).ainvoke([
                HumanMessage(content=final_report_prompt)
            ])

            return {
                "final_report": final_report.content,
                "messages": [final_report],
                **cleared_state
            }

        except Exception as e:
            # 3. トークン制限時は段階的に切り詰めて再試行
            if is_token_limit_exceeded(e, configurable.final_report_model):
                current_retry += 1

                if current_retry == 1:
                    model_token_limit = get_model_token_limit(configurable.final_report_model)
                    if not model_token_limit:
                        return {
                            "final_report": f"Error generating final report: Token limit exceeded, however, we could not determine the model's maximum context length. Please update the model map in deep_researcher/utils.py with this information. {e}",
                            "messages": [AIMessage(content="Report generation failed due to token limits")],
                            **cleared_state
                        }
                    findings_token_limit = model_token_limit * 4
                else:
                    findings_token_limit = int(findings_token_limit * 0.9)

                findings = findings[:findings_token_limit]
                continue
            else:
                return {
                    "final_report": f"Error generating final report: {e}",
                    "messages": [AIMessage(content="Report generation failed due to an error")],
                    **cleared_state
                }

    return {
        "final_report": "Error generating final report: Maximum retries exceeded",
        "messages": [AIMessage(content="Report generation failed after maximum retries")],
        **cleared_state
    }

サブグラフ & メイングラフ構築概要

Supervisor Subgraph構築:

# Supervisor の状態管理と戦略立案・委任を担うサブグラフ
supervisor_builder = StateGraph(SupervisorState, config_schema=Configuration)

# ノード追加
supervisor_builder.add_node("supervisor", supervisor)                  # 戦略立案+委任
supervisor_builder.add_node("supervisor_tools", supervisor_tools)      # ツール実行

# エッジ定義(ループ構造)
supervisor_builder.add_edge(START, "supervisor")
# supervisor → supervisor_tools → supervisor のループは Command で制御

# コンパイル
supervisor_subgraph = supervisor_builder.compile()

Researcher Subgraph構築:

# 個別トピックの調査を担うサブグラフ
researcher_builder = StateGraph(
    ResearcherState,
    output=ResearcherOutputState,  # 出力: compressed_research, raw_notes
    config_schema=Configuration
)

# ノード追加
researcher_builder.add_node("researcher", researcher)                  # 個別トピック調査
researcher_builder.add_node("researcher_tools", researcher_tools)      # ツール実行
researcher_builder.add_node("compress_research", compress_research)    # 調査結果圧縮

# エッジ定義
researcher_builder.add_edge(START, "researcher")
researcher_builder.add_edge("compress_research", END)  # 圧縮後に終了
# researcher → researcher_tools → researcher のループは Command で制御

# コンパイル
researcher_subgraph = researcher_builder.compile()

メイングラフ構築:

# 全体のワークフローを統括するメイングラフ
deep_researcher_builder = StateGraph(
    AgentState,
    input=AgentInputState,
    config_schema=Configuration
)

# ノード追加
deep_researcher_builder.add_node("clarify_with_user", clarify_with_user)
deep_researcher_builder.add_node("write_research_brief", write_research_brief)
deep_researcher_builder.add_node("research_supervisor", supervisor_subgraph)
deep_researcher_builder.add_node("final_report_generation", final_report_generation)

# エッジ定義
deep_researcher_builder.add_edge(START, "clarify_with_user")
deep_researcher_builder.add_edge("research_supervisor", "final_report_generation")
deep_researcher_builder.add_edge("final_report_generation", END)
# clarify_with_user → write_research_brief は Command で制御

# コンパイル
deep_researcher = deep_researcher_builder.compile()

4. まとめ

本記事では、LangChain チームが公開する open_deep_research の V2 実装を、アーキテクチャから実装詳細まで掘り下げて解説しました。

全体像の振り返り

V2 実装は Clarify → Brief → Supervisor → Researcher(Parallel) → Compression → Final Report という直列処理とサブグラフを組み合わせたマルチエージェントとして設計されています。この構造により、ユーザーの曖昧な質問を明確化し、複数の調査トピックに分解し、並列に深掘り調査を実施し、最終的に統合されたレポートを生成するという一連の流れを実現しています。

アーキテクチャの特徴

本実装の中核は Supervisor - Researcher の階層構造 にあるでしょう。

  • Supervisor は全体戦略を立案し、複数の調査トピックに分解して Researcher に委任します。think_tool による戦略的思考の記録により、なぜそのトピックを調査するのかという意思決定プロセスが明確化されます。
  • Researcher は各トピックを ReAct パターン(Reasoning + Acting)で深掘りします。検索と思考を反復しながら、十分な情報が集まるまで調査を継続します。

この設計は以下のような観点で調査の精度向上・速度向上に寄与していると考えられます。

  • 専門的な深掘り: 各 Researcher が単一トピックに集中することで、表面的な調査にならず深い洞察を得られる
  • 多角的な視点: 複数の独立した調査により、偏りのない多角的な情報収集が可能になる
  • 反復的な品質向上: ReAct パターンで検索結果を評価しながら追加検索を行うため、情報の不足や矛盾を発見して補完できる
  • 戦略的なカバレッジ: Supervisor が全体を俯瞰して調査方針を決定するため、重要なトピックの見落としを防げる
  • 階層化による並列実行: 階層化により、asyncio.gather を用いた並列実行が可能になり、複数トピックの同時調査によって調査時間を大幅に短縮できる

さいごに

本実装は、LangGraph を用いたマルチエージェントシステムの設計において、階層化並列化をどのように実現するかの好例と言えるでしょう。実用的な調査エージェントの構築を検討している方にとって、有益な参考実装となるはずです。


参考:

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?