こちらの続きです。
前編ではデータの準備とエージェントのプロトタイピングを行いました。後編ではエージェントの構築と評価、デプロイメントまでをカバーします。(本当はAppsまで行きたいですが)
ノートブックなどはこちらに格納しています。
Databricksでエージェントシステムを構築する パート2 - エージェント評価
エージェントを作成したので、そのパフォーマンスをどのように評価するのでしょうか? 第2部では、評価に焦点を当てるために製品サポートエージェントを作成します。 このエージェントは、RAGアプローチを使用して製品ドキュメントを活用し、製品に関する質問に回答します。
エージェントの実装: agent.py
こちらが後半の肝となります。前編で定義したツールに加えて、データ準備で作成したVector Search Indexもツールとして取り込んでいます。
注意
ここでの評価はRAGの評価なので、正直なところ前半のツールはなくても大丈夫です。しかし、実運用を考えるとこれまでのツールも登録しておいた方がいいと思って含めています。実際にはVector Searchだけではなくその他のツールも具備したエージェントを構築することになりますし、その際にはそれらのツールの挙動も含めた評価データセットを準備して評価することになります。
ここでのポイントはLLMエンドポイントやツール、Vector Search Indexを環境変数経由で設定できるようにしているところです。これによって、agent.py
を編集することなしに他のLLMやツールと連携させることができます。
from typing import Any, Generator, Optional, Sequence, Union
import mlflow
from databricks_langchain import (
ChatDatabricks,
VectorSearchRetrieverTool,
DatabricksFunctionClient,
UCFunctionToolkit,
set_uc_function_client,
)
from langchain_core.language_models import LanguageModelLike
from langchain_core.runnables import RunnableConfig, RunnableLambda
from langchain_core.tools import BaseTool
from langgraph.graph import END, StateGraph
from langgraph.graph.graph import CompiledGraph
from langgraph.graph.state import CompiledStateGraph
from langgraph.prebuilt.tool_node import ToolNode
from mlflow.langchain.chat_agent_langgraph import ChatAgentState
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
ChatAgentChunk,
ChatAgentMessage,
ChatAgentResponse,
ChatContext,
)
import os
from typing import List, Dict, Any
import json
############################################
# 環境変数から設定を取得
############################################
def get_env_config():
"""環境変数から必要最小限の設定を取得"""
# 必須の環境変数を取得
llm_endpoint = os.environ.get("LLM_ENDPOINT_NAME")
uc_tool_names = os.environ.get("UC_TOOL_NAMES", "")
vs_name = os.environ.get("VS_NAME", "")
# 必須項目の検証
if not llm_endpoint:
raise ValueError("LLM_ENDPOINT_NAME environment variable is required")
if not uc_tool_names:
raise ValueError("UC_TOOL_NAMES environment variable is required")
if not vs_name:
raise ValueError("VS_NAME environment variable is required")
# ツール名を分割(空文字列や空白を除去)
tool_names = [name.strip() for name in uc_tool_names.split(",") if name.strip()]
config = {
"llm_endpoint": llm_endpoint,
"uc_tool_names": tool_names,
"vs_name": vs_name
}
return config
# 設定を取得
config = get_env_config()
LLM_ENDPOINT_NAME = config['llm_endpoint']
UC_TOOL_NAMES = config['uc_tool_names']
VS_NAME = config['vs_name']
# 設定確認用の出力
print("エージェントの設定:")
print(f"LLM_ENDPOINT_NAME: {LLM_ENDPOINT_NAME}")
print(f"UC_TOOL_NAMES: {UC_TOOL_NAMES}")
print(f"VS_NAME: {VS_NAME}")
# LangChain/MLflowの自動ロギングを有効化
mlflow.langchain.autolog()
# Databricks Function Clientを初期化し、UC関数クライアントとしてセット
client = DatabricksFunctionClient(disable_notice=True, suppress_warnings=True)
set_uc_function_client(client)
############################################
# LLMインスタンスの作成
############################################
llm = ChatDatabricks(
endpoint=config["llm_endpoint"]
)
# システムプロンプト(エージェントの振る舞いを制御)
system_prompt = "あなたはDatabricksラボのカスタマーサクセススペシャリストです。ユーザーからの製品に関する質問に対し、必要な情報はツールを使って取得し、ユーザーが製品を十分に理解できるようサポートしてください。お客様の興味を引くであろう情報を可能な限り盛り込んで、すべてのやり取りで価値を提供することを心がけてください。"
#system_prompt = "あなたはDatabricksラボのカスタマーサクセススペシャリストです。ユーザーからの製品に関する質問に対し、必要な情報はツールを使って取得し、質問に対してのみ簡潔に答え、架空の機能や色、一般的なコメントは加えないでください。マーケティング的な表現や余計な背景説明は不要です。"
###############################################################################
## エージェント用のツールを定義。これにより、テキスト生成以外のデータ取得やアクションが可能になる
## さらに多くのツールの作成や使用例については
## https://docs.databricks.com/generative-ai/agent-framework/agent-tool.html を参照
###############################################################################
############################################
# ツールの作成
############################################
def create_tools() -> List[BaseTool]:
"""環境変数の設定に基づいてツールを作成"""
tools = []
# Vector Searchツールの追加
if VS_NAME:
try:
vs_tool = VectorSearchRetrieverTool(
index_name=VS_NAME,
tool_name="search_product_docs",
num_results=3, # VSから3件の文書を取得
#num_results=1, # VSから1件の文書を取得
tool_description="このツールを使用して製品ドキュメントを検索します。"
)
tools.append(vs_tool)
print(f"Vector Searchツールを追加: {VS_NAME}")
except Exception as e:
print(f"Warning: Vector Searchツール {VS_NAME} をロードできませんでした: {e}")
# UC関数ツールの追加
if UC_TOOL_NAMES:
try:
uc_toolkit = UCFunctionToolkit(function_names=UC_TOOL_NAMES)
tools.extend(uc_toolkit.tools)
print(f"UC関数ツールを追加: {UC_TOOL_NAMES}")
except Exception as e:
print(f"Warning: UCツール {UC_TOOL_NAMES} を追加できませんでした: {e}")
return tools
# ツールを作成
# MLflowのロギングでも使用
tools = create_tools()
#####################
## エージェントのロジックを定義
#####################
def create_tool_calling_agent(
model: LanguageModelLike,
tools: Union[Sequence[BaseTool], ToolNode],
system_prompt: Optional[str] = None,
) -> CompiledGraph:
# モデルにツールをバインド
model = model.bind_tools(tools)
# 次にどのノードに進むかを決定する関数を定義
def should_continue(state: ChatAgentState):
messages = state["messages"]
last_message = messages[-1]
# 関数呼び出しがあれば継続、なければ終了
if last_message.get("tool_calls"):
return "continue"
else:
return "end"
# システムプロンプトを先頭に付与する前処理
if system_prompt:
preprocessor = RunnableLambda(
lambda state: [{"role": "system", "content": system_prompt}]
+ state["messages"]
)
else:
preprocessor = RunnableLambda(lambda state: state["messages"])
model_runnable = preprocessor | model
# モデル呼び出し用の関数
def call_model(
state: ChatAgentState,
config: RunnableConfig,
):
response = model_runnable.invoke(state, config)
return {"messages": [response]}
# カスタムツール実行関数
def execute_tools(state: ChatAgentState):
messages = state["messages"]
last_message = messages[-1]
# tool_callsを取得
tool_calls = last_message.get("tool_calls", [])
if not tool_calls:
return {"messages": []}
# ツールを実行
tool_outputs = []
for tool_call in tool_calls:
tool_name = tool_call.get("function", {}).get("name") if isinstance(tool_call, dict) else tool_call.function.name
tool_args = tool_call.get("function", {}).get("arguments") if isinstance(tool_call, dict) else tool_call.function.arguments
tool_id = tool_call.get("id") if isinstance(tool_call, dict) else tool_call.id
# ツールを見つけて実行
tool_result = None
for tool in tools:
if tool.name == tool_name:
try:
# 引数をパース
import json
if isinstance(tool_args, str):
args = json.loads(tool_args)
else:
args = tool_args
# ツールを実行
result = tool.invoke(args)
tool_result = str(result)
except Exception as e:
tool_result = f"Error executing tool: {str(e)}"
break
if tool_result is None:
tool_result = f"Tool {tool_name} not found"
# ツール実行結果のメッセージを作成
tool_message = {
"role": "tool",
"content": tool_result,
"tool_call_id": tool_id,
"name": tool_name
}
tool_outputs.append(tool_message)
return {"messages": tool_outputs}
# LangGraphのワークフローを構築
workflow = StateGraph(ChatAgentState)
workflow.add_node("agent", RunnableLambda(call_model))
workflow.add_node("tools", execute_tools)
workflow.set_entry_point("agent")
workflow.add_conditional_edges(
"agent",
should_continue,
{
"continue": "tools",
"end": END,
},
)
workflow.add_edge("tools", "agent")
return workflow.compile()
# LangGraphChatAgentクラス(MLflow推論用ラッパー)
class LangGraphChatAgent(ChatAgent):
def __init__(self, agent: CompiledStateGraph):
self.agent = agent
def _convert_messages_to_dict(self, messages: list[ChatAgentMessage]) -> list[dict]:
"""ChatAgentMessageを辞書形式に変換(修正版)"""
converted = []
# messagesがNoneまたは空の場合の処理
if not messages:
return converted
for msg in messages:
try:
if msg is None:
print("Warning: None message encountered")
continue
# ChatAgentMessageオブジェクトを辞書に変換
if hasattr(msg, 'dict'):
msg_dict = msg.dict()
elif isinstance(msg, dict):
msg_dict = msg
else:
print(f"Warning: Unexpected message type: {type(msg)}")
continue
# toolロールのメッセージの場合、contentが空の場合は処理
if msg_dict.get("role") == "tool":
# contentが空またはNoneの場合、デフォルト値を設定
if not msg_dict.get("content"):
msg_dict["content"] = "Tool execution completed"
# tool_call_idが必要な場合は設定
if "tool_call_id" not in msg_dict and msg_dict.get("id"):
msg_dict["tool_call_id"] = msg_dict["id"]
converted.append(msg_dict)
except Exception as e:
print(f"Error converting message: {e}, Message: {msg}")
continue
return converted
def predict(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> ChatAgentResponse:
# 入力メッセージを辞書形式に変換
request = {"messages": self._convert_messages_to_dict(messages)}
messages = []
# LangGraphのストリームからメッセージを収集(元のコードに近い形で)
try:
for event in self.agent.stream(request, stream_mode="updates"):
if event and isinstance(event, dict):
for node_data in event.values():
if node_data and isinstance(node_data, dict) and "messages" in node_data:
for msg in node_data.get("messages", []):
if msg is None:
continue
# メッセージオブジェクトを辞書に変換
if hasattr(msg, 'dict'):
msg_dict = msg.dict()
elif isinstance(msg, dict):
msg_dict = msg
else:
print(f"Warning: Unexpected message type: {type(msg)}")
continue
# toolメッセージの内容を確認
if msg_dict.get("role") == "tool":
# contentがない場合はデフォルト値を設定
if not msg_dict.get("content") and not msg_dict.get("tool_calls"):
msg_dict["content"] = "Tool executed successfully"
# tool_call_idが必要な場合
if "tool_call_id" not in msg_dict and "id" in msg_dict:
msg_dict["tool_call_id"] = msg_dict["id"]
try:
messages.append(ChatAgentMessage(**msg_dict))
except Exception as e:
print(f"Warning: Failed to create ChatAgentMessage: {e}")
print(f"Message data: {msg_dict}")
except Exception as e:
print(f"Error in predict method: {e}")
import traceback
traceback.print_exc()
return ChatAgentResponse(messages=messages)
def predict_stream(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> Generator[ChatAgentChunk, None, None]:
# 入力メッセージを辞書形式に変換
request = {"messages": self._convert_messages_to_dict(messages)}
# ストリームで逐次応答を生成
try:
for event in self.agent.stream(request, stream_mode="updates"):
if event and isinstance(event, dict):
for node_data in event.values():
if node_data and isinstance(node_data, dict) and "messages" in node_data:
for msg in node_data.get("messages", []):
if msg is None:
continue
# メッセージオブジェクトを辞書に変換
if hasattr(msg, 'dict'):
msg_dict = msg.dict()
elif isinstance(msg, dict):
msg_dict = msg
else:
print(f"Warning: Unexpected message type in stream: {type(msg)}")
continue
# toolメッセージの内容を確認
if msg_dict.get("role") == "tool":
# contentがない場合はデフォルト値を設定
if not msg_dict.get("content") and not msg_dict.get("tool_calls"):
msg_dict["content"] = "Tool executed successfully"
# tool_call_idが必要な場合
if "tool_call_id" not in msg_dict and "id" in msg_dict:
msg_dict["tool_call_id"] = msg_dict["id"]
try:
yield ChatAgentChunk(**{"delta": msg_dict})
except Exception as e:
print(f"Warning: Failed to create ChatAgentChunk: {e}")
continue
except Exception as e:
print(f"Error in predict_stream method: {e}")
import traceback
traceback.print_exc()
return
# エージェントオブジェクトを作成し、mlflow.models.set_model()で推論時に使用するエージェントとして指定
agent = create_tool_calling_agent(llm, tools, system_prompt)
AGENT = LangGraphChatAgent(agent)
mlflow.models.set_model(AGENT)
ドライバープログラム: driver
こちらのノートブックで、上のエージェントの実装の動作確認や評価、デプロイを行うことになります。
%pip install -U -qqqq mlflow-skinny[databricks] langgraph==0.3.4 databricks-langchain databricks-agents uv
dbutils.library.restartPython()
%run ../config
ハンズオンで使用するカタログ: takaakiyayoi_catalog
ハンズオンで使用する共有スキーマ(データを格納): agents_lab
環境変数を通じたエージェントの設定
環境変数からagent.py
のパラメータを設定します。
from databricks.sdk import WorkspaceClient
import os
import re
# ワークスペースクライアントを使用して現在のユーザーに関する情報を取得
w = WorkspaceClient()
user_email = w.current_user.me().emails[0].value
username = user_email.split('@')[0]
username = re.sub(r'[^a-zA-Z0-9_]', '_', username) # 特殊文字をアンダースコアに置換
# スキーマを指定します
user_schema_name = f"agents_lab_{username}" # ユーザーごとのスキーマ
# LLMエンドポイント名
os.environ["LLM_ENDPOINT_NAME"] = "databricks-claude-3-7-sonnet"
# UC関数ツール
os.environ["UC_TOOL_NAMES"] = f"{catalog_name}.{user_schema_name}.*"
# Vector Search名
os.environ["VS_NAME"] = f"{catalog_name}.{system_schema_name}.product_docs_index"
print("環境変数を設定しました:")
print(f"LLM_ENDPOINT_NAME: {os.environ.get('LLM_ENDPOINT_NAME')}")
print(f"UC_TOOL_NAMES: {os.environ.get('UC_TOOL_NAMES')}")
print(f"VS_NAME: {os.environ.get('VS_NAME')}")
環境変数を設定しました:
LLM_ENDPOINT_NAME: databricks-claude-3-7-sonnet
UC_TOOL_NAMES: takaakiyayoi_catalog.agents_lab_takaaki_yayoi.*
VS_NAME: takaakiyayoi_catalog.agents_lab.product_docs_index
エージェントが動作することを確認するためのクイックなテスト
from agent import AGENT
AGENT.predict({"messages": [{"role": "user", "content": "Soundwave X5 Pro ヘッドフォンのトラブルシューティングのコツを教えてください。"}]})
前編で定義したツールも試します。
AGENT.predict({"messages": [{"role": "user", "content": "今日の日付は"}]})
エージェントのグラフ構造も確認しておきます。
from IPython.display import Image, display
# エージェントのグラフ構造を可視化
display(Image(AGENT.agent.get_graph().draw_mermaid_png()))
agent.py
をMLflowモデルとしてログに記録する
agent.py
のコードとしてエージェントをログに記録します。詳細はMLflow - コードからのモデルを参照してください。
# デプロイ時に自動認証パススルーを指定するためのDatabricksリソースを決定
import mlflow
from agent import tools, LLM_ENDPOINT_NAME
from databricks_langchain import VectorSearchRetrieverTool
from mlflow.models.resources import DatabricksFunction, DatabricksServingEndpoint
from unitycatalog.ai.langchain.toolkit import UnityCatalogTool
resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]
for tool in tools:
if isinstance(tool, VectorSearchRetrieverTool):
resources.extend(tool.resources)
elif isinstance(tool, UnityCatalogTool):
resources.append(DatabricksFunction(function_name=tool.uc_function_name))
input_example = {
"messages": [
{
"role": "user",
"content": "Aria Modern Bookshelfの利用可能な色オプションは何ですか?"
}
]
}
with mlflow.start_run():
logged_agent_info = mlflow.pyfunc.log_model(
name="agent",
python_model="agent.py",
input_example=input_example,
resources=resources,
extra_pip_requirements=[
"databricks-connect"
]
)
# モデルをロードし、予測関数を作成
logged_model_uri = f"runs:/{logged_agent_info.run_id}/agent"
loaded_model = mlflow.pyfunc.load_model(logged_model_uri)
def predict_wrapper(query):
# チャット形式モデル用の入力を整形
model_input = {
"messages": [{"role": "user", "content": query}]
}
response = loaded_model.predict(model_input)
messages = response['messages']
return messages[-1]['content']
エージェントをエージェント評価で評価する
評価データセットのリクエストや期待される応答を編集し、エージェントを反復しながら評価を実行し、mlflowを活用して計算された品質指標を追跡できます。
import pandas as pd
data = {
"request": [
"Aria Modern Bookshelfの利用可能な色オプションは何ですか?",
"Aurora Oak Coffee Tableを傷つけずに掃除するにはどうすればよいですか?",
"BlendMaster Elite 4000は使用後にどのように掃除すればよいですか?",
"Flexi-Comfort Office Deskは何色展開ですか?",
"StormShield Pro メンズ防水ジャケットのサイズ展開は?"
],
"expected_facts": [
[
"Aria Modern Bookshelfはナチュラルオーク仕上げで利用可能です。",
"Aria Modern Bookshelfはブラック仕上げで利用可能です。",
"Aria Modern Bookshelfはホワイト仕上げで利用可能です。"
],
[
"柔らかく少し湿らせた布で掃除してください。",
"研磨剤入りのクリーナーは使用しないでください。"
],
[
"BlendMaster Elite 4000のジャーはすすいでください。",
"ぬるま湯ですすいでください。",
"使用後は毎回掃除してください。"
],
[
"Flexi-Comfort Office Deskは3色展開です。"
],
[
"StormShield Pro メンズ防水ジャケットのサイズはS、M、L、XL、XXLです。"
]
]
}
eval_dataset = pd.DataFrame(data)
display(eval_dataset)
LLMジャッジであるスコアラーを定義します。
from mlflow.genai.scorers import Guidelines, Safety
import mlflow.genai
# 評価用データセットを作成
eval_data = []
for request, facts in zip(data["request"], data["expected_facts"]):
eval_data.append({
"inputs": {
"query": request # 関数の引数と一致させる
},
"expected_response": "\n".join(facts)
})
# 評価用スコアラーを定義
# LLMジャッジが応答を評価するためのガイドライン
# 製品情報評価に特化したカスタムスコアラーを定義
scorers = [
Guidelines(
guidelines="""応答にはすべての期待される事実が含まれている必要があります:
- 該当する場合はすべての色やサイズを列挙する(部分的なリストは不可)
- 該当する場合は正確な仕様を記載する(例:「5 ATM」など曖昧な表現は不可)
- 掃除手順を尋ねられた場合はすべての手順を含める
いずれかの事実が欠落または誤っている場合は不合格とする。""",
name="completeness_and_accuracy",
),
Guidelines(
guidelines="""応答は明確かつ直接的でなければなりません:
- 質問に正確に答える
- 選択肢はリスト形式、手順はステップ形式で記載
- マーケティング的な表現や余計な背景説明は不要
- 簡潔かつ完全であること。""",
name="relevance_and_structure",
),
Guidelines(
guidelines="""応答は話題から逸脱しないこと:
- 質問された製品のみについて回答する
- 架空の機能や色を追加しない
- 一般的なアドバイスは含めない
- リクエストに記載された製品名を正確に使用すること。""",
name="product_specificity",
),
]
評価を実行します。
print("評価を実行中...")
with mlflow.start_run():
results = mlflow.genai.evaluate(
data=eval_data,
predict_fn=predict_wrapper,
scorers=scorers,
)
MLflowのエクスペリメントに評価結果が記録されます。一部評価に失敗しています。
agent.pyファイルに戻り、プロンプトを変更してマーケティングの誇張を減らしましょう。
ここからはエージェントの実装やスコアラーの評価基準を突き合わせて、改善を繰り返すことになります。ここでは、説明をシンプルにするためにエージェントのシステムプロンプトを確認してみてください。修正を行うことでスコアラーの評価基準に合格する確率が上昇します。実際には、リトリーバーの設定など様々な観点で改善を図る必要があります。
エージェントの実装を見直したら再度評価を行います。
with mlflow.start_run():
logged_agent_info = mlflow.pyfunc.log_model(
name="agent",
python_model="agent.py",
input_example=input_example,
resources=resources,
extra_pip_requirements=[
"databricks-connect"
]
)
# モデルをロードし、予測関数を作成
logged_model_uri = f"runs:/{logged_agent_info.run_id}/agent"
loaded_model = mlflow.pyfunc.load_model(logged_model_uri)
def predict_wrapper(query):
# チャット形式モデル用の入力を整形
model_input = {
"messages": [{"role": "user", "content": query}]
}
response = loaded_model.predict(model_input)
messages = response['messages']
return messages[-1]['content']
print("評価を実行中...")
with mlflow.start_run():
results = mlflow.genai.evaluate(
data=eval_data,
predict_fn=predict_wrapper,
scorers=scorers,
)
評価結果が大幅に改善されました。一番下にFailが残っているのは、そもそもこの製品がVector Search Indexに含まれていないことによります。
モデルをUnity Catalogに登録する
以下の catalog
、schema
、model_name
を更新して、MLflowモデルをUnity Catalogに登録します。
mlflow.set_registry_uri("databricks-uc")
# UCモデル用のカタログ、スキーマ、モデル名を定義
model_name = "product_agent"
UC_MODEL_NAME = f"{catalog_name}.{user_schema_name}.{model_name}"
# モデルをUCに登録
uc_registered_model_info = mlflow.register_model(model_uri=logged_agent_info.model_uri, name=UC_MODEL_NAME)
モデルバージョンにアクセスし、依存関係タブでエージェントのリネージを確認してみましょう。
from IPython.display import display, HTML
# DatabricksのホストURLを取得
workspace_url = spark.conf.get('spark.databricks.workspaceUrl')
# 作成したエージェントへのHTMLリンクを作成
html_link = f'<a href="https://{workspace_url}/explore/data/models/{catalog_name}/{user_schema_name}/product_agent" target="_blank">登録済みエージェントをUnity Catalogで表示</a>'
display(HTML(html_link))
エージェントのデプロイ
上で使用した環境変数を設定してエージェントをモデルサービングエンドポイントにデプロイします。
from databricks import agents
# 環境変数を辞書として定義
environment_vars = {
"LLM_ENDPOINT_NAME": os.environ["LLM_ENDPOINT_NAME"],
"UC_TOOL_NAMES": os.environ["UC_TOOL_NAMES"],
"VS_NAME": os.environ["VS_NAME"],
}
# モデルをレビューアプリおよびモデルサービングエンドポイントにデプロイ
agents.deploy(
UC_MODEL_NAME,
uc_registered_model_info.version,
tags={"endpointSource": "Agent Lab"},
environment_vars=environment_vars,
timeout=900, # 15分に延長
)
数分持つとモデルサービングエンドポイントがReadyになるはずです。
今後の方向性
お疲れ様でした!前編後編を通じて、エージェントが用いるツールの準備、エージェントのプロトタイピング、エージェントの構築、評価と改善、デプロイメントまで体験したことになります。是非ご自身の要件に応じたエージェントを構築してみてください!