はじめに
マルチエージェントのフロー
この図は、ユーザーからの入力に基づいて特定のタスクを遂行するために協力するマルチエージェントシステムのフローチャートです。
例として「アラスカの過去10年間の平均気温のグラフを生成する」というリクエストに応じるプロセスが描かれています。
エージェント | 役割 | 機能 |
---|---|---|
ユーザー | リクエストを入力 | 「アラスカの過去10年間の平均気温のグラフを生成する」というリクエストを入力する |
リサーチャー | 必要なデータを検索 | 「search」関数を呼び出して適切な気温データを見つける 出力は「続ける」と「送信者がチャートジェネレーターである」場合にチャートジェネレーターに送られる |
ルーター | 次のエージェントを決定 | 各エージェントからの出力に基づいて条件分岐を行う 「続ける」という指示があり、送信者が「リサーチャー」の場合、メッセージはチャートジェネレーターにルーティングされる 特定の関数が呼び出された場合、追加のツール(例:API呼び出し)が使用される可能性がある |
チャートジェネレーター | グラフの生成とコード実行 | 収集されたデータを基にグラフを生成する 生成されたチャートは最終的なアウトプットとしてユーザーに提供される |
1.エージェントを作成する
from langchain_core.messages import BaseMessage, ToolMessage, HumanMessage
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langgraph.graph import END, StateGraph
def create_agent(llm, tools, system_message: str):
"""エージェントを作成する。
Args:
llm (LanguageModel): 使用する言語モデル。
tools (list): エージェントがアクセスするツールのリスト。
system_message (str): エージェントに表示するシステムメッセージ。
Returns:
PromptTemplate: 構成されたプロンプトテンプレート。
"""
# エージェントの役割とツールの使用方法を説明するシステムメッセージを作成
prompt = ChatPromptTemplate.from_messages([
(
"system",
"You are a helpful AI assistant, collaborating with other assistants. "
"Use the provided tools to progress towards answering the question. "
"If you are unable to fully answer, that's OK, another assistant with different tools "
"will help where you left off. Execute what you can to make progress. "
"If you or any of the other assistants have the final answer or deliverable, "
"prefix your response with FINAL ANSWER so the team knows to stop. "
"You have access to the following tools: {tool_names}.\n{system_message}",
),
MessagesPlaceholder(variable_name="messages"),
])
prompt = prompt.partial(system_message=system_message) # システムメッセージをプロンプトに追加
prompt = prompt.partial(tool_names=", ".join([tool.name for tool in tools])) # ツール名をプロンプトに追加
return prompt | llm.bind_tools(tools) # プロンプトとツールを結合
2.ツールを定義する
from langchain_core.tools import tool
from typing import Annotated
from langchain_experimental.utilities import PythonREPL
from langchain_community.tools.tavily_search import TavilySearchResults
tavily_tool = TavilySearchResults(max_results=5) # 検索ツールをインスタンス化
# Pythonコード実行用のREPL環境をセットアップ
repl = PythonREPL()
@tool
def python_repl(code: Annotated[str, "The python code to execute to generate your chart."]):
"""Pythonコードを実行するためのツール。
Args:
code (str): 実行するPythonコード。
Returns:
str: 実行結果の文字列。
"""
try:
result = repl.run(code) # コードを実行
except BaseException as e:
return f"Failed to execute. Error: {repr(e)}" # エラーが発生した場合の処理
result_str = f"Successfully executed:\n```python\n{code}\n```\nStdout: {result}"
return result_str + "\n\nIf you have completed all tasks, respond with FINAL ANSWER." # 成功した場合の出力
3. グラフの定義
次に、LangGraphのグラフステートを定義し、それを用いてエージェントノード間でのメッセージの受け渡しを管理する方法を示します。これにより、各エージェントがどのようにして情報を共有し、タスクを効果的に処理するかを制御します。
グラフステートの定義
グラフの状態は、エージェント間で共有される情報の構造を定義します。この状態は、すべてのエージェントノード間で共有され、各エージェントが次のアクションを決定する際の基盤となります。
import operator
from typing import Annotated, Sequence, TypedDict
from langchain_openai import ChatOpenAI
from typing_extensions import TypedDict
# AgentStateは各ノード間で受け渡されるオブジェクトを定義します。
# ここでは、メッセージのリストと最新の送信者を追跡するキーを含みます。
class AgentState(TypedDict):
messages: Annotated[Sequence[BaseMessage], operator.add] # メッセージのシーケンス。operator.addによってリストの追加操作が可能
sender: str # 最後にメッセージを送信したエージェントの名前
エージェントノードを定義
import functools
from langchain_core.messages import AIMessage, ToolMessage
# 特定のエージェントのためのノードを作成するヘルパー関数
def agent_node(state, agent, name):
result = agent.invoke(state) # エージェントを実行して結果を取得
# エージェントの出力をグローバル状態に適した形式に変換
if isinstance(result, ToolMessage):
pass # ツールメッセージの場合は特別な処理を行わない
else:
# 通常のエージェントメッセージの場合、エージェント名を付けてメッセージリストに追加
result = AIMessage(**result.dict(exclude={"type", "name"}), name=name)
return {
"messages": [result], # メッセージリストに結果を追加
"sender": name, # 送信者情報を更新
}
# OpenAIのGPT-4モデルを利用するエージェントのインスタンスを作成
llm = ChatOpenAI(model="gpt-4-1106-preview")
# 研究エージェントとノード
research_agent = create_agent(
llm,
[tavily_tool],
system_message="You should provide accurate data for the chart_generator to use.",
)
research_node = functools.partial(agent_node, agent=research_agent, name="Researcher")
# チャート生成エージェントとノード
chart_agent = create_agent(
llm,
[python_repl],
system_message="Any charts you display will be visible by the user.",
)
chart_node = functools.partial(agent_node, agent=chart_agent, name="chart_generator")
ツールノードの定義
ツールノードは、特定のツールを実行するための専用のノードです。このノードを定義することで、エージェントがタスクを実行する際に必要なツールの呼び出しが可能になります。
from langgraph.prebuilt import ToolNode
# 利用可能なツールのリストを定義
tools = [tavily_tool, python_repl]
# ToolNodeを使用してツールノードを作成
tool_node = ToolNode(tools)
エッジロジックの定義
エッジロジックは、エージェントの出力に基づいて次にどのノードに遷移するかを決定します。このロジックにより、グラフ内の流れが制御され、適切なエージェントやツールがタイミング良く呼び出されます。
from typing import Literal
# エージェントの出力に基づいて次のアクションを決定するルーター関数
def router(state) -> Literal["call_tool", "__end__", "continue"]:
# 現在の状態から最新のメッセージを取得
messages = state["messages"]
last_message = messages[-1]
if last_message.tool_calls:
# 前のエージェントがツールを呼び出した場合、ツールノードへ遷移
return "call_tool"
if "FINAL ANSWER" in last_message.content:
# どれかのエージェントが作業完了を決定した場合、プロセスを終了
return "__end__"
# 上記の条件に当てはまらない場合、次のエージェントへ継続
return "continue"
グラフの定義
エージェント、ツール、エッジロジックをすべて組み合わせて、LangGraphの状態グラフを定義します。このグラフは、エージェント間でタスクがどのように移動し、どのように処理されるかを視覚的に表現するためのものです。
from langgraph.graph import StateGraph
from IPython.display import Image, display
# 新しい状態グラフのインスタンスを作成
workflow = StateGraph(AgentState)
# エージェントノードを追加
workflow.add_node("Researcher", research_node)
workflow.add_node("chart_generator", chart_node)
workflow.add_node("call_tool", tool_node)
# 条件付きエッジを追加
workflow.add_conditional_edges(
"Researcher",
router,
{"continue": "chart_generator", "call_tool": "call_tool", "__end__": END},
)
workflow.add_conditional_edges(
"chart_generator",
router,
{"continue": "Researcher", "call_tool": "call_tool", "__end__": END},
)
# ツール呼び出し後のルーティングを設定
workflow.add_conditional_edges(
"call_tool",
lambda x: x["sender"], # ツールを呼び出した元のエージェントに戻るための条件
{
"Researcher": "Researcher",
"chart_generator": "chart_generator",
},
)
# グラフの開始点を設定
workflow.set_entry_point("Researcher")
# グラフをコンパイル
graph = workflow.compile()
# グラフをビジュアル化して表示(オプション)
try:
display(Image(graph.get_graph(xray=True).draw_mermaid_png()))
except Exception as e:
print("グラフの表示に必要な依存関係がありません:", e)
グラフの起動と実行
定義したグラフを使用して実際にタスクを実行する手順です。この例では、イギリスの過去5年間の国内総生産(GDP)を取得し、それをライングラフに描画するようエージェントに指示します。グラフは指定されたメッセージを元にプロセスを進め、結果をステップごとに出力します。
from langchain_core.messages import HumanMessage
# グラフを起動し、指定されたタスクを処理するストリームを開始
events = graph.stream(
{
"messages": [
HumanMessage(
content="Fetch the UK's GDP over the past 5 years, then draw a line graph of it. Once you code it up, finish."
)
],
},
# グラフの実行ステップ数の上限を設定
{"recursion_limit": 150},
)
# イベントの結果を出力
for s in events:
print(s)
print("----")
プロジェクトのディレクトリ構造
LangGraphProject/
│
├── agents/ # エージェント関連のスクリプト
│ ├── __init__.py
│ ├── researcher_agent.py # 研究者エージェントの定義
│ └── chart_agent.py # チャート生成エージェントの定義
│
├── tools/ # ツール定義用ディレクトリ
│ ├── __init__.py
│ ├── database_tools.py # データベース検索や処理ツール
│ └── chart_tools.py # チャート作成ツール
│
├── graph/ # グラフ構造定義
│ ├── __init__.py
│ └── workflow.py # エージェントとツールのワークフロー定義
│
├── messages/ # メッセージ形式の定義
│ ├── __init__.py
│ └── message_types.py
│
└── main.py # メイン実行スクリプト