はじめに
先日、MLflow 3がリリースされました。
かなりの大型アップデートであり、モデルの構造含めて変更が入っています。
mlflow3自体はリリース候補版の際にも多少試してみていましたが、ドキュメントが初めて気づいた変更の一つにResponsesAgent
があります。
上記ドキュメントを一部邦訳すると、以下のようになります。
ResponsesAgent
は、PythonModel
のサブクラスであり、エージェントモデルを作成するためのフレームワークに依存しない方法を提供します。ResponsesAgent
を使用してエージェントを開発することで、以下の利点があります。
- ツール呼び出しの中間出力を含む、複数の出力メッセージの返却をサポート
- マルチエージェントシナリオのサポート
- MLflowのロギング、トレーシング、モデルサービングとの互換性を確保
- OpenAI Responses APIとの互換性を確保し、OpenAIのresponsesクライアントや他のダウンストリームUI/アプリケーションと互換性を持たせる
ChatModel
やChatAgent
の代わりにResponsesAgent
の利用を推奨します。 >ResponsesAgent
はChatAgent
のすべての利点を持ち、さらにアノテーションなどの追加機能もサポートしています。
※ 2025/6/12時点のドキュメントを機械翻訳したもののリンクも貼っておきます。
MLflow 2.xのときにあったChatAgent
やChatModel
を置き換える新しいカスタムモデルインターフェースのようです。
名前の通りOpenAIの Responses APIと互換のインターフェースを提供します。
ResponsesAgent
の登場でChatAgent
等は利用が非推奨になりました。MLflow3以降はこちらを使ってエージェントを実装することが推奨のようです。
勉強のために、上記内容をベースに、内部的にはLangGraphでToolCallingを実行するエージェントを作成してみます。
開発はDatabricks Free Editionを利用しました。
今回のコードはFree Editionで全て動作します。
Step1. 準備
ノートブックを作成し、必要なパッケージをインストールします。
%pip install -U mlflow[databricks]>=3.1 unitycatalog-ai[databricks] unitycatalog-langchain[databricks] databricks-agents databricks-langchain langgraph
%restart_python
Step2. カスタムResponsesAgentを定義
最重要ポイントです。
まずはコードを全て掲載します。解説はコードの後に記載しています。
%%writefile simple_responses_agent.py
import mlflow
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
ResponsesAgentRequest,
ResponsesAgentResponse,
ResponsesAgentStreamEvent,
)
from mlflow.entities import SpanType
from typing import Generator, Any
from databricks_langchain import ChatDatabricks, UCFunctionToolkit
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import (
BaseMessage,
AIMessage,
ToolMessage,
AIMessageChunk,
)
from langchain_core.tools import BaseTool, tool
from functools import reduce
# ダミーツール
@tool
def get_weather(city: str) -> str:
"""指定された都市の天気を取得します
Args:
city (str): 都市名
Returns:
str: 天気情報
"""
return f"It's always sunny in {city}!"
# Tracingの有効化
mlflow.langchain.autolog()
# Agent
class SimpleResponsesAgent(ResponsesAgent):
def __init__(self, model, tools: list[BaseTool]):
"""SimpleResponsesAgentの初期化
Args:
model: 使用するモデル
tools (list[BaseTool]): 使用するツールのリスト
"""
self.model = model
self.tools = tools
@mlflow.trace(span_type=SpanType.AGENT)
def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
"""リクエストに基づいて予測を行います
Args:
request (ResponsesAgentRequest): 予測リクエスト
Returns:
ResponsesAgentResponse: 予測結果のレスポンス
"""
events = [
event
for event in self.predict_stream(request)
if event.type == "response.output_item.done"
]
outputs = [event.item for event in events]
# usage総量を計算
usages = [event.usage for event in events]
total_usage = {
"input_tokens_details": {"cached_tokens": 0},
"output_tokens_details": {"reasoning_tokens": 0},
**reduce(
lambda x, y: {k: x.get(k, 0) + y.get(k, 0) for k in set(x) | set(y)},
usages,
),
}
return ResponsesAgentResponse(output=outputs, usage=total_usage)
@mlflow.trace(span_type=SpanType.AGENT)
def predict_stream(
self, request: ResponsesAgentRequest
) -> Generator[ResponsesAgentStreamEvent, None, None]:
"""ストリームモードで予測を行います
Args:
request (ResponsesAgentRequest): 予測リクエスト
Yields:
ResponsesAgentStreamEvent: ストリームイベント
"""
messages, params = self._convert_request_to_lc_request(request)
react_agent = create_react_agent(self.model.bind(**params), tools=self.tools)
for chunk in react_agent.stream({"messages": messages}, stream_mode="updates"):
for value in chunk.values():
messages = value.get("messages", [])
responses = self._convert_lc_messages_to_response(messages)
for response in responses:
yield response
@mlflow.trace(span_type=SpanType.PARSER)
def _convert_request_to_lc_request(
self, request: ResponsesAgentRequest
) -> (list[BaseMessage], dict[str, Any]):
"""リクエストをLangChainのメッセージおよびパラメータ形式に変換します
Args:
request (ResponsesAgentRequest): 変換するリクエスト
Returns:
tuple: メッセージリスト、パラメータ辞書
"""
lc_request = request.model_dump_compat(exclude_none=True)
custom_inputs = lc_request.pop("custom_inputs", {})
# custom_inputsは通常のパラメータとして展開
lc_request.update(custom_inputs)
messages = lc_request.pop("input")
# LangChainで有効なパラメータのみに限定
valid_params = [
"temperature",
"max_output_tokens",
"top_p",
"top_k",
]
params = {k: v for k, v in lc_request.items() if k in valid_params}
if "max_output_tokens" in params:
params["max_tokens"] = params.pop("max_output_tokens")
return messages, params
@mlflow.trace(span_type=SpanType.PARSER)
def _convert_lc_messages_to_response(
self, messages: list[BaseMessage]
) -> list[ResponsesAgentStreamEvent]:
"""LangChainメッセージをレスポンス出力に変換します
Args:
messages (list[BaseMessage]): 変換するメッセージリスト
Returns:
list[ResponsesAgentStreamEvent]: レスポンス出力のリスト
"""
def _create_response_agent_stream_event(
item, usage, metadata
) -> ResponsesAgentStreamEvent:
return ResponsesAgentStreamEvent(
type="response.output_item.done",
item=item,
usage=_convert_lc_usage_to_openai_usage(usage),
metadata=metadata,
)
def _convert_lc_usage_to_openai_usage(usage: dict[str, int]) -> dict[str, int]:
return {
"input_tokens": usage.get("prompt_tokens", 0),
"output_tokens": usage.get("completion_tokens", 0),
"total_tokens": usage.get("total_tokens", 0),
}
outputs = []
for message in messages:
if isinstance(message, ToolMessage):
item = self.create_function_call_output_item(
output=message.content,
call_id=message.tool_call_id,
)
metadata = message.response_metadata
usage = metadata.pop("usage", {})
outputs.append(
_create_response_agent_stream_event(item, usage, metadata)
)
elif (
isinstance(message, (AIMessage, AIMessageChunk)) and message.tool_calls
):
metadata = message.response_metadata
usage = metadata.pop("usage", {})
for tool_call in message.tool_calls:
item = self.create_function_call_item(
id=message.id,
call_id=tool_call.get("id"),
name=tool_call.get("name"),
arguments=str(tool_call.get("args")),
)
outputs.append(
_create_response_agent_stream_event(item, usage, metadata)
)
# 1件目のみusageを設定
usage = {}
elif isinstance(message, (AIMessage, AIMessageChunk)):
item = self.create_text_output_item(
text=message.content,
id=message.id,
)
metadata = message.response_metadata
usage = metadata.pop("usage", {})
outputs.append(
_create_response_agent_stream_event(item, usage, metadata)
)
else:
raise ValueError(f"Unknown message: {message}")
return outputs
# Databricksネイティブのllama-3-1-405b-instructを利用
LLM_ENDPOINT_NAME = "databricks-meta-llama-3-1-405b-instruct"
llm = ChatDatabricks(model=LLM_ENDPOINT_NAME)
# 利用可能なツールとして、get_weather関数とUnity Catalogのsystem.ai配下の関数を設定
func_name = f"system.ai.python_exec"
uc_toolkit = UCFunctionToolkit(function_names=[func_name])
LC_TOOLS = [get_weather] + uc_toolkit.tools
# mlflowにエージェントを設定
agent = SimpleResponsesAgent(model=llm, tools=LC_TOOLS)
mlflow.models.set_model(agent)
大事なポイントは、クラスSimpleResponsesAgent
がmlflowの新しいクラスResponsesAgent
を継承しているところです。
これによって、predict
およびpredict_stream
メソッドが従来と異なるデータタイプResponsesAgentRequest
のパラメータを取ります。
ResponsesAgentRequest
はOpenAI Reponses APIの入力パラメータと互換を持つ入力情報を受け取るクラスです。
また、エージェントの出力結果としては、こちらもOpenAI Reponses APIの出力と互換を持つResponsesAgentResponse
クラスもしくはResponsesAgentStreamEvent
クラスにデータを詰めて返すことになります。
この対応によって、入出力におけるToolCallingの対応やチャット履歴の入出力が可能になりました。
ResponsesAgent
内でOpenAI Clientを利用する場合は、上記の入出力をそのまま利用することができます。
今回はLangGraphでエージェント処理を構築する関係上、入出力の変換処理を実装しています。(コード的には一番そこが長い処理になっています)
処理の実態としては、predict_stream
の中でLangGraphのcreate_react_agent
を実行してツールを実行するReActエージェントを作成しています。
ReActエージェントに与えるLLMはDatabricks Model ServingでホスティングされているLlama4 3.1 405B、ツールはダミーのget_wheather
関数とsystem.ai.python_exec
Unity Catalog関数を利用しています。
軽く動作試験をしてみます。
from simple_responses_agent import (
SimpleResponsesAgent,
ResponsesAgentRequest,
agent,
)
input = {
"input": [{"role": "user", "content": "what is the weather in Tokyo?"}],
"context": {"conversation_id": "123", "user_id": "456"},
"max_output_tokens": 100,
"top_p": 0.8,
"temperature": 0.1,
}
print(agent.predict(ResponsesAgentRequest(**input)))
input = {
"input": [{"role": "user", "content": "what is 4*3 in python"}],
"context": {"conversation_id": "123", "user_id": "456"},
"top_p": 0.0,
}
for event in agent.predict_stream(ResponsesAgentRequest(**input)):
print(event)
MLflow Tracingに処理結果が記録されました。
結果を見るに、適切にツールが呼び出され、回答が作られているようです。
これまで通りpredict
もしくはpredict_stream
を使えばいいので違和感はありませんね。
Step3. エージェントのロギング
エージェントをMLflowにロギングします。
mlflow3になったことによる変化点として、log_model
の仕様が一部変更になりました。
artifact_path
の使用は非推奨となり、name
パラメータでモデル名を指定することになります。
import mlflow
from mlflow.models.resources import (
DatabricksServingEndpoint,
DatabricksFunction,
)
from simple_responses_agent import LLM_ENDPOINT_NAME
resources = [
DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
DatabricksFunction(function_name="system.ai.python_exec"),
]
with mlflow.start_run():
logged_agent_info = mlflow.pyfunc.log_model(
python_model="agents/simple_responses_agent.py",
name="simple_responses_agent",
pip_requirements=[
"mlflow>=3.1.0",
"langgraph==0.4.8",
"databricks-langchain==0.5.1",
"unitycatalog-langchain==0.2.0",
"unitycatalog-ai==0.3.1",
"protobuf==4.25.8",
],
resources=resources,
)
ロギングしたエージェントをロードして利用してみます。
import mlflow
from pprint import pprint
model_uri = logged_agent_info.model_uri
agent = mlflow.pyfunc.load_model(model_uri)
input = {
"input": [
{"role": "user", "content": "Hi"},
{"role": "assistant", "content": "Hello."},
{"role": "user", "content": "what is the weather in Tokyo?"},
],
"max_output_tokens": 1000,
"top_p": 0.8,
"temperature": 0.1,
}
for event in agent.predict_stream(input):
pprint(event.get("item"))
print("-----------------")
{'arguments': "{'city': 'Tokyo'}",
'call_id': 'call_14c33bf3-5b79-4479-b30b-206d2f62842c',
'id': 'run--abdf46bc-a98e-44f8-9cdf-bb4862c47d60-0',
'name': 'get_weather',
'type': 'function_call'}
-----------------
{'call_id': 'call_14c33bf3-5b79-4479-b30b-206d2f62842c',
'output': "It's always sunny in Tokyo!",
'type': 'function_call_output'}
-----------------
{'content': [{'text': "It's always sunny in Tokyo.", 'type': 'output_text'}],
'id': 'run--9a53cfb8-980f-46e1-8df8-93ebe3197243-0',
'role': 'assistant',
'type': 'message'}
-----------------
ただしく動いていそうですね。
これでエージェントの保管まで無事に実施できました。
Step4. エージェントのデプロイ
それでは、Databricksの醍醐味であるModel Servingへのデプロイをします。
まずは、Unity Catalogに登録。
import mlflow
catalog = "examples"
schema = "mlflow"
model_name = f"{catalog}.{schema}.simple_responses_agent"
mlflow.set_registry_uri("databricks-uc")
registered_model = mlflow.register_model(model_uri=model_uri, name=model_name)
その後、Mosaic AI Agent Frameworkを使ってModel Servingへデプロイ。
from databricks import agents
deployment = agents.deploy(
registered_model.name, registered_model.version, scale_to_zero=True
)
deployment.query_endpoint
数分待つとデプロイが完了します。
サービングメニュー上でのクエリ実行も問題ありません。
Playground上でも動きました。
まとめ
MLflow3のResponsesAgent
インターフェースを実装してエージェントを作成しました。
従来のChatAgent
に比べて、OpenAI Responses API互換で作れるのがポイントでしょうか。
まだ出来立てというところもあって不具合も多少残っている気がします。
Requestにtoolsを指定する際に入力スキーマとコンフリクトしたりとか。
Databricksモデルサービング側の対応もまだこれからという感じがしますね。
とはいえ、今後はこのインターフェースを使ってカスタムエージェントを作ることが主流になりそうです。
また、Databricks Free Editionを使って無料でこういったエージェント作成できるのはすごいです。無料でToolCallingするエージェントを作ることができる。(作れる数の制限とかはあるけど)いい時代になったものです。
Free EditionでできるLLM関連の入門記事とか書こうかしら。。。
その他参考