0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangGraphで構築!マルチエージェントRAGの協調ワークフロー実装ハンズオン

0
Posted at

LangGraphで構築!マルチエージェントRAGの協調ワークフロー実装ハンズオン

LLMアプリケーションの精度と複雑性を向上させるため、LangGraphを用いて複数のエージェントを協調させ、より高度なRAGシステムを実装する具体的な手順とコード例を提供します。本記事では、単一エージェントでは困難だった複雑な問い合わせへの対応や、役割分担による効率的な情報処理を実現するマルチエージェントRAGの構築に焦点を当てます。

LangGraphとは

LangGraphは、LangChainを基盤としたエージェントオーケストレーションフレームワークです。長時間実行されるステートフルなエージェントを構築、管理、デプロイするために設計されています。耐久性のある実行、ストリーミング、Human-in-the-loop、永続性、メモリといったインフラストラクチャを提供し、複雑なエージェントワークフローの定義を可能にします。LangChainエージェントもLangGraphの上に構築されており、これらの機能を利用しています。

LangGraphのコアコンセプトは、エージェントのワークフローを「State(状態)」「Nodes(ノード)」「Edges(エッジ)」の3つの主要な概念で構成されるグラフとしてモデル化することです。各ノードはエージェントまたはロジックの一部であり、エッジは次に実行するものを決定し、状態はノード間を流れ、共有メモリとして機能します。(出典: LangGraph公式ドキュメント "What is LangGraph?", "Core Concepts")

前提と環境

本記事の実装はPython 3.9以上を想定しています。
以下のライブラリをインストールしてください。

pip install langgraph langchain_core

langchain_coreはLangGraphが依存するLangChainのコアライブラリであり、メッセージ型など共通のコンポーネントを提供します。

LangGraphの基本的な使い方

LangGraphでグラフを構築する基本的な流れは以下の通りです。

  1. 状態の定義: TypedDictを使用して、グラフ全体で共有される状態を定義します。これはノード間で受け渡されるデータ構造です。
  2. ノード関数の定義: 各ステップで実行されるロジックを関数として定義します。この関数は現在の状態を受け取り、更新された状態を返します。
  3. グラフの構築: StateGraphインスタンスを作成し、ノードとエッジを追加します。
    • add_node(): ノード関数をグラフに登録します。
    • set_entry_point(): グラフの開始ノードを指定します。
    • add_edge(): ノード間の線形遷移を定義します。
    • add_conditional_edges(): 条件に基づいて次に実行するノードを決定する遷移を定義します。
    • END: グラフの終了点を示します。
  4. グラフのコンパイルと実行: compile()メソッドでグラフをコンパイルし、invoke()メソッドで実行します。

以下に、シンプルな2つのノードを持つグラフの例を示します。

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator

# グラフの状態を定義
# StateGraphはTypedDictを推奨します。Annotated[List[T], operator.add] はリストの要素を結合する挙動を示します。
class AgentState(TypedDict):
    messages: Annotated[List[str], operator.add] # メッセージのリストを蓄積

# ノード関数1
def node_one(state: AgentState) -> AgentState:
    print("Executing Node One")
    current_messages = state.get("messages", [])
    current_messages.append("Message from Node One")
    return {"messages": current_messages}

# ノード関数2
def node_two(state: AgentState) -> AgentState:
    print("Executing Node Two")
    current_messages = state.get("messages", [])
    current_messages.append("Message from Node Two")
    return {"messages": current_messages}

# グラフの構築
workflow = StateGraph(AgentState)

# ノードの追加
workflow.add_node("node_one", node_one)
workflow.add_node("node_two", node_two)

# エントリポイントの設定
workflow.set_entry_point("node_one")

# エッジの追加 (node_one -> node_two)
workflow.add_edge("node_one", "node_two")

# 終了ポイントの設定
workflow.add_edge("node_two", END)

# グラフのコンパイル
app = workflow.compile()

# 実行例
initial_state = {"messages": ["Initial message."]}
result = app.invoke(initial_state)
print("\nFinal State:", result)

このコードを実行すると、node_onenode_twoの順に実行され、最終的な状態に両ノードからのメッセージが追加されていることが確認できます。

マルチエージェントRAGの協調ワークフロー実装

単一のRAGシステムでは、多様なドメインの質問や複雑な意図を持つクエリへの対応が難しい場合があります。そこで、LangGraphを用いて複数の専門エージェントを協調させるマルチエージェントRAGシステムを構築します。

このシステムは以下のエージェントで構成されます。

  • スーパーバイザーエージェント: ユーザーからの質問を受け取り、その意図やドメインを分析し、適切な専門エージェントにルーティングします。
  • 専門RAGエージェント: 特定のドメイン(例: ヘルスケア、金融)に特化した知識ベース(ベクトルストア)とプロンプトを持ち、関連情報を検索して回答を生成します。本例ではヘルスケア、金融、一般的なRAGエージェントを用意します。
  • 応答アグリゲーター: 各専門エージェントからの応答を収集し、最終的な回答として統合・要約します。

ワークフローの設計意図

この設計により、以下のメリットが期待できます。

  • 専門性の向上: 各エージェントが特定のドメインに特化することで、より正確で詳細な情報を提供できます。
  • 複雑なクエリへの対応: スーパーバイザーがクエリを分解・ルーティングすることで、多岐にわたる質問にも対応可能になります。
  • 幻覚の低減: ドメイン固有の知識ベースに限定して検索を行うため、無関係な情報による幻覚を抑制できます。
  • スケーラビリティ: 新しいドメインのエージェントを追加する際も、既存のワークフローを大きく変更することなく拡張できます。

実装コード

from langgraph.graph import StateGraph, END
from typing import TypedDict, Annotated, List
import operator
from langchain_core.messages import BaseMessage, HumanMessage

# グラフの状態を定義
class MultiAgentRAGState(TypedDict):
    question: str
    responses: Annotated[List[str], operator.add] # 各エージェントからの応答を蓄積
    next_node: str # ルーティングの決定に使用
    final_answer: str # LLMからの最終回答

# スーパーバイザーエージェント (ルーティングロジック)
# 実際にはLLM呼び出しで意図分類を行います。
# 例: LLM = ChatOpenAI(model="gpt-4o", temperature=0)
# response = LLM.invoke(f"Categorize the following question: '{question}'. Options: health, finance, general. Return only the category name.")
# category = response.content.strip().lower()
def supervisor_agent(state: MultiAgentRAGState) -> MultiAgentRAGState:
    question = state["question"]
    print(f"Supervisor: Analyzing question: '{question}'")
    # ここでは簡略化のためキーワードマッチングで次のノードを決定します
    if "health" in question.lower():
        print("Supervisor: Routing to health_agent")
        return {"next_node": "health_agent"}
    elif "finance" in question.lower():
        print("Supervisor: Routing to finance_agent")
        return {"next_node": "finance_agent"}
    else:
        print("Supervisor: Routing to general_rag_agent")
        return {"next_node": "general_rag_agent"}

# 専門RAGエージェントの例 (ヘルスケア)
def health_agent(state: MultiAgentRAGState) -> MultiAgentRAGState:
    question = state["question"]
    print(f"Health Agent: Processing question: '{question}'")
    # ヘルスケア関連のベクトルストアから情報を検索し、応答を生成するロジックをここに実装
    # 例: retriever = HealthVectorStore.as_retriever()
    #     docs = retriever.invoke(question)
    #     response = LLM.invoke(f"Based on: {docs}, answer: {question}")
    response = f"Health agent processed: '{question}'. (e.g., Consult a doctor for personalized advice.)"
    return {"responses": [response]}

# 専門RAGエージェントの例 (金融)
def finance_agent(state: MultiAgentRAGState) -> MultiAgentRAGState:
    question = state["question"]
    print(f"Finance Agent: Processing question: '{question}'")
    # 金融関連のベクトルストアから情報を検索し、応答を生成するロジックをここに実装
    response = f"Finance agent processed: '{question}'. (e.g., Consider consulting a financial advisor.)"
    return {"responses": [response]}

# 一般RAGエージェント
def general_rag_agent(state: MultiAgentRAGState) -> MultiAgentRAGState:
    question = state["question"]
    print(f"General RAG Agent: Processing question: '{question}'")
    # 一般的なRAGシステムで情報を検索し、応答を生成するロジックをここに実装
    response = f"General RAG agent processed: '{question}'. (e.g., Here is some general information.)"
    return {"responses": [response]}

# 応答アグリゲーター
def response_aggregator(state: MultiAgentRAGState) -> MultiAgentRAGState:
    print("Aggregator: Combining responses.")
    # 各エージェントからの応答を結合して最終回答を生成
    # 実際にはLLMを使って複数の応答を要約・統合する処理を実装します。
    combined_responses = " ".join(state["responses"])
    final_answer = f"Here is a combined answer: {combined_responses}"
    return {"final_answer": final_answer}

# グラフの構築
workflow = StateGraph(MultiAgentRAGState)

workflow.add_node("supervisor", supervisor_agent)
workflow.add_node("health_agent", health_agent)
workflow.add_node("finance_agent", finance_agent)
workflow.add_node("general_rag_agent", general_rag_agent)
workflow.add_node("aggregator", response_aggregator)

workflow.set_entry_point("supervisor")

# 条件付きエッジ (スーパーバイザーの決定に基づいてルーティング)
# state["next_node"] の値に応じて、次に実行するノードを決定します。
workflow.add_conditional_edges(
    "supervisor",
    lambda state: state["next_node"],
    {
        "health_agent": "health_agent",
        "finance_agent": "finance_agent",
        "general_rag_agent": "general_rag_agent",
    },
)

# 各専門エージェントからアグリゲーターへのエッジ
workflow.add_edge("health_agent", "aggregator")
workflow.add_edge("finance_agent", "aggregator")
workflow.add_edge("general_rag_agent", "aggregator")

# アグリゲーターから終了
workflow.add_edge("aggregator", END)

app = workflow.compile()

print("--- Test Case 1: Health Question ---")
inputs_health = {"question": "What are the latest health recommendations for flu season?"}
result_health = app.invoke(inputs_health)
print("Final Result for Health:", result_health["final_answer"])
print("-" * 30)

print("--- Test Case 2: Finance Question ---")
inputs_finance = {"question": "Investment strategies for retirement planning."}
result_finance = app.invoke(inputs_finance)
print("Final Result for Finance:", result_finance["final_answer"])
print("-" * 30)

print("--- Test Case 3: General Question ---")
inputs_general = {"question": "Tell me about current events in technology."}
result_general = app.invoke(inputs_general)
print("Final Result for General:", result_general["final_answer"])
print("-" * 30)

この実装では、質問に含まれるキーワードに基づいてスーパーバイザーがルーティングを行いますが、実際にはLLMを用いてより高度な意図分類を行うことで、柔軟性と精度を向上させることができます。各専門エージェントの内部では、LangChainのRAGコンポーネント(Retriever, LLM, Prompt Template)を組み合わせて具体的な検索・応答生成ロジックを実装します。

つまずきポイントと回避策

LangGraphのような複雑なワークフローを構築する際には、いくつかの一般的な問題に直面する可能性があります。

  1. 状態スキーマの不一致:
    • 問題: TypedDictで定義した状態と、ノード関数が実際に返すキーや値の型が一致しない場合、実行時エラーが発生したり、データが期待通りに伝播しなかったりします。特に、Annotated[List[T], operator.add]のようなアノテーションを使用している場合、ノード関数が単一の要素を返すか、リストを返すかで挙動が変わります。
    • 回避策: 各ノード関数が返す辞書が、AgentStateの型定義に厳密に準拠していることを確認します。operator.addでリストを結合する場合、ノード関数は常にリストを返すように徹底します。開発中はmypyなどの型チェッカーを導入し、早期に型エラーを検出することも有効です。
  2. 条件付きエッジのロジックエラー:
    • 問題: add_conditional_edgesの第二引数(セレクター関数)が返す値と、第三引数(マッピング辞書)のキーが一致しない場合、ワークフローが意図しないノードに遷移したり、エラーで停止したりします。
    • 回避策: セレクター関数が返す値が、マッピング辞書のキーのいずれか一つに正確に一致するようにします。特に、文字列を返す場合は大文字小文字の違いや余分なスペースに注意します。セレクター関数のデバッグ時に、実際にどのような値を返しているかprint文などで確認することが有効です。
  3. LangSmithトレーシングの欠落・不完全:
    • 問題: アプリケーションは動作するものの、LangSmithのUIでトレースが断片化されていたり、一部のステップが欠落していたり、スパンがメインフローに結びついていなかったりする場合があります。これはデバッグやパフォーマンスチューニングを困難にします。
    • 回避策: LangSmith環境変数(LANGCHAIN_TRACING_V2=true, LANGCHAIN_API_KEY, LANGCHAIN_PROJECT)が正しく設定されていることを確認します。カスタムノードや外部ツールを組み込む場合、OpenTelemetryなどの標準的なトレーシングライブラリを使用して、明示的にスパンを開始・終了し、親スパンとの関連付けを行います。また、LangChainやLangGraphのコンポーネントが正しくラップされていることを確認し、公式ドキュメントの推奨設定に従います。(出典: LangGraph公式ドキュメント "LangSmith Integration")
  4. 永続性(Persistence)の未設定による状態の喪失:
    • 問題: Human-in-the-loop (HITL) のためにinterrupt()を呼び出した後、チェックポインターが設定されていないと、ワークフローの状態が失われ、中断したところから再開できません。
    • 回避策: StateGraphをインスタンス化する際に、適切なチェックポインターを設定します。開発中はInMemorySaverで十分ですが、本番環境ではPostgresSaverSqliteSaverのような耐久性のあるチェックポインターを使用します。
      from langgraph.checkpoint.sqlite import SqliteSaver
      memory = SqliteSaver.from_conn_string(":memory:") # または "sqlite:///my_db.sqlite"
      app = workflow.compile(checkpointer=memory)
      
      (出典: LangGraph公式ドキュメント "Persistence")
  5. LLMによるリカバリーループの爆発:
    • 問題: LLMがツール呼び出しなどでエラーを生成し、そのエラーがLLM自身に戻されて再試行を促すと、わずかに異なる間違った引数で再度エラーが発生するといった無限ループに陥ることがあります。
    • 回避策: グラフの状態にretry_countのようなカウンターを追加し、LLMによるリカバリー試行回数を制限します。例えば、3回試行した後にinterrupt()を発動させるか、明確なエラーメッセージでワークフローを失敗させるようにします。また、LLMへのプロンプトで、ツールの使用方法や期待される出力形式をより明確に指示し、エラーが発生した場合の対処法も指示することで、ループを回避できる場合があります。

設計上のトレードオフとベストプラクティス

LangGraphで堅牢なマルチエージェントシステムを構築するためには、いくつかの設計上の考慮事項とベストプラクティスが存在します。

ノードの粒度

  • トレードオフ: ノードの粒度を細かくすることで、各ステップの回復力と可観測性が向上します。LangGraphの永続化層はノード境界でチェックポイントを作成するため、障害発生時にそのノードから再開できます。しかし、細かすぎるとグラフが複雑になり、オーバーヘッドが増加します。
  • ベストプラクティス: 論理的な一連の処理(例: 情報検索、応答生成、ルーティング決定)を1つのノードにまとめることを基本とし、リカバリやHuman-in-the-loopが必要な箇所で粒度を調整します。

状態設計

  • トレードオフ: 状態に何を含めるかは、デバッグの容易さ、メモリ使用量、セキュリティに影響します。
  • ベストプラクティス:
    • 状態は「つまらない」ものに保ち、型付けする: 状態オブジェクトはグラフのバックボーンです。最小限に、明示的に、型付けされた状態を保ちます。TypedDict、Pydantic、dataclassesのいずれかを選択し、コードベース全体で一貫して使用することで、デバッグが容易になり、予期せぬ状態変更を防ぎます。
    • 生データを保持し、プロンプトはオンデマンドでフォーマットする: 状態にはフォーマットされていない生データを格納し、プロンプトはノード内で必要なときにフォーマットします。これにより、異なるノードが同じデータを異なる方法でフォーマットでき、プロンプトテンプレートの変更が状態スキーマに影響を与えず、デバッグが容易になります。

エッジのフロー

  • トレードオフ: 線形エッジはシンプルですが柔軟性に欠け、条件付きエッジは柔軟ですが複雑さが増します。
  • ベストプラクティス:
    • シンプルなエッジを優先し、動作が分岐する場合にのみ条件付きエッジを追加する: 線形ステップにはシンプルなエッジを連続させ、状態が実際に分岐する場合にのみ条件付きエッジを使用します。これにより、グラフの可読性と保守性が向上します。
    • LLMをルーターとして使用する: 軽量なLLM呼び出し(例えば、gpt-3.5-turboやファインチューニングされた小規模モデル)を使用して、高価な検索を行う前に意図を分類します。これはルールベースのルーティングよりも高速で柔軟ですが、LLMの推論コストとレイテンシを考慮する必要があります。

メモリと永続性

  • トレードオフ: メモリを適切に管理することで、エージェントの会話能力が向上しますが、メモリリークや状態の複雑化のリスクも伴います。
  • ベストプラクティス:
    • 本番環境ではチェックポインターを使用する: PostgresSaverSqliteSaverのような本番環境対応の耐久性のあるチェックポインターを使用します。これにより、システム障害時にも状態が失われず、ワークフローを再開できます。
    • メモリの種類を理解する: LangGraphエージェントはデフォルトではステートレスですが、チャット履歴などのメモリを状態に含めることで、エージェントの会話能力を高めます。

エラーハンドリング

  • トレードオフ: 詳細なエラーハンドリングは堅牢性を高めますが、コードの複雑さが増します。
  • ベストプラクティス:
    • 各ノードでエラーハンドリングを実装する: 信頼性を維持するために、各ノードでエラーハンドリングを実装します。これにより、特定のエラーがシステム全体に波及するのを防ぎます。
    • リトライポリシーを活用する: 一時的な障害に対しては、LangGraphのリトライポリシーを使用して、グラフレベルで構造化された方法で再試行を管理します。
    • 型付きエラーハンドリング: エラーを特定のタイプに基づいて分類・管理することで、正確で効果的なエラー解決を促進します。
    • Human-in-the-loop (HITL) でエラーを修正する: システムやLLMでは修正できないエラー(例: ドキュメントに署名日がない、入力が曖昧)に対しては、interrupt()を使用して人間が介入できるようにします。ただし、これにはチェックポインターが必要です。(出典: LangGraph公式ドキュメント "Error Handling", "Retry Policies")

可観測性

  • ベストプラクティス:
    • LangSmithによるトレーシング: マルチエージェントシステムで問題が発生した場合、「どのエージェントが誤動作したのか、なぜか」を特定するために、LangSmithとの統合によるトレーシングが不可欠です。これにより、複雑なワークフローのデバッグと最適化が容易になります。

テストと品質

  • ベストプラクティス:
    • 関数だけでなくグラフもテストする: グラフのルーティングロジックや終了条件など、エッジに多くのバグが潜むため、エージェント自体と同じくらい、あるいはそれ以上にテストが必要です。単体テストだけでなく、統合テストも重視します。
    • 評価ループの欠如を避ける: マルチエージェントシステムが単一エージェントよりも実際に優れているかどうかを測定できない場合、構築すべきではありません。リファクタリングを開始する前に評価ベンチマークを設定し、継続的に評価を行います。

まとめ

本記事では、LangGraphを用いてマルチエージェントRAGの協調ワークフローを実装する具体的な手順とコード例を解説しました。スーパーバイザーエージェントによるルーティング、専門RAGエージェントによる情報検索、応答アグリゲーターによる最終回答の統合という流れで、複雑な問い合わせに対する対応能力と回答精度を向上させるシステムを構築できることを示しました。

LangGraphの強力なグラフベースのオーケストレーション機能は、エージェント間の複雑な連携や状態管理を簡潔に記述することを可能にします。今回紹介した実装例は基礎的なものですが、これをベースにLLMの活用、より高度なルーティングロジック、永続性、Human-in-the-loopなどの機能を組み込むことで、実用的なアプリケーションへと発展させることが可能です。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?