1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

MCP Tasks Primitive入門 — 長時間エージェントタスクをストリーミングで実装する

1
Last updated at Posted at 2026-03-30

はじめに

AIエージェントが本番環境で動くようになると、必ず壁にぶつかる問題がある。「数分〜数時間かかるタスクをどう扱うか」だ。

従来のMCPでは tools/call を発行するとレスポンスが返るまでコネクションを保持し続ける必要があった。医薬品相互作用分析、大規模コードマイグレーション、マルチエージェントによる深層リサーチ——これらは秒単位どころか時間単位で処理が続く。

そこで登場したのが Tasks Primitive(SEP-1686)だ。2026年2月にリリースされた TypeScript SDK v1.27.0 では、ストリーミングサポートも追加されている。本記事ではこの新しい非同期実行レイヤーの仕組みと実装方法を解説する。

この記事で学べること

  • MCP Tasks Primitive の設計思想と「call-now, fetch-later」パターン
  • タスクのライフサイクル(5状態遷移)とAPIメソッド
  • TypeScript SDK v1.27.0 を使ったクライアント・サーバー実装
  • Python(FastMCP)でのストリーミング対応

対象読者

  • MCPを使ったAIエージェントを構築・運用しているエンジニア
  • 長時間処理をエージェントに組み込みたい開発者
  • MCP v1.27.x へのアップグレードを検討している方

前提環境

  • Node.js v22.x 以上 / Python 3.10 以上
  • @modelcontextprotocol/sdk v1.27.0 以上
  • MCPの基本概念(Tools / Resources / Prompts)の理解

TL;DR

  • Tasks Primitive はMCP第4プリミティブ。tools/call などを非同期化する横断的実行管理レイヤー
  • 「call-now, fetch-later」パターン:リクエスト送信後すぐに taskId が返り、後からポーリングまたはブロッキング取得できる
  • v1.27.0 で elicitationsampling にストリーミングメソッドが追加。Streamable HTTP(SSE)でリアルタイム進捗が届く
  • 後方互換性あり:Tasks非対応サーバーはメタデータを無視して通常の同期処理を継続

MCP Tasks Primitive とは

4つのプリミティブ

MCPはサーバーが提供できる能力を3つのプリミティブで定義してきた。

プリミティブ 役割
Resources ファイル・データベース等のリソース取得
Prompts 再利用可能なプロンプトテンプレート
Tools 実行可能な操作・関数
Tasks(新) 非同期の長時間タスク管理

Tasks は「サーバーが提供する能力」ではなく、既存の3プリミティブを非同期実行するための横断的レイヤーとして設計されている。実行対象となるリクエストは現仕様で3種類に限定されている:

  • tools/call
  • sampling/createMessage
  • elicitation/create

これらのリクエストに _meta フィールドでタスクIDを付与するだけで非同期化できる。プロトコルバージョンネゴシエーションは不要だ。

設計思想: call-now, fetch-later

【従来の問題】
クライアント → tools/call → サーバー
                              ↓ 処理中(数分〜数時間)
クライアント ←───────────── 全て完了後に一括レスポンス
               コネクション保持が必要 ❌

【Tasks Primitive の解決策】
クライアント → tools/call + task_meta → サーバー
クライアント ← taskId(即返却)           ↓ 非同期処理中
               (working)

クライアント → tasks/get (ポーリング)  → サーバー
クライアント ← status: "working"

... 処理完了後 ...

クライアント → tasks/result           → サーバー
クライアント ← 完了結果 ✅

この設計はべき等性も担保している。クライアント側で生成した taskId を使うため、ネットワーク障害時でも安全にリトライできる。

Amazonが設計した実世界ユースケース

SEP-1686はAmazonのSurbhi BansalとLuca Changが提案した。その背景にある実際の用途が設計を理解する上で参考になる:

  • 医薬品相互作用分析 — 数時間単位のデータ処理
  • エンタープライズSDLC自動化 — 複数リポジトリ横断のコードマイグレーション
  • テスト実行プラットフォーム — 数千テストスイートの並列実行
  • ディープリサーチ — 複数エージェントによる並列調査タスク
  • マルチエージェント通信(A2A) — 別エージェントへのサブタスク委譲

タスクライフサイクルと状態遷移

5つの状態

       ┌──────────────────────────────┐
       ▼                              │
   [working]  ──→  [input_required]  ─┘
       │
       ├──→  [completed]   (終了・不変)
       ├──→  [failed]      (終了・不変)
       └──→  [cancelled]   (終了・不変)
状態 意味 次の状態
working 処理中(初期状態) input_required / 終了状態
input_required サーバーが追加入力を要求中 working(入力後)
completed 正常完了 なし(不変)
failed 失敗 なし(不変)
cancelled キャンセル済み なし(不変)

input_requiredelicitation/createsampling/createMessage がタスクフレームワーク内で発生したときに遷移する。ユーザーへの追加質問や、LLMへの中間問い合わせが完了すると再び working に戻る。

TTLとKeepalive

タスクには有効期限(TTL)が設定される。サーバーは完了タスクをTTL経過後に自動削除し、メモリリークを防ぐ。クライアントはポーリング間隔(pollFrequency)の推奨値をサーバーから受け取れる。

プロトコルの実装

タスク付きリクエスト

tools/call にタスク情報を _meta フィールドで付与する:

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "tools/call",
  "params": {
    "name": "analyze_large_dataset",
    "arguments": { "dataset": "clinical_data_v3.parquet" },
    "_meta": {
      "modelcontextprotocol.io/task": {
        "taskId": "786512e2-9e0d-44bd-8f29-789f320fe840",
        "keepAlive": 60000
      }
    }
  }
}

サーバーはタスクを受理すると 即座に notifications/tasks/created を返す(ポーリング開始の合図):

{
  "jsonrpc": "2.0",
  "method": "notifications/tasks/created",
  "params": {
    "_meta": {
      "modelcontextprotocol.io/related-task": {
        "taskId": "786512e2-9e0d-44bd-8f29-789f320fe840"
      }
    }
  }
}

tasks/get(状態ポーリング)

// リクエスト
{
  "jsonrpc": "2.0",
  "id": 2,
  "method": "tasks/get",
  "params": { "taskId": "786512e2-9e0d-44bd-8f29-789f320fe840" }
}

// レスポンス(処理中)
{
  "jsonrpc": "2.0",
  "id": 2,
  "result": {
    "taskId": "786512e2-9e0d-44bd-8f29-789f320fe840",
    "keepAlive": 30000,
    "pollFrequency": 5000,
    "status": "working"
  }
}

tasks/result(完了後の結果取得)

tasks/result は終了状態のタスクに対して結果を返す。未完了状態では完了までブロッキングする仕様になっている(タイムアウト設定可):

// リクエスト
{
  "jsonrpc": "2.0",
  "id": 3,
  "method": "tasks/result",
  "params": { "taskId": "786512e2-9e0d-44bd-8f29-789f320fe840" }
}

// レスポンス(tools/callの結果)
{
  "jsonrpc": "2.0",
  "id": 3,
  "result": {
    "content": [
      { "type": "text", "text": "分析完了: 1,234件の相互作用を検出。重大な副作用リスクは23件..." }
    ],
    "isError": false
  }
}

tasks/list(タスク一覧)

カーソルベースのページネーションに対応:

{
  "result": {
    "tasks": [
      { "taskId": "abc...", "status": "working", "pollFrequency": 5000 },
      { "taskId": "def...", "status": "completed", "keepAlive": 60000 }
    ],
    "nextCursor": "eyJwYWdlIjoyfQ=="
  }
}

TypeScript 実装ガイド

クライアント側の実装

@modelcontextprotocol/sdk v1.27.0 以上が必要:

npm install @modelcontextprotocol/sdk@^1.27.0

以下のコードは Tasks Primitive 対応 SDK での実装イメージを示す概念コードです。createTaskgetTaskStatusgetTaskResult 等の高レベルAPIは SDK の安定版リリースに合わせて変更される可能性があります。最新の API シグネチャは公式ドキュメントを参照してください。

import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

const transport = new StdioClientTransport({
  command: "node",
  args: ["my-mcp-server.js"]
});

const client = new Client({ name: "tasks-client", version: "1.0.0" });
await client.connect(transport);

// 1. タスクとしてツールを呼び出す
const taskResult = await client.callTool({
  name: "deep_research",
  arguments: {
    query: "2026年のAIエージェントエコシステムの動向分析",
    sources: 50
  }
}, {
  createTask: true,
  ttl: 3600000  // TTL: 1時間
});

const taskId = taskResult.task.taskId;
console.log(`タスク開始: ${taskId} (status: ${taskResult.task.status})`);

// 2. ポーリングループ
let status = "working";
while (status === "working" || status === "input_required") {
  await new Promise(r => setTimeout(r, 5000)); // 5秒待機
  const taskStatus = await client.getTaskStatus(taskId);
  status = taskStatus.status;
  console.log(`ステータス: ${status}`);

  if (status === "input_required") {
    // ユーザー入力が必要な場合の処理
    const userInput = await promptUser(taskStatus.elicitationRequest);
    await client.respondToElicitation(taskId, userInput);
  }
}

// 3. 完了後に結果を取得
if (status === "completed") {
  const result = await client.getTaskResult(taskId);
  console.log("完了:", result.content[0].text);
}

// 4. 後処理(不要になったタスクを削除)
await client.deleteTask(taskId);

サーバー側の実装(TypeScript)

import { Server } from "@modelcontextprotocol/sdk/server/index.js";
import { CallToolRequestSchema } from "@modelcontextprotocol/sdk/types.js";

const server = new Server({ name: "research-server", version: "1.0.0" });

server.setRequestHandler(CallToolRequestSchema, async (request) => {
  const { name, arguments: args, _meta } = request.params;

  if (name === "deep_research") {
    // _meta にタスク情報が含まれている場合は非同期処理
    if (_meta?.["modelcontextprotocol.io/task"]) {
      const { taskId } = _meta["modelcontextprotocol.io/task"];

      // バックグラウンドで処理を開始
      runResearchAsync(taskId, args as { query: string; sources: number });

      // 即座にタスクIDを返す(ブロッキングしない)
      return {
        task: {
          taskId,
          status: "working",
          createdAt: new Date().toISOString(),
          ttl: 3600000,
          pollInterval: 5000
        }
      };
    }
    // 通常の同期処理(後方互換性)
    return await executeResearch(args as { query: string; sources: number });
  }
});

async function runResearchAsync(taskId: string, args: { query: string; sources: number }) {
  try {
    // 進捗通知を送信しながら処理
    await server.sendNotification({
      method: "notifications/tasks/progress",
      params: {
        _meta: { "modelcontextprotocol.io/related-task": { taskId } },
        progress: { step: 1, total: 5, message: "ソース収集中..." }
      }
    });

    const result = await executeResearch(args);

    // 完了状態を保存(次の tasks/result で返却)
    taskStore.set(taskId, { status: "completed", result });

  } catch (error) {
    taskStore.set(taskId, {
      status: "failed",
      error: (error as Error).message
    });
  }
}

Python(FastMCP)でのストリーミング実装

Python SDK でも v0.5.0 以降でTasksサポートが追加されている。FastMCPライブラリを使った実装:

from fastmcp import FastMCP
from fastmcp.utilities.context import Context
import asyncio

mcp = FastMCP("research-server")

@mcp.tool()
async def deep_research(query: str, sources: int, ctx: Context) -> str:
    """長時間かかる深層リサーチを実行する"""

    total_steps = sources
    results = []

    for i in range(total_steps):
        await asyncio.sleep(2)  # 実際の処理に置き換え

        # クライアントへの進捗通知(Streamable HTTP SSE 経由)
        await ctx.report_progress(current=i + 1, total=total_steps)
        await ctx.info(f"ソース {i + 1}/{total_steps} 処理中: {query[:30]}...")

        results.append(f"Source {i+1}: 関連情報を発見")

    # 完了通知
    summary = f"{total_steps}件のソースを分析。{len(results)}件の関連情報を取得。"
    return summary

if __name__ == "__main__":
    mcp.run(transport="streamable-http", port=8080)

クライアント側からの接続(create_task=True 等のパラメータは Tasks Primitive 対応版 SDK での実装イメージです):

import asyncio
from mcp import ClientSession
from mcp.client.streamable_http import streamable_http

async def main():
    async with streamable_http("http://localhost:8080/mcp") as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()

            # タスクとしてツールを呼び出す
            task = await session.call_tool(
                "deep_research",
                {"query": "MCP 2026年のエコシステム動向", "sources": 10},
                create_task=True,
                ttl=1800000
            )

            print(f"タスクID: {task.task_id}")

            # ポーリングで状態確認
            while True:
                status = await session.get_task_status(task.task_id)
                print(f"状態: {status.status}")
                if status.status in ("completed", "failed", "cancelled"):
                    break
                await asyncio.sleep(5)

            # 結果取得
            if status.status == "completed":
                result = await session.get_task_result(task.task_id)
                print(f"完了: {result.content[0].text}")

asyncio.run(main())

v1.27.0 の変更内容

ストリーミングメソッド追加(PR #1528)

v1.27.0 の主要変更は、elicitationsampling へのストリーミングメソッド追加だ1

これにより Tasks フレームワーク内で:

  • サーバーがユーザーへの入力要求(elicitation)をストリーミングで送れる
  • サーバーがLLMへの問い合わせ(sampling)をストリーミングで取得できる

Streamable HTTP トランスポート

MCPのストリーミングは Streamable HTTP(HTTP + SSE)で実現する:

クライアント: HTTP POST → サーバー (リクエスト送信)
クライアント: HTTP GET  ← サーバー (SSEでリアルタイム受信)

以前のHTTP+SSE実装と比べて接続が安定し、ロードバランサとの互換性も向上している2

v1.27.1 のセキュリティ修正

v1.27.1(2026年2月24日)では2つの重要な修正が含まれる:

  1. コマンドインジェクション防止 — URLオープニング機能の脆弱性を修正
  2. エラーハンドリング修正 — トランスポートエラーがサイレントに無視される問題を解消

v1.27.0を使用している場合は v1.27.1以上へのアップグレードが推奨される。

npm update @modelcontextprotocol/sdk
# → 1.27.1以上にアップデート

Elicitation と Sampling の使い分け

タスクが input_required 状態になる2つのケースを整理する:

Elicitation(ユーザーから入力収集)

// サーバー側: タスク処理中にユーザー入力を要求
await server.sendElicitationRequest({
  taskId: "...",
  message: "分析対象の期間を指定してください",
  requestedSchema: {
    type: "object",
    properties: {
      startDate: { type: "string", format: "date" },
      endDate: { type: "string", format: "date" }
    }
  }
});
// → タスクが input_required 状態に遷移
// → クライアントがユーザーに入力を求め、レスポンス後 working に戻る

Sampling(LLMに補完を要求)

// サーバー側: タスク処理中にLLMの判断を要求
const completion = await server.requestSampling({
  taskId: "...",
  messages: [
    {
      role: "user",
      content: { type: "text", text: "この分析結果を要約してください: ..." }
    }
  ],
  modelPreferences: {
    hints: [{ name: "claude-3-sonnet" }],
    intelligencePriority: 0.8,
    speedPriority: 0.5
  },
  maxTokens: 500
});
// → クライアントのLLM(Claude等)が応答を生成して返す
// → サーバーはその結果を使って処理を継続

注意点

Tasks非対応サーバーとの後方互換性

サーバーが Tasks に対応していない場合(_meta を無視する実装)、リクエストは通常の同期処理として実行される。このグレースフルデグラデーションにより、既存のMCPサーバーを変更せずにクライアント側だけタスク対応コードを書いても問題ない。

ただし、対応状況の確認手段として Capabilities ネゴシエーション時に tasks ケイパビリティが宣言されているかを確認するのが確実だ:

const capabilities = await client.getServerCapabilities();
const tasksSupported = capabilities.experimental?.tasks !== undefined;

タスクIDの管理

タスクIDはクライアント側で生成するUUIDだ。アプリケーション側でタスクIDと対応する処理内容のマッピングを永続化しておく必要がある。サーバー再起動や接続切断後にも、同じタスクIDでポーリングを再開できる。

TTLの設計

TTLが短すぎると、処理完了前にタスクが削除される可能性がある。長い処理には余裕を持ったTTL設定(処理時間の2〜3倍)が推奨される。

まとめ

  • MCP Tasks Primitivetools/call 等を非同期化する「call-now, fetch-later」レイヤー
  • タスクは5状態で管理される(working → input_required ↔ working → 終了状態)
  • v1.27.0で elicitation と sampling のストリーミングサポートが追加
  • TypeScript / Python どちらでも実装でき、既存のMCPサーバーとの後方互換性を保つ
  • v1.27.1 のセキュリティ修正が含まれるため、早めのアップデートが推奨

エージェントが数時間単位の処理を扱う場面が増えてきた今、Tasks Primitiveは本番運用のMCPエージェントに必須の仕組みといえる。

参考リンク

  1. TypeScript SDK v1.27.0 リリースノート — PR #1528の変更内容

  2. MCP Streamable HTTP ドキュメント — トランスポートの比較

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?