こちらの続きです。
要件
- 複数のGenieスペースが存在している。
- それぞれのGenieスペースに対応するエージェントを構築する(Genieエージェント)。
- 問い合わせによっては、複数のGenieエージェントからの結果を統合して回答を行う。
冒頭の記事はマルチエージェントシステムの理解の助けになったのですが、こちらのグラフ構造は以下のようになっています。Genie_2022は2022年のCovid感染者数、Genie_2023は2023年のCovid感染者数のデータを持つGenieスペースとやり取りを行います。
このように、複数のGenieエージェントを接続することは可能なのですが、スーパーバイザーがどのエージェントを呼び出すのかは、過去のエージェントとのやり取りのみに依存しています。結果として東京都における2023/3と2022/3の感染者数の比較
という問い合わせをすると、Genie_2022は2023年のデータがありません
と延々と回答し続けたり、2022年のデータを取得しても繰り返しGenie_2022に問い合わせ続けるなど、期待した通りの動作をしてくれませんでした。
なので、上記要件に対応するスーパーバイザーを構築してみました。
アプローチ
要件が複数Genieエージェントの結果を統合した回答生成なので、以下のような構成にしています。
- ユーザーの質問に応じてスーパーバイザーが呼び出すエージェントを特定
- 複数のエージェントからの回答を統合
- 統合した結果から最終回答を生成
実装
%pip install -U -qqq mlflow langgraph==0.3.4 databricks-langchain databricks-agents uv
dbutils.library.restartPython()
以下のエージェント実装が大きく変わっています。Claudeに教えてもらいながら作りました。
%%writefile agent.py
import functools
import os
import json
from typing import Any, Generator, Literal, Optional, TypedDict, List, Dict
import mlflow
from databricks.sdk import WorkspaceClient
from databricks_langchain import (
ChatDatabricks,
DatabricksFunctionClient,
UCFunctionToolkit,
set_uc_function_client,
)
from databricks_langchain.genie import GenieAgent
from langchain_core.runnables import RunnableLambda
from langgraph.graph import START, END, StateGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt import create_react_agent
from mlflow.langchain.chat_agent_langgraph import ChatAgentState
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
ChatAgentChunk,
ChatAgentMessage,
ChatAgentResponse,
ChatContext,
)
from pydantic import BaseModel
import uuid
# LangChainモデルの自動ログを有効化
mlflow.langchain.autolog()
###################################################
## GenieAgentを作成してGenie Spaceにアクセス
###################################################
# 2022年の感染データのGenie Space ID
GENIE_SPACE_ID_2022 = "01f041bf967b1e97bb500f06c8dabf8f"
genie_agent_description_2022 = "このエージェントは2022年の感染者数に関する質問に答えることができます"
# GenieAgentを作成
genie_agent_2022 = GenieAgent(
genie_space_id=GENIE_SPACE_ID_2022 ,
genie_agent_name="Genie_2022",
description=genie_agent_description_2022 ,
# DB_MODEL_SERVING_HOST_URLはエージェントエンドポイントで設定されますが、ノートブックには存在しません
client=WorkspaceClient(
host=os.getenv("DATABRICKS_HOST") or os.getenv("DB_MODEL_SERVING_HOST_URL"),
token=os.getenv("DATABRICKS_GENIE_PAT"),
),
)
# 2022年の感染データのGenie Space ID
GENIE_SPACE_ID_2023 = "01f041bfaa43151c8412b28706fde53f"
genie_agent_description_2023 = "このエージェントは2023年の感染者数に関する質問に答えることができます"
# GenieAgentを作成
genie_agent_2023 = GenieAgent(
genie_space_id=GENIE_SPACE_ID_2023,
genie_agent_name="Genie_2023",
description=genie_agent_description_2023,
# DB_MODEL_SERVING_HOST_URLはエージェントエンドポイントで設定されますが、ノートブックには存在しません
client=WorkspaceClient(
host=os.getenv("DATABRICKS_HOST") or os.getenv("DB_MODEL_SERVING_HOST_URL"),
token=os.getenv("DATABRICKS_GENIE_PAT"),
),
)
############################################
# LLMエンドポイントとシステムプロンプトを定義
############################################
# マルチエージェントGenieはGPT 4oおよびGPT o1モデルで最適に動作します。
LLM_ENDPOINT_NAME = "taka-gpt-4o"
assert LLM_ENDPOINT_NAME is not None
llm = ChatDatabricks(endpoint=LLM_ENDPOINT_NAME)
#######################################
# 状態定義(メッセージ履歴なし版)
#######################################
class AgentState(TypedDict):
user_question: str # ユーザーの最初の質問のみ保持
next_nodes: Optional[List[str]] # 実行すべきエージェント名リスト
result_2022: Optional[str] # Genie_2022 → result_2022
result_2023: Optional[str] # Genie_2023 → result_2023
final_answer: Optional[str] # 最終統合回答
#######################################
# マルチエージェントのグラフ構造を定義
#######################################
# エージェントノード関数を定義(履歴管理なし版)
def agent_node(state: AgentState, agent, result_key: str, agent_name: str) -> AgentState:
"""エージェントを実行し、結果のみをstateに保存する"""
try:
print(f"Executing agent: {agent_name}")
# エージェント用の最小限のメッセージ構造を作成
agent_input = {
"messages": [
{"role": "user", "content": state["user_question"]}
]
}
# エージェント実行
result = agent.invoke(agent_input)
# stateのコピーを作成
updated_state = dict(state)
# エージェントの結果のみを保存
agent_content = result["messages"][-1].content
updated_state[result_key] = agent_content
print(f"Agent {agent_name} completed successfully")
print(f"Result length: {len(agent_content)} characters")
return updated_state
except Exception as e:
print(f"Error in {agent_name}: {str(e)}")
# エラー時もstateを更新
updated_state = dict(state)
error_message = f"{agent_name}でエラーが発生しました: {str(e)}"
updated_state[result_key] = error_message
return updated_state
# エージェントノードの部分関数を作成
agent_node_2022 = functools.partial(
agent_node,
agent=genie_agent_2022,
result_key="result_2022",
agent_name="Genie_2022"
)
agent_node_2023 = functools.partial(
agent_node,
agent=genie_agent_2023,
result_key="result_2023",
agent_name="Genie_2023"
)
#############################
# スーパーバイザーエージェントを定義(簡素化版)
#############################
def supervisor_fn(state: AgentState) -> AgentState:
"""どのエージェントが必要かを判断"""
user_input = state["user_question"]
prompt = f"""次の質問に対して、以下の2つのエージェントのうち、どちらが必要かを考えてください。
1. 2022年エージェント(Covid感染者数の情報) -> "agent_2022"
2. 2023年エージェント(Covid感染者数の情報) -> "agent_2023"
必要なエージェントを以下のJSON形式で返してください:
- 2022年エージェントのみ必要: ["agent_2022"]
- 2023年エージェントのみ必要: ["agent_2023"]
- 両方必要: ["agent_2022", "agent_2023"]
質問: {user_input}
回答(JSON配列のみ):"""
try:
response = llm.invoke(prompt)
response_text = response.content.strip()
print(f"Supervisor response: {response_text}")
# コードブロックを除去する処理
if response_text.startswith('```json'):
response_text = response_text.replace('```json', '').replace('```', '').strip()
elif response_text.startswith('```'):
response_text = response_text.replace('```', '').strip()
response_text = response_text.replace('\n', '').replace('\r', '').strip()
# JSONパースを試行
next_nodes = None
try:
next_nodes = json.loads(response_text)
except json.JSONDecodeError:
# 正規表現でJSONを抽出
import re
json_match = re.search(r'\[(.*?)\]', response_text)
if json_match:
json_str = '[' + json_match.group(1) + ']'
try:
next_nodes = json.loads(json_str)
except:
next_nodes = None
# パースに失敗した場合のフォールバック
if next_nodes is None:
response_lower = response_text.lower()
if 'agent_2022' in response_lower and 'agent_2023' in response_lower:
next_nodes = ["agent_2022", "agent_2023"]
elif 'agent_2022' in response_lower:
next_nodes = ["agent_2022"]
elif 'agent_2023' in response_lower:
next_nodes = ["agent_2023"]
else:
next_nodes = ["agent_2022"] # デフォルト
# 有効性チェック
valid_nodes = ["agent_2022", "agent_2023"]
if isinstance(next_nodes, list):
next_nodes = [node for node in next_nodes if node in valid_nodes]
else:
next_nodes = ["agent_2022"]
if not next_nodes:
next_nodes = ["agent_2022"]
print(f"Final supervisor decision: {next_nodes}")
# stateを更新
updated_state = dict(state)
updated_state["next_nodes"] = next_nodes
return updated_state
except Exception as e:
print(f"Error in supervisor: {str(e)}")
updated_state = dict(state)
updated_state["next_nodes"] = ["agent_2022"] # エラー時のデフォルト
return updated_state
def aggregator_fn(state: AgentState) -> AgentState:
"""各エージェントの回答を統合"""
user_input = state["user_question"]
prompt = f"""以下は複数のAIエージェントからの回答です。それらを統合してユーザーの質問にわかりやすく日本語で回答してください。
ユーザーの質問:
{user_input}
2022年エージェント(Genie_2022)からの回答:
{state.get("result_2022", "回答なし")}
2023年エージェント(Genie_2023)からの回答:
{state.get("result_2023", "回答なし")}
統合された回答を提供してください:"""
try:
response = llm.invoke(prompt)
# stateを更新
updated_state = dict(state)
updated_state["final_answer"] = response.content
print(f"Aggregator completed. Final answer length: {len(response.content)} characters")
return updated_state
except Exception as e:
print(f"Error in aggregator: {str(e)}")
updated_state = dict(state)
error_message = f"回答統合でエラーが発生しました: {str(e)}"
updated_state["final_answer"] = error_message
return updated_state
#############################
# ルーティング関数の定義
#############################
def route_after_supervisor(state: AgentState) -> str:
"""スーパーバイザー後のルーティング"""
next_nodes = state.get("next_nodes", [])
print(f"Routing after supervisor with next_nodes: {next_nodes}")
if not next_nodes:
return "final_answer_node"
elif len(next_nodes) == 1:
return next_nodes[0]
elif len(next_nodes) == 2:
return "agent_2022" # 両方の場合はagent_2022から開始
else:
return "final_answer_node"
def route_after_2022(state: AgentState) -> str:
"""agent_2022後のルーティング"""
next_nodes = state.get("next_nodes", [])
print(f"Routing after agent_2022 with next_nodes: {next_nodes}")
if "agent_2023" in next_nodes and len(next_nodes) > 1:
return "agent_2023"
else:
return "aggregator"
#############################
# デバッグ関数
#############################
def debug_state(state: AgentState, node_name: str) -> AgentState:
"""デバッグ用のstate確認"""
print(f"\n=== DEBUG: {node_name} ===")
print(f"User question: {state.get('user_question', 'None')[:100]}...")
print(f"Next nodes: {state.get('next_nodes', 'None')}")
print(f"2022 result: {'Present' if state.get('result_2022') else 'None'}")
print(f"2023 result: {'Present' if state.get('result_2023') else 'None'}")
print(f"Final answer: {'Present' if state.get('final_answer') else 'None'}")
print("=" * 30)
return state
#############################
# LangGraph定義
#############################
workflow = StateGraph(AgentState)
# 各ノードを登録
workflow.add_node("supervisor", supervisor_fn)
workflow.add_node("agent_2022", agent_node_2022)
workflow.add_node("agent_2023", agent_node_2023)
workflow.add_node("aggregator", aggregator_fn)
workflow.add_node("final_answer_node", lambda state: state)
# エッジの定義
workflow.add_edge(START, "supervisor")
# supervisor → 条件分岐
workflow.add_conditional_edges(
"supervisor",
route_after_supervisor,
{
"agent_2022": "agent_2022",
"agent_2023": "agent_2023",
"final_answer_node": "final_answer_node"
}
)
# agent_2022 → 条件分岐
workflow.add_conditional_edges(
"agent_2022",
route_after_2022,
{
"agent_2023": "agent_2023",
"aggregator": "aggregator"
}
)
# agent_2023 → aggregator
workflow.add_edge("agent_2023", "aggregator")
# aggregator → final_answer_node → END
workflow.add_edge("aggregator", "final_answer_node")
workflow.add_edge("final_answer_node", END)
# Graphコンパイル
multi_agent = workflow.compile()
print("Workflow compiled successfully!")
###################################
# マルチエージェントをChatAgentにラップ(簡素化版)
###################################
class LangGraphChatAgent(ChatAgent):
def __init__(self, agent: CompiledStateGraph):
self.agent = agent
def predict(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> ChatAgentResponse:
"""同期推論用の予測関数"""
# 最初のユーザーメッセージを取得
user_question = ""
for m in messages:
if m.role == "user":
user_question = m.content
break
if not user_question:
return ChatAgentResponse(messages=[
ChatAgentMessage(
id=str(uuid.uuid4()),
role="assistant",
content="ユーザーの質問が見つかりませんでした。"
)
])
# 初期状態を設定(messagesなし)
initial_state: AgentState = {
"user_question": user_question,
"next_nodes": None,
"result_2022": None,
"result_2023": None,
"final_answer": None
}
try:
print(f"Processing question: {user_question[:100]}...")
# エージェントを実行
final_state = self.agent.invoke(initial_state)
# 最終回答を取得
final_answer = final_state.get("final_answer", "")
if not final_answer:
# フォールバック: 個別の結果を結合
result_2022 = final_state.get("result_2022", "")
result_2023 = final_state.get("result_2023", "")
if result_2022 or result_2023:
combined_result = ""
if result_2023:
combined_result += f"2023年情報: {result_2023}\n\n"
if result_2022:
combined_result += f"2022年情報: {result_2022}"
final_answer = combined_result.strip()
else:
final_answer = "申し訳ございませんが、回答を生成できませんでした。"
response_message = ChatAgentMessage(
id=str(uuid.uuid4()),
role="assistant",
content=final_answer
)
return ChatAgentResponse(messages=[response_message])
except Exception as e:
print(f"Error in predict: {str(e)}")
error_message = ChatAgentMessage(
id=str(uuid.uuid4()),
role="assistant",
content=f"エラーが発生しました: {str(e)}"
)
return ChatAgentResponse(messages=[error_message])
# エージェントオブジェクトを作成
AGENT = LangGraphChatAgent(multi_agent)
mlflow.models.set_model(AGENT)
import os
# シークレットスコープ名とシークレットキー名を設定して、PATにアクセスします
secret_scope_name = "demo-token-takaaki.yayoi"
secret_key_name = "pat"
os.environ["DATABRICKS_GENIE_PAT"] = dbutils.secrets.get(
scope=secret_scope_name, key=secret_key_name
)
assert os.environ["DATABRICKS_GENIE_PAT"] is not None, (
"The DATABRICKS_GENIE_PAT was not properly set to the PAT secret"
)
dbutils.library.restartPython()
グラフ構造はこちら。
from agent import AGENT
from IPython.display import Image, display
display(Image(AGENT.agent.get_graph().draw_mermaid_png()))
結果を統合するaggregator
を追加しています。
動作確認します。
input_example = {
"messages": [
{
"role": "user",
"content": "東京都の2023/3の感染者数の推移",
}
]
}
AGENT.predict(input_example)
Genie_2023
が選択されます。
Processing question: 東京都の2023/3の感染者数の推移...
Supervisor response: ```json
["agent_2023"]
```
Final supervisor decision: ['agent_2023']
Routing after supervisor with next_nodes: ['agent_2023']
Executing agent: Genie_2023
Agent Genie_2023 completed successfully
Result length: 1187 characters
Aggregator completed. Final answer length: 532 characters
input_example = {
"messages": [
{
"role": "user",
"content": "東京都の2022/3の感染者数の推移",
}
]
}
AGENT.predict(input_example)
Genie_2022
が選択されます。
Processing question: 東京都の2022/3の感染者数の推移...
Supervisor response: ```json
["agent_2022"]
```
Final supervisor decision: ['agent_2022']
Routing after supervisor with next_nodes: ['agent_2022']
Executing agent: Genie_2022
Agent Genie_2022 completed successfully
Result length: 1187 characters
Routing after agent_2022 with next_nodes: ['agent_2022']
Aggregator completed. Final answer length: 602 characters
input_example = {
"messages": [
{
"role": "user",
"content": "東京都における2023/3と2022/3の感染者数の比較",
}
]
}
AGENT.predict(input_example)
両方が選択されます。
Processing question: 東京都における2023/3と2022/3の感染者数の比較...
Supervisor response: ```json
["agent_2022", "agent_2023"]
```
Final supervisor decision: ['agent_2022', 'agent_2023']
Routing after supervisor with next_nodes: ['agent_2022', 'agent_2023']
Executing agent: Genie_2022
Agent Genie_2022 completed successfully
Result length: 68 characters
Routing after agent_2022 with next_nodes: ['agent_2022', 'agent_2023']
Executing agent: Genie_2023
Agent Genie_2023 completed successfully
Result length: 68 characters
Aggregator completed. Final answer length: 126 characters
動きました!
しかし、デバッグにはトレースもそうですが、途中の処理を確認できるようにしておかないとなかなか大変ですね。
これをデプロイしていきます。
# デプロイ時に自動認証パススルーを指定するためのDatabricksリソースを決定
import mlflow
from agent import GENIE_SPACE_ID_2022, GENIE_SPACE_ID_2023, LLM_ENDPOINT_NAME
from databricks_langchain import UnityCatalogTool, VectorSearchRetrieverTool
from mlflow.models.resources import (
DatabricksFunction,
DatabricksGenieSpace,
DatabricksServingEndpoint,
)
# TODO: 必要に応じて基礎となるリソースを手動で含める。詳細は上記のマークダウンのTODOを参照。
resources = [
DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
DatabricksGenieSpace(genie_space_id=GENIE_SPACE_ID_2022),
DatabricksGenieSpace(genie_space_id=GENIE_SPACE_ID_2023),
]
with mlflow.start_run():
logged_agent_info = mlflow.pyfunc.log_model(
artifact_path="agent",
python_model="agent.py",
input_example=input_example,
pip_requirements=[
"mlflow",
"langgraph==0.3.4",
"databricks-langchain",
"pydantic",
],
resources=resources,
)
mlflow.models.predict(
model_uri=f"runs:/{logged_agent_info.run_id}/agent",
input_data=input_example,
env_manager="uv",
)
mlflow.set_registry_uri("databricks-uc")
# TODO: UCモデルのカタログ、スキーマ、およびモデル名を定義する
catalog = "takaakiyayoi_catalog"
schema = "agents"
model_name = "multi_genie_agent"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"
# モデルをUCに登録
uc_registered_model_info = mlflow.register_model(
model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME
)
secret_scope_name = "demo-token-takaaki.yayoi"
secret_key_name = "pat"
from databricks import agents
agents.deploy(
UC_MODEL_NAME,
uc_registered_model_info.version,
tags={"endpointSource": "docs"},
environment_vars={
"DATABRICKS_GENIE_PAT": f"{{{{secrets/{secret_scope_name}/{secret_key_name}}}}}"
},
)
デプロイされました。
AI Playgroundでも動きました!
まとめ
-
要件に従ってエージェントシステムを構築することで、LangGraphのお作法に慣れ親しむことができました。
-
構成要素が多い(LLM、プロンプト、ノード、エッジ)ので、トレースやデバッグ情報の出力処理を活用することが大事。例えば、今回は複数エージェントからのアウトプットが統合できないバグに悩みましたが、以下のトレースを確認することでアウトプットが埋め込まれているかどうかを確認できました。
-
デバッグの観点ではすべてのプロンプトは日本語にした方が吉。
-
データの統合のパターンは多岐にわたるので、どのような統合を行いたいかによって実装が異なります。例えば、JOINを行うとなるとさらに高度な実装が求められます(そもそも可能かという疑問もあります)。
-
ユーザーの質問をそのままエージェントに渡すのではなく、それぞれのエージェントに適した質問に変換するのも有効そう。
-
GenieAgent
のプロパティdecritpion
はRunnableLambda
のdecritpion
に設定されるものであり、Genieスペースの指示を上書きするものではありません。Genieスペースの挙動を制御する場合には、Genieスペースの指示を設定しましょう。今回のケースでは確認を求める場合があったので、以下の指示を追加しています。COVID感染者数データを提供するエージェントとして動作します。問い合わせが来たら確認を行わずにデータを提供します。
-
こういう新しい概念を学びながらコーディングするには、生成AIの助けは不可欠です。困ったときにはエージェントの実装(agent.py)をClaudeやChatGPTにアップロードしてQ&Aしていました。