風邪をひいてしまい間があいてしまいました。。。
はじめに
前回の記事でも触れていますが、Model Context Protocol(MCP)サーバがいろいろ出ていますね。
今回はそんなMCPサーバのうち、Markitdown-MCPを使ったエージェントを実装してMosaic AI Model Serving上にデプロイしてみます。
ほとんどの進め方は以下記事と同様です。今回もMLflow 3.0rcを使います。
実行はDatabricks on AWS、ノートブックはサーバレスクラスタを利用しました。
コード飛ばして、挙動だけ確認したい方はStep4から確認ください。
Markitdownとは?
以前、下記の自記事内でも利用したことがあるPDFやWordなど様々なファイルをMarkdown形式に変換するパッケージです。軽量で使い勝手が良くて個人的にも好きだったりします。
このパッケージのMCP対応版を今回利用します。
Step1. MCPを使うエージェントを定義
MCPサーバをツールとして利用するエージェントを定義します。
まずはノートブックを作成し、必要なパッケージをインストール。
%pip install langchain-mcp-adapters langgraph databricks-langchain databricks-agents uv rich nest-asyncio markitdown-mcp ffmpeg-python
%pip install --upgrade 'mlflow>=3.0.0rc0' --pre
%restart_python
前回同様、MCPサーバと連携するためのモジュールとしてLangChain MCP Adaptersを利用しています。
また、今回重要なパッケージはmarkitdown-mcp
です。
こちらを使ってエージェントを実装します。
次に、nest_asyncio
パッケージを使って、async.io
をノートブック内で利用可能にします。
import nest_asyncio
from rich import print
nest_asyncio.apply()
では、MCPを使うエージェントを定義します。
MLflowのChatAgentインターフェースを備えたカスタムクラスとして定義しています。
%%writefile markitdown_mcp_agent.py
from typing import Literal, Generator, List, Optional, Any, Dict, Mapping, Union
import uuid
import asyncio
import sys
import mlflow
from databricks_langchain import ChatDatabricks
from langgraph.prebuilt import create_react_agent
from langchain_core.messages import BaseMessage, convert_to_openai_messages
from mlflow.pyfunc import ChatAgent
from mlflow.types.agent import (
ChatAgentChunk,
ChatAgentMessage,
ChatAgentResponse,
ChatContext,
)
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from langchain_mcp_adapters.tools import load_mcp_tools
from functools import reduce
# Tracingの有効化
mlflow.langchain.autolog()
class MarkitdonwMCPChatAgent(ChatAgent):
def __init__(self, model):
"""LangGraphのグラフを指定して初期化"""
self.model = model
def predict(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> ChatAgentResponse:
"""
指定されたチャットメッセージリストを使用して回答を生成する
Args:
messages (list[ChatAgentMessage]): チャットエージェントメッセージのリスト。
context (Optional[ChatContext]): オプションのチャットコンテキスト。
custom_inputs (Optional[dict[str, Any]]): カスタム入力のオプション辞書。
Returns:
ChatAgentResponse: 予測結果を含むChatAgentResponseオブジェクト。
"""
gen_messages = []
usages = []
for c in self.predict_stream(messages, context, custom_inputs):
gen_messages.append(c.delta)
usages.append(c.usage.model_dump(exclude_none=True))
return ChatAgentResponse(messages=gen_messages, usage=self._sum_usages(usages))
def predict_stream(
self,
messages: list[ChatAgentMessage],
context: Optional[ChatContext] = None,
custom_inputs: Optional[dict[str, Any]] = None,
) -> Generator[ChatAgentChunk, None, None]:
"""
指定されたチャットメッセージリストを使用して、非同期的にエージェントを呼び出し、結果を取得します。
Args:
messages (list[ChatAgentMessage]): チャットエージェントメッセージのリスト。
context (Optional[ChatContext]): オプションのチャットコンテキスト。
custom_inputs (Optional[dict[str, Any]]): カスタム入力のオプション辞書。
Returns:
ChatAgentResponse: 予測結果を含むChatAgentResponseオブジェクト。
"""
async def stream_with_mcp_tools(model, server_params, messages: list):
"""MCP Serverと通信して回答を取得する"""
async with stdio_client(server_params, errlog=None) as (read, write):
async with ClientSession(read, write) as session:
# 接続を初期化
await session.initialize()
# ツールを取得
tools = await load_mcp_tools(session)
# エージェントを作成して実行
agent = create_react_agent(model, tools)
async for event in agent.astream(
{"messages": messages}, stream_mode="updates"
):
for v in event.values():
messages = v.get("messages", [])
for msg in messages:
yield ChatAgentChunk(
delta=self._convert_lc_message_to_chat_message(msg),
usage=msg.response_metadata.get("usage", {}),
)
server_params = StdioServerParameters(
command="markitdown-mcp",
args=[],
)
streamer = stream_with_mcp_tools(
self.model,
server_params,
self._convert_messages_to_dict(messages),
)
loop = asyncio.get_event_loop()
while True:
try:
yield loop.run_until_complete(streamer.__anext__())
except StopAsyncIteration:
break
except Exception as e:
# for "Attempted to exit cancel scope in a different task than it was entered in" Exception.
break
def _convert_lc_message_to_chat_message(
self, lc_message: BaseMessage
) -> ChatAgentMessage:
"""LangChainメッセージをChatAgentMessageに変換する。"""
msg = convert_to_openai_messages(lc_message)
if not "id" in msg:
msg.update({"id": str(uuid.uuid4())})
return ChatAgentMessage(**msg)
def _sum_usages(self, usages: list[dict]) -> dict:
"""使用量のリストから使用量を合計する。"""
def add_usages(a: dict, b: dict) -> dict:
pt = "prompt_tokens"
ct = "completion_tokens"
tt = "total_tokens"
return {
pt: a.get(pt, 0) + b.get(pt, 0),
ct: a.get(ct, 0) + b.get(ct, 0),
tt: a.get(tt, 0) + b.get(tt, 0),
}
return reduce(add_usages, usages)
def _convert_lc_message_to_chat_message(
self, lc_message: BaseMessage
) -> ChatAgentMessage:
"""LangChainメッセージをChatAgentMessageに変換する。"""
msg = convert_to_openai_messages(lc_message)
if not "id" in msg:
msg.update({"id": str(uuid.uuid4())})
return ChatAgentMessage(**msg)
def _sum_usages(self, usages: list[dict]) -> dict:
"""使用量のリストから使用量を合計する。"""
def add_usages(a: dict, b: dict) -> dict:
pt = "prompt_tokens"
ct = "completion_tokens"
tt = "total_tokens"
return {
pt: a.get(pt, 0) + b.get(pt, 0),
ct: a.get(ct, 0) + b.get(ct, 0),
tt: a.get(tt, 0) + b.get(tt, 0),
}
return reduce(add_usages, usages, {})
# DatabricksネイティブのClaude 3.7 SonnetをLLMとして利用
LLM_ENDPOINT_NAME = "databricks-claude-3-7-sonnet"
llm = ChatDatabricks(model=LLM_ENDPOINT_NAME)
# エージェントを作成
AGENT = MarkitdonwMCPChatAgent(model=llm)
mlflow.models.set_model(AGENT)
内容は前回記事とほぼ同じであり、predict_stream
内で実行するMCPサーバにMarkitdown-MCPを利用するように設定しています。
Step2. エージェントをロギング
作成したエージェントをMLflow上にロギング(保管)します。
MLflow 3.0(Preview)版を利用しています。
log_model
のパラメータ設定はMLflow 3.0方式であり、現行のMLflow 2.xとは一部パラメータ指定方法が異なります。
import mlflow
from markitdown_mcp_agent import LLM_ENDPOINT_NAME
from mlflow.models.resources import DatabricksServingEndpoint
resources = [DatabricksServingEndpoint(endpoint_name=LLM_ENDPOINT_NAME)]
input_example = {
"messages": [
{
"role": "user",
"content": "https://www.databricks.com/のサイトをMarkdown形式に変換して内容を要約して",
}
]
}
with mlflow.start_run():
logged_agent_info = mlflow.pyfunc.log_model(
# artifact_path ="markitdown_mcp_agent", # for mlflow 2.0
name="markitdown_mcp_agent", # for mlflow 3.0
python_model="markitdown_mcp_agent.py",
input_example=input_example,
pip_requirements=[
"mlflow",
"langgraph==0.3.34",
"langchain-mcp-adapters==0.0.9",
"databricks-langchain==0.4.2",
"markitdown-mcp==0.0.1a3",
"ffmpeg-python==0.2.0",
],
resources=resources,
model_type="agent", # for mlflow 3.0
)
正常終了するとモデルが保管されます。
念のため、デプロイ前の試験も実施します。
import mlflow
mlflow.models.predict(
model_uri=logged_model.model_uri,
input_data={
"messages": [
{
"role": "user",
"content": "https://www.databricks.com/のサイトをMarkdown形式に変換して内容を要約して",
}
]
},
env_manager="uv",
)
エラーなく実行できれば、検証完了です。
Step3. エージェントをデプロイ
ロギングしたエージェントをMosaic AI Model Servingを使ってデプロイします。
まず、Databricks Unity Catalog上にモデルを登録します。
catalog = "training"
schema = "llm"
model_name = "markitdown_mcp_agent"
UC_MODEL_NAME = f"{catalog}.{schema}.{model_name}"
mlflow.set_registry_uri("databricks-uc")
# モデルをUCに登録する
uc_registered_model_info = mlflow.register_model(
model_uri=logged_model.model_uri, name=UC_MODEL_NAME
)
次にMosaic AI Agent Frameworkを使ってデプロイします。
from databricks import agents
agents.deploy(UC_MODEL_NAME, uc_registered_model_info.version, scale_to_zero=True)
しばらく後にデプロイが完了してエージェントが作成されます。
では、実際に使ってみましょう。
Step4. エージェントを使ってみる
DatabricksのPlaygroundを使ってデプロイしたエージェントを実行してみましょう。
Playgroundで作成したエージェントを指定します。
まずは指定したURLのサイトをMarkdown形式に変換して、という指示をしてみましょう。
すると、Markitdown-MCPのツールであるconvert_to_markdown
が実行されました。
最終回答として、Markdownに変換された内容が取得できました。
日本語のプロンプトで聞いたためか、自動で日本語に翻訳もしてくれています。
次にPDFファイルの要約をさせてみましょう。
デジタル庁のサイトにある、「デジタルを活用する未来に向けて」の別冊PDFを使ってみます。
最終出力は以下のようになりました。
こんな形で、手軽にHTMLサイトの内容やWeb上の各種ファイルなどをMarkdownに変換した上で処理を実施することができます。
ユースケースとして
- 海外blogサイトの邦訳&Markdown形式整形
- 公開資料の取得・サマリ
など、エージェント単体でも便利な使い方が出来そうです。
(そして、真価はこれを他のエージェントと連携することにありそうですが、また別の機会に。。。)
まとめ
DatabricksでMarkitdown-MCPを利用するエージェントを作成しました。
MLflow ChatAgentで作る型を容易してしまえば、様々なMCPサーバの利用ができそうですね。
こうやって作成したエージェントをさらにマルチエージェントシステムから利用するともっと面白いことができると思っています。このあたりの仕組化もいろいろ試してみたいですね。