MastraとMCP、ナウいコンビで最高ですね!
ドキュメントを参考に作ると、多分こんな感じになると思います。(私だけだったらごめんなさい)
app
├── api
│ └── chat
│ └── route.ts
├── layout.tsx
└── page.tsx
mastra
├── agents
│ └── index.ts
├── index.ts
└── tools
└── index.ts
6 directories, 6 files
-
mastra
ディレクトリMCPツールとして、ナウいPlaywrightを選択!
mastra/tools/index.tsimport { MCPClient } from "@mastra/mcp"; export const mcp = new MCPClient({ servers: { playwright: { command: "docker", args: ["run", "-i", "--rm", "--init", "mcr.microsoft.com/playwright/mcp"], }, }, });
mastra/agents/index.tsimport { mcp } from "@/mastra/tools"; import { createAmazonBedrock } from "@ai-sdk/amazon-bedrock"; import { fromNodeProviderChain } from "@aws-sdk/credential-providers"; import { Agent } from "@mastra/core/agent"; const bedrock = createAmazonBedrock({ region: "ap-northeast-1", credentialProvider: fromNodeProviderChain(), }); export const myAgent = new Agent({ name: "My Agent", instructions: "You are a helpful assistant.", model: bedrock("apac.amazon.nova-pro-v1:0"), tools: await mcp.getTools(), });
mastra/index.tsimport { Mastra } from "@mastra/core"; import { myAgent } from "./agents"; export const mastra = new Mastra({ agents: { myAgent }, });
-
app
ディレクトリapp/api/chat/route.tsimport { mastra } from "@/mastra"; // Allow streaming responses up to 30 seconds export const maxDuration = 30; export async function POST(req: Request) { const { messages } = await req.json(); const myAgent = mastra.getAgent("myAgent"); const result = await myAgent.stream(messages); return result.toDataStreamResponse(); }
app/page.tsx'use client'; import { useChat } from '@ai-sdk/react'; export default function Chat() { const { messages, input, handleInputChange, handleSubmit } = useChat(); return ( <div className="flex flex-col w-full max-w-md py-24 mx-auto stretch"> {messages.map(message => ( <div key={message.id} className="whitespace-pre-wrap"> {message.role === 'user' ? 'User: ' : 'AI: '} {message.parts.map((part, i) => { switch (part.type) { case 'text': return <div key={`${message.id}-${i}`}>{part.text}</div>; } })} </div> ))} <form onSubmit={handleSubmit}> <input className="fixed dark:bg-zinc-900 bottom-0 w-full max-w-md p-2 mb-8 border border-zinc-300 dark:border-zinc-800 rounded shadow-xl" value={input} placeholder="Say something..." onChange={handleInputChange} /> </form> </div> ); }
完成したので、動作確認です
playwright MCPツールで、Yahoo.co.jpにアクセスし、今日の主要ニュースを調べて。
1つ目のURLにアクセスし、ニュースの詳細を教えて下さい。
トップページを表示した後にリンクをクリックして次のページを取得してくれました。
Playwright、マジ最高。
そしてここでおもむろに別のブラウザ(下図の右側)を立ち上げて
最後にアクセスしたURLを教えて下さい。
と聞くと
なんということでしょう!
先ほど別ブラウザ(=別人)でアクセスしたURLが取得できたではありませんか!!!
これは良くない、良くない、良くない。恥ずかしい。
ということで、この問題への対策の件です。
これまで私が書いた記事を参考にしてくれた方、ごめんなさい。間違った実装を提示してたと思います。
種明かし
実はドキュメントに記載があります。ちょっと前はなかった気がします。
シングルユーザー利用を想定した「Static Configuration」と、マルチユーザー利用を想定した「Dynamic Configuration」があるのですね。
上で紹介した方法は、シングルユーザー向けの実装でしたので、マルチユーザー利用のクライアント・サーバー型では問題が起きてましたということです。
実装をダイナミックに更新
ツールをエージェントに事前セットするのではなく、ツール実行のタイミングで初期化してエージェントへセットする実装に変更します。
こうなります。
-
mastra/tools/index.ts
不要なので削除します -
mastra/agents/index.ts
エージェントの定義にツールを含めませんmastra/agents/index.ts- import { mcp } from "@/mastra/tools"; import { createAmazonBedrock } from "@ai-sdk/amazon-bedrock"; import { fromNodeProviderChain } from "@aws-sdk/credential-providers"; import { Agent } from "@mastra/core/agent"; const bedrock = createAmazonBedrock({ region: "ap-northeast-1", credentialProvider: fromNodeProviderChain(), }); export const myAgent = new Agent({ name: "My Agent", instructions: "You are a helpful assistant.", model: bedrock("apac.amazon.nova-pro-v1:0"), - tools: await mcp.getTools(), });
-
mastra/index.ts
変更ありません -
app/api/chat/route.ts
APIが呼ばれるタイミングでMCPツールを初期化します。 これで、複数のリクエストでツールが使い回されることがありません
(ただし、複数の会話ターンで前回のツールを引き継ぐということはできません)
import { mastra } from "@/mastra";
+ import { MCPClient } from "@mastra/mcp";
// Allow streaming responses up to 30 seconds
export const maxDuration = 30;
export async function POST(req: Request) {
+ const mcp = new MCPClient({
+ servers: {
+ playwright: {
+ command: "docker",
+ args: [
+ "run",
+ "-i",
+ "--rm",
+ "--init",
+ "mcr.microsoft.com/playwright/mcp",
+ ],
+ },
+ },
+ });
const { messages } = await req.json();
const myAgent = mastra.getAgent("myAgent");
- const result = await myAgent.stream(messages);
+ const result = await myAgent.stream(messages, {
+ toolsets: await mcp.getToolsets(),
+ });
return result.toDataStreamResponse();
}
上記ではまだ問題があります。リクエストのたびにMCPサーバーへ接続するのですが、切断処理が入っていません。
mcp.disconnect()
を呼び出す必要があるのですが、これが曲者です。
ストリームで返却する都合上、returnの後ろに処理をかけません。
そのため、try~finallyしてみたのですが、動作上はストリームが終わる前の早い段階でクローズされてしまい、MCPツールが正しく呼べませんでした。
try{
return result.toDataStreamResponse();
} finally {
mcp.disconnect()
}
そこで、Amazon Q Developer CLIに実装してもらったのがこちらです。
期待動作をしましたが、コードの意味は聞かないでください(笑)
import { mastra } from "@/mastra";
import { MCPClient } from "@mastra/mcp";
// Allow streaming responses up to 30 seconds
export const maxDuration = 30;
export async function POST(req: Request) {
// 各リクエストで独立したMCPClientを作成(ユニークなIDを使用)
const requestId = `mcp-${Date.now()}-${Math.random().toString(36).substr(2, 9)}`;
const mcp = new MCPClient({
id: requestId, // メモリリーク防止のためのユニークID
servers: {
playwright: {
command: "docker",
args: [
"run",
"-i",
"--rm",
"--init",
"mcr.microsoft.com/playwright/mcp",
],
},
},
});
// クリーンアップ関数
const cleanup = async () => {
try {
await mcp.disconnect();
console.log(`MCP Client ${requestId} disconnected`);
} catch (error) {
console.error(`Error disconnecting MCP Client ${requestId}:`, error);
}
};
// リクエストがキャンセルされた場合のクリーンアップ
req.signal?.addEventListener('abort', () => {
console.log(`Request ${requestId} aborted, cleaning up...`);
cleanup();
});
try {
const { messages } = await req.json();
const myAgent = mastra.getAgent("myAgent");
const result = await myAgent.stream(messages, {
toolsets: await mcp.getToolsets(),
});
// ストリーミングレスポンスを取得
const response = result.toDataStreamResponse();
// レスポンスのbodyストリームをラップして、完了時にクリーンアップを実行
const originalBody = response.body;
if (originalBody) {
let cleanupCalled = false;
const safeCleanup = async () => {
if (!cleanupCalled) {
cleanupCalled = true;
await cleanup();
}
};
const wrappedStream = new ReadableStream({
start(controller) {
const reader = originalBody.getReader();
const pump = async () => {
try {
while (true) {
const { done, value } = await reader.read();
if (done) {
controller.close();
// ストリーム完了時にクリーンアップ
await safeCleanup();
break;
}
controller.enqueue(value);
}
} catch (error) {
controller.error(error);
// エラー時もクリーンアップ
await safeCleanup();
}
};
pump();
},
cancel() {
// ストリームがキャンセルされた時もクリーンアップ
safeCleanup();
}
});
// 新しいレスポンスを作成
return new Response(wrappedStream, {
headers: response.headers,
status: response.status,
statusText: response.statusText,
});
}
// bodyがない場合は元のレスポンスを返してクリーンアップ
await cleanup();
return response;
} catch (error) {
console.error("Error in chat2 API:", error);
// エラー時もクリーンアップ
await cleanup();
return new Response(
JSON.stringify({ error: "Internal server error" }),
{
status: 500,
headers: { "Content-Type": "application/json" },
}
);
}
}
外部検索とか、ステートレスなツールであれば影響なさそうですが、ツールによっては問題があるよというお話でした。