1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MLflow3で導入されたResponsesAgentでLangGraphを使ったToolCallingエージェントを作成する

Last updated at Posted at 2025-06-14

はじめに

先日、MLflow 3がリリースされました。

かなりの大型アップデートであり、モデルの構造含めて変更が入っています。
mlflow3自体はリリース候補版の際にも多少試してみていましたが、ドキュメントが初めて気づいた変更の一つにResponsesAgentがあります。

上記ドキュメントを一部邦訳すると、以下のようになります。

ResponsesAgentは、PythonModelのサブクラスであり、エージェントモデルを作成するためのフレームワークに依存しない方法を提供します。ResponsesAgentを使用してエージェントを開発することで、以下の利点があります。

  • ツール呼び出しの中間出力を含む、複数の出力メッセージの返却をサポート
  • マルチエージェントシナリオのサポート
  • MLflowのロギング、トレーシング、モデルサービングとの互換性を確保
  • OpenAI Responses APIとの互換性を確保し、OpenAIのresponsesクライアントや他のダウンストリームUI/アプリケーションと互換性を持たせる

ChatModelChatAgentの代わりにResponsesAgentの利用を推奨します。 > ResponsesAgentChatAgentのすべての利点を持ち、さらにアノテーションなどの追加機能もサポートしています。

※ 2025/6/12時点のドキュメントを機械翻訳したもののリンクも貼っておきます。

MLflow 2.xのときにあったChatAgentChatModelを置き換える新しいカスタムモデルインターフェースのようです。
名前の通りOpenAIの Responses APIと互換のインターフェースを提供します。
ResponsesAgentの登場でChatAgent等は利用が非推奨になりました。MLflow3以降はこちらを使ってエージェントを実装することが推奨のようです。

勉強のために、上記内容をベースに、内部的にはLangGraphでToolCallingを実行するエージェントを作成してみます。

開発はDatabricks Free Editionを利用しました。
今回のコードはFree Editionで全て動作します。

Step1. 準備

ノートブックを作成し、必要なパッケージをインストールします。

%pip install -U mlflow[databricks]>=3.1 unitycatalog-ai[databricks] unitycatalog-langchain[databricks] databricks-agents databricks-langchain langgraph 

%restart_python

Step2. カスタムResponsesAgentを定義

最重要ポイントです。
まずはコードを全て掲載します。解説はコードの後に記載しています。

%%writefile simple_responses_agent.py

import mlflow
from mlflow.pyfunc import ResponsesAgent
from mlflow.types.responses import (
    ResponsesAgentRequest,
    ResponsesAgentResponse,
    ResponsesAgentStreamEvent,
)
from mlflow.entities import SpanType
from typing import Generator, Any
from databricks_langchain import ChatDatabricks, UCFunctionToolkit
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import (
    BaseMessage,
    AIMessage,
    ToolMessage,
    AIMessageChunk,
)
from langchain_core.tools import BaseTool, tool
from functools import reduce


# ダミーツール
@tool
def get_weather(city: str) -> str:
    """指定された都市の天気を取得します

    Args:
        city (str): 都市名

    Returns:
        str: 天気情報
    """
    return f"It's always sunny in {city}!"


# Tracingの有効化
mlflow.langchain.autolog()

# Agent
class SimpleResponsesAgent(ResponsesAgent):
    def __init__(self, model, tools: list[BaseTool]):
        """SimpleResponsesAgentの初期化

        Args:
            model: 使用するモデル
            tools (list[BaseTool]): 使用するツールのリスト
        """
        self.model = model
        self.tools = tools

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict(self, request: ResponsesAgentRequest) -> ResponsesAgentResponse:
        """リクエストに基づいて予測を行います

        Args:
            request (ResponsesAgentRequest): 予測リクエスト

        Returns:
            ResponsesAgentResponse: 予測結果のレスポンス
        """
        events = [
            event
            for event in self.predict_stream(request)
            if event.type == "response.output_item.done"
        ]
        outputs = [event.item for event in events]
        # usage総量を計算
        usages = [event.usage for event in events]
        total_usage = {
            "input_tokens_details": {"cached_tokens": 0},
            "output_tokens_details": {"reasoning_tokens": 0},
            **reduce(
                lambda x, y: {k: x.get(k, 0) + y.get(k, 0) for k in set(x) | set(y)},
                usages,
            ),
        }

        return ResponsesAgentResponse(output=outputs, usage=total_usage)

    @mlflow.trace(span_type=SpanType.AGENT)
    def predict_stream(
        self, request: ResponsesAgentRequest
    ) -> Generator[ResponsesAgentStreamEvent, None, None]:
        """ストリームモードで予測を行います

        Args:
            request (ResponsesAgentRequest): 予測リクエスト

        Yields:
            ResponsesAgentStreamEvent: ストリームイベント
        """
        messages, params = self._convert_request_to_lc_request(request)
        react_agent = create_react_agent(self.model.bind(**params), tools=self.tools)

        for chunk in react_agent.stream({"messages": messages}, stream_mode="updates"):
            for value in chunk.values():
                messages = value.get("messages", [])
                responses = self._convert_lc_messages_to_response(messages)
                for response in responses:
                    yield response

    @mlflow.trace(span_type=SpanType.PARSER)
    def _convert_request_to_lc_request(
        self, request: ResponsesAgentRequest
    ) -> (list[BaseMessage], dict[str, Any]):
        """リクエストをLangChainのメッセージおよびパラメータ形式に変換します

        Args:
            request (ResponsesAgentRequest): 変換するリクエスト

        Returns:
            tuple: メッセージリスト、パラメータ辞書
        """

        lc_request = request.model_dump_compat(exclude_none=True)
        custom_inputs = lc_request.pop("custom_inputs", {})

        # custom_inputsは通常のパラメータとして展開
        lc_request.update(custom_inputs)
        messages = lc_request.pop("input")

        # LangChainで有効なパラメータのみに限定
        valid_params = [
            "temperature",
            "max_output_tokens",
            "top_p",
            "top_k",
        ]
        params = {k: v for k, v in lc_request.items() if k in valid_params}
        if "max_output_tokens" in params:
            params["max_tokens"] = params.pop("max_output_tokens")

        return messages, params

    @mlflow.trace(span_type=SpanType.PARSER)
    def _convert_lc_messages_to_response(
        self, messages: list[BaseMessage]
    ) -> list[ResponsesAgentStreamEvent]:
        """LangChainメッセージをレスポンス出力に変換します

        Args:
            messages (list[BaseMessage]): 変換するメッセージリスト

        Returns:
            list[ResponsesAgentStreamEvent]: レスポンス出力のリスト
        """

        def _create_response_agent_stream_event(
            item, usage, metadata
        ) -> ResponsesAgentStreamEvent:
            return ResponsesAgentStreamEvent(
                type="response.output_item.done",
                item=item,
                usage=_convert_lc_usage_to_openai_usage(usage),
                metadata=metadata,
            )

        def _convert_lc_usage_to_openai_usage(usage: dict[str, int]) -> dict[str, int]:
            return {
                "input_tokens": usage.get("prompt_tokens", 0),
                "output_tokens": usage.get("completion_tokens", 0),
                "total_tokens": usage.get("total_tokens", 0),
            }

        outputs = []
        for message in messages:
            if isinstance(message, ToolMessage):
                item = self.create_function_call_output_item(
                    output=message.content,
                    call_id=message.tool_call_id,
                )
                metadata = message.response_metadata
                usage = metadata.pop("usage", {})
                outputs.append(
                    _create_response_agent_stream_event(item, usage, metadata)
                )
            elif (
                isinstance(message, (AIMessage, AIMessageChunk)) and message.tool_calls
            ):
                metadata = message.response_metadata
                usage = metadata.pop("usage", {})
                for tool_call in message.tool_calls:
                    item = self.create_function_call_item(
                        id=message.id,
                        call_id=tool_call.get("id"),
                        name=tool_call.get("name"),
                        arguments=str(tool_call.get("args")),
                    )
                    outputs.append(
                        _create_response_agent_stream_event(item, usage, metadata)
                    )
                    # 1件目のみusageを設定
                    usage = {}
            elif isinstance(message, (AIMessage, AIMessageChunk)):
                item = self.create_text_output_item(
                    text=message.content,
                    id=message.id,
                )
                metadata = message.response_metadata
                usage = metadata.pop("usage", {})
                outputs.append(
                    _create_response_agent_stream_event(item, usage, metadata)
                )
            else:
                raise ValueError(f"Unknown message: {message}")
        return outputs


# Databricksネイティブのllama-3-1-405b-instructを利用
LLM_ENDPOINT_NAME = "databricks-meta-llama-3-1-405b-instruct"
llm = ChatDatabricks(model=LLM_ENDPOINT_NAME)

# 利用可能なツールとして、get_weather関数とUnity Catalogのsystem.ai配下の関数を設定
func_name = f"system.ai.python_exec"
uc_toolkit = UCFunctionToolkit(function_names=[func_name])
LC_TOOLS = [get_weather] + uc_toolkit.tools

# mlflowにエージェントを設定
agent = SimpleResponsesAgent(model=llm, tools=LC_TOOLS)
mlflow.models.set_model(agent)

大事なポイントは、クラスSimpleResponsesAgentがmlflowの新しいクラスResponsesAgentを継承しているところです。
これによって、predictおよびpredict_streamメソッドが従来と異なるデータタイプResponsesAgentRequestのパラメータを取ります。
ResponsesAgentRequestはOpenAI Reponses APIの入力パラメータと互換を持つ入力情報を受け取るクラスです。
また、エージェントの出力結果としては、こちらもOpenAI Reponses APIの出力と互換を持つResponsesAgentResponseクラスもしくはResponsesAgentStreamEventクラスにデータを詰めて返すことになります。

この対応によって、入出力におけるToolCallingの対応やチャット履歴の入出力が可能になりました。

ResponsesAgent内でOpenAI Clientを利用する場合は、上記の入出力をそのまま利用することができます。
今回はLangGraphでエージェント処理を構築する関係上、入出力の変換処理を実装しています。(コード的には一番そこが長い処理になっています)

処理の実態としては、predict_streamの中でLangGraphのcreate_react_agentを実行してツールを実行するReActエージェントを作成しています。
ReActエージェントに与えるLLMはDatabricks Model ServingでホスティングされているLlama4 3.1 405B、ツールはダミーのget_wheather関数とsystem.ai.python_execUnity Catalog関数を利用しています。

軽く動作試験をしてみます。

from simple_responses_agent import (
    SimpleResponsesAgent,
    ResponsesAgentRequest,
    agent,
)

input = {
    "input": [{"role": "user", "content": "what is the weather in Tokyo?"}],
    "context": {"conversation_id": "123", "user_id": "456"},
    "max_output_tokens": 100,
    "top_p": 0.8,
    "temperature": 0.1,
}
print(agent.predict(ResponsesAgentRequest(**input)))

input = {
    "input": [{"role": "user", "content": "what is 4*3 in python"}],
    "context": {"conversation_id": "123", "user_id": "456"},
    "top_p": 0.0,
}
for event in agent.predict_stream(ResponsesAgentRequest(**input)):
    print(event)

MLflow Tracingに処理結果が記録されました。
結果を見るに、適切にツールが呼び出され、回答が作られているようです。

image.png

これまで通りpredictもしくはpredict_streamを使えばいいので違和感はありませんね。

Step3. エージェントのロギング

エージェントをMLflowにロギングします。
mlflow3になったことによる変化点として、log_modelの仕様が一部変更になりました。
artifact_pathの使用は非推奨となり、nameパラメータでモデル名を指定することになります。

import mlflow
from mlflow.models.resources import (
    DatabricksServingEndpoint,
    DatabricksFunction,
)
from simple_responses_agent import LLM_ENDPOINT_NAME

resources = [
    DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME),
    DatabricksFunction(function_name="system.ai.python_exec"),
]

with mlflow.start_run():
    logged_agent_info = mlflow.pyfunc.log_model(
        python_model="agents/simple_responses_agent.py",
        name="simple_responses_agent",
        pip_requirements=[
            "mlflow>=3.1.0",
            "langgraph==0.4.8",
            "databricks-langchain==0.5.1",
            "unitycatalog-langchain==0.2.0",
            "unitycatalog-ai==0.3.1",
            "protobuf==4.25.8",
        ],        
        resources=resources,
    )

ロギングしたエージェントをロードして利用してみます。

import mlflow
from pprint import pprint

model_uri = logged_agent_info.model_uri
agent = mlflow.pyfunc.load_model(model_uri)

input = {
    "input": [
        {"role": "user", "content": "Hi"},
        {"role": "assistant", "content": "Hello."},
        {"role": "user", "content": "what is the weather in Tokyo?"},
    ],
    "max_output_tokens": 1000,
    "top_p": 0.8,
    "temperature": 0.1,
}

for event in agent.predict_stream(input):
    pprint(event.get("item"))
    print("-----------------")
出力
{'arguments': "{'city': 'Tokyo'}",
 'call_id': 'call_14c33bf3-5b79-4479-b30b-206d2f62842c',
 'id': 'run--abdf46bc-a98e-44f8-9cdf-bb4862c47d60-0',
 'name': 'get_weather',
 'type': 'function_call'}
-----------------
{'call_id': 'call_14c33bf3-5b79-4479-b30b-206d2f62842c',
 'output': "It's always sunny in Tokyo!",
 'type': 'function_call_output'}
-----------------
{'content': [{'text': "It's always sunny in Tokyo.", 'type': 'output_text'}],
 'id': 'run--9a53cfb8-980f-46e1-8df8-93ebe3197243-0',
 'role': 'assistant',
 'type': 'message'}
-----------------

ただしく動いていそうですね。
これでエージェントの保管まで無事に実施できました。

Step4. エージェントのデプロイ

それでは、Databricksの醍醐味であるModel Servingへのデプロイをします。

まずは、Unity Catalogに登録。

import mlflow

catalog = "examples"
schema = "mlflow"
model_name = f"{catalog}.{schema}.simple_responses_agent"

mlflow.set_registry_uri("databricks-uc")

registered_model = mlflow.register_model(model_uri=model_uri, name=model_name)

その後、Mosaic AI Agent Frameworkを使ってModel Servingへデプロイ。

from databricks import agents

deployment = agents.deploy(
    registered_model.name, registered_model.version, scale_to_zero=True
)
deployment.query_endpoint

数分待つとデプロイが完了します。

image.png

サービングメニュー上でのクエリ実行も問題ありません。

image.png

Playground上でも動きました。

image.png

まとめ

MLflow3のResponsesAgentインターフェースを実装してエージェントを作成しました。
従来のChatAgentに比べて、OpenAI Responses API互換で作れるのがポイントでしょうか。

まだ出来立てというところもあって不具合も多少残っている気がします。
Requestにtoolsを指定する際に入力スキーマとコンフリクトしたりとか。
Databricksモデルサービング側の対応もまだこれからという感じがしますね。
とはいえ、今後はこのインターフェースを使ってカスタムエージェントを作ることが主流になりそうです。

また、Databricks Free Editionを使って無料でこういったエージェント作成できるのはすごいです。無料でToolCallingするエージェントを作ることができる。(作れる数の制限とかはあるけど)いい時代になったものです。
Free EditionでできるLLM関連の入門記事とか書こうかしら。。。

その他参考

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?