0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Mapbox】第2回:CrewAI × Mapbox MCP × Agent Skills でマルチエージェントバックエンドを作る

0
Posted at

シリーズ:AI Trip Planner チュートリアル


はじめに

第1回ではサンプルデータを使ったフロントエンドを構築しました。第2回では「目的地を入力すると AI が実際に旅行を生成する」バックエンドを構築します。

ポイントはマルチエージェントアーキテクチャです。1つの AI に全部やらせるのではなく、専門の役割を持つエージェントが分業することで、より高品質な結果を得られます。

今回のアーキテクチャ

ユーザーのクエリ
  ↓
Curator エージェント  ← POI(観光スポット)を検索・収集
  ↓
Planner エージェント  ← ルートを計算し、スケジュールを組む(Agent Skills 搭載)
  ↓
Writer エージェント   ← シネマティックな説明文を生成し、Trip JSON を出力

各エージェントは Mapbox MCP Server@mapbox/mcp-server)経由で Mapbox の地図 API を呼び出します。


第2回の技術スタック

ツール 役割
CrewAI マルチエージェントフレームワーク
@mapbox/mcp-server MCP プロトコルで Mapbox API を提供
Mapbox Agent Skills エージェント向け RAG ドキュメント
ONNX エンベッダー RAG でツール選択の精度を向上
Claude Haiku 各エージェントの LLM
Python 3.11+ エージェント実装言語

プロジェクトセットアップ

完成コードはこちら
github.com/akiramur/trip-planner-ai-tutorial-blogpart2/backend-mcp-skills ブランチ

Python 環境

cd agents
python -m venv .venv
source .venv/bin/activate  # Windows: .venv\Scripts\activate
pip install crewai anthropic httpx

Mapbox Agent Skills のインストール

cd ..  # プロジェクトルートに戻る
npx skills add mapbox/mapbox-agent-skills

インストールする Skills を選ぶリストが表示されたら、以下の2つを選択してください:

  • mapbox-search-patterns — 検索ツール選択・空間フィルターのルール
  • mapbox-mcp-runtime-patterns — MCP ツールの API コスト判断ルール

crew.py_PLANNER_SKILLS がこの2つを参照しており、Planner エージェントの RAG に必要な Skills はこれで揃います。

次にインストール先のエージェントを選ぶリストが表示されたら Claude Code (.claude/skills) を選択してください。crew.py.claude/skills/ を参照しているためです。

インストールスコープを聞かれたら project を選択してください。このプロジェクト専用の Skills のため、グローバルに入れる必要はありません。

npx skills add は Skills を {実行ディレクトリ}/.claude/skills/ にインストールします。crew.pyPath(__file__).parent.parent.parent / ".claude" / "skills" でプロジェクトルートの .claude/skills/ を参照するため、agents/ 配下で実行すると読み込めません。

crew.py_build_knowledge_sources() がこのディレクトリを読み込み、Planner エージェントの RAG ナレッジソースとして渡します。CrewAI が ONNX エンベッダーでベクトル化し、ツール選択の精度向上に使用します。このディレクトリは .gitignore に追加して git 管理外にします(各自の環境でインストールするため)。

環境変数

.env.local に以下を追加します。

ANTHROPIC_API_KEY=sk-ant-...

MCP ブリッジの仕組み

Python エージェントは Mapbox MCP Server と直接通信できません。MCP Server は Next.js プロセスが stdio で管理するシングルトンだからです。

代わりに、Python エージェントは HTTP POST で Next.js の /api/mcp-bridge エンドポイントを呼び出し、そこが MCP クライアント経由でリクエストを転送します。

Python エージェント
  → HTTP POST /api/mcp-bridge { toolName, args }
    → lib/mcp-client.ts(シングルトン MCP クライアント)
      → @mapbox/mcp-server(stdio 子プロセス)
        → Mapbox API

MCP クライアント(TypeScript 側)

// lib/mcp-client.ts
import { Client } from "@modelcontextprotocol/sdk/client/index.js";
import { StdioClientTransport } from "@modelcontextprotocol/sdk/client/stdio.js";

async function createClient(): Promise<Client> {
  const transport = new StdioClientTransport({
    command: "npx",
    args: ["-y", "@mapbox/mcp-server"],  // パッケージ名は @mapbox/mcp-server
    env: {
      ...process.env,
      MAPBOX_ACCESS_TOKEN:
        process.env.NEXT_PUBLIC_MAPBOX_ACCESS_TOKEN ?? "",
    },
  });

  const client = new Client(
    { name: "trip-planner-tutorial", version: "1.0.0" },
    { capabilities: {} }
  );

  await client.connect(transport);
  return client;
}

export async function callMCPTool(
  toolName: string,
  args: Record<string, unknown>
): Promise<MCPToolResult> {
  const client = await getMCPClient();
  const result = await client.callTool({ name: toolName, arguments: args });
  return result as MCPToolResult;
}

重要: パッケージ名は @mapbox/mcp-server です(@mapbox/mapbox-mcp-server ではありません)。

MCP ブリッジ(API Route)

// app/api/mcp-bridge/route.ts
import { NextRequest, NextResponse } from "next/server";
import { callMCPTool } from "@/lib/mcp-client";

export async function POST(request: NextRequest) {
  const { toolName, args } = await request.json();

  if (!toolName || typeof toolName !== "string") {
    return NextResponse.json({ error: "Missing toolName" }, { status: 400 });
  }

  const result = await callMCPTool(toolName, args ?? {});
  return NextResponse.json(result);
}

Mapbox MCP ツール

@mapbox/mcp-server が提供するツールのうち、今回使うものは以下の3つです(ツール名は PascalCase)。

MCP ツール名 用途
ForwardGeocodeTool 地名・住所 → 座標に変換
PoiSearchTool カテゴリ(飲食店・寺院など)でスポットを検索
DirectionsTool 複数地点間のルートと所要時間を計算

Python 側のツール実装

CrewAI エージェントが使う Python ツールを agents/src/tools.py に実装します。各ツールは MCP ブリッジを HTTP で呼び出します。

# agents/src/tools.py
import json
import os
import sys
from typing import Optional

import httpx
from crewai.tools import BaseTool
from pydantic import BaseModel, Field

MCP_BRIDGE_URL = os.getenv("MCP_BRIDGE_URL", "http://localhost:3000/api/mcp-bridge")

# Language for search queries — injected by main.py
_locale: str = "en"


def _log_output(msg: str) -> None:
    """Write a log line to stderr (stdout is reserved for the final JSON output)."""
    print(msg, file=sys.stderr, flush=True)


def _map_focus(event_type: str, payload: dict) -> None:
    """
    Emit a MAP_FOCUS event to stdout so the Next.js parent can update the map.
    The parent process reads JSON lines from stdout; non-Trip lines are filtered out.
    """
    print(json.dumps({"MAP_FOCUS": event_type, **payload}), flush=True)


def _call_bridge(tool_name: str, args: dict) -> str:
    """Call the MCP bridge and return the result text."""
    try:
        response = httpx.post(
            MCP_BRIDGE_URL,
            json={"toolName": tool_name, "args": args},
            timeout=30.0,
        )
        response.raise_for_status()
        data = response.json()

        # MCP returns { content: [{ type: "text", text: "..." }] }
        content = data.get("content", [])
        if content and content[0].get("type") == "text":
            return content[0]["text"]
        return json.dumps(data)
    except httpx.HTTPError as e:
        _log_output(f"[MCP Bridge error] {tool_name}: {e}")
        return json.dumps({"error": str(e)})


# ---------------------------------------------------------------------------
# MapboxSearchTool — ForwardGeocodeTool
# ---------------------------------------------------------------------------

class MapboxSearchInput(BaseModel):
    q: str = Field(description="Place name or address to search for")
    language: str = Field(default="en", description="Language for results (en, ja, etc.)")
    limit: int = Field(default=3, ge=1, le=10, description="Maximum number of results")
    proximity_longitude: Optional[float] = Field(default=None, description="Longitude bias for nearby results")
    proximity_latitude: Optional[float] = Field(default=None, description="Latitude bias for nearby results")
    country: Optional[list[str]] = Field(default=None, description="ISO country codes, e.g. ['jp']")


class MapboxSearchTool(BaseTool):
    name: str = "mapbox_search"
    description: str = (
        "Convert a place name or address into geographic coordinates (ForwardGeocodeTool). "
        "Use for specific named places (landmarks, parks, streets, areas). "
        "Returns: {results: [{name, full_address, coordinates: [lng, lat]}]}"
    )
    args_schema: type[BaseModel] = MapboxSearchInput

    def _run(
        self,
        q: str,
        language: str = "en",
        limit: int = 3,
        proximity_longitude: Optional[float] = None,
        proximity_latitude: Optional[float] = None,
        country: Optional[list[str]] = None,
    ) -> str:
        args: dict = {
            "q": q,
            "language": language,
            "limit": limit,
            "format": "json_string",
        }
        if proximity_longitude is not None and proximity_latitude is not None:
            args["proximity"] = {"longitude": proximity_longitude, "latitude": proximity_latitude}
        if country:
            args["country"] = country

        raw = _call_bridge("ForwardGeocodeTool", args)
        _log_output(f"[mapbox_search] {q!r}{raw[:120]}")

        try:
            data = json.loads(raw)
            features = data.get("features", data.get("results", []))
            results = [
                {
                    "name": f.get("properties", {}).get("name") or f.get("place_name", ""),
                    "full_address": (
                        f.get("properties", {}).get("full_address")
                        or f.get("place_name", "")
                    ),
                    "coordinates": (
                        f.get("geometry", {}).get("coordinates")
                        or f.get("center", [])
                    ),
                }
                for f in features
            ]
            return json.dumps({"results": results})
        except (json.JSONDecodeError, KeyError):
            return raw


# ---------------------------------------------------------------------------
# MapboxPoiSearchTool — PoiSearchTool
# ---------------------------------------------------------------------------

class MapboxPoiSearchInput(BaseModel):
    q: str = Field(description="POI search query (e.g. 'ramen restaurant', 'temple', 'park')")
    language: str = Field(default="en", description="Language for results (en, ja, etc.)")
    limit: int = Field(default=5, ge=1, le=10, description="Maximum results")
    proximity_longitude: Optional[float] = Field(default=None, description="Search near this longitude")
    proximity_latitude: Optional[float] = Field(default=None, description="Search near this latitude")
    country: Optional[list[str]] = Field(
        default=None,
        description="ISO country codes to restrict results, e.g. ['jp']. "
                    "Do NOT combine with poi_category.",
    )
    types: Optional[list[str]] = Field(
        default=None,
        description="Feature types to return, e.g. ['poi']. "
                    "NEVER combine with poi_category — they conflict and return 0 results.",
    )


class MapboxPoiSearchTool(BaseTool):
    name: str = "mapbox_poi_search"
    description: str = (
        "Search for points of interest (restaurants, temples, museums, parks, etc.) using PoiSearchTool. "
        "Use for category-based discovery, not named-place lookup. "
        "CRITICAL: never combine poi_category and types=['poi'] — they return 0 results. "
        "Returns: {results: [{name, full_address, coordinates: [lng, lat], mapbox_id, feature_type}]}"
    )
    args_schema: type[BaseModel] = MapboxPoiSearchInput

    _AREA_TYPES = {"neighborhood", "locality", "district", "region", "country"}

    def _run(
        self,
        q: str,
        language: str = "en",
        limit: int = 5,
        proximity_longitude: Optional[float] = None,
        proximity_latitude: Optional[float] = None,
        country: Optional[list[str]] = None,
        types: Optional[list[str]] = None,
    ) -> str:
        args: dict = {
            "q": q,
            "language": language,
            "limit": limit,
            "format": "json_string",
        }
        if proximity_longitude is not None and proximity_latitude is not None:
            args["proximity"] = {"longitude": proximity_longitude, "latitude": proximity_latitude}
        if country:
            args["country"] = country
        if types:
            args["types"] = types

        raw = _call_bridge("PoiSearchTool", args)
        _log_output(f"[mapbox_poi_search] {q!r}{raw[:120]}")

        try:
            data = json.loads(raw)
            features = data.get("features", data.get("results", []))
            results = []
            for f in features:
                props = f.get("properties", {})
                feature_type = props.get("feature_type", props.get("result_type", ""))
                if feature_type in self._AREA_TYPES:
                    continue
                results.append(
                    {
                        "name": props.get("name") or f.get("place_name", ""),
                        "full_address": (
                            props.get("full_address") or f.get("place_name", "")
                        ),
                        "coordinates": (
                            f.get("geometry", {}).get("coordinates")
                            or f.get("center", [])
                        ),
                        "mapbox_id": props.get("mapbox_id", ""),
                        "feature_type": feature_type,
                    }
                )
            return json.dumps({"results": results})
        except (json.JSONDecodeError, KeyError):
            return raw


# ---------------------------------------------------------------------------
# MapboxDirectionsTool — DirectionsTool
# ---------------------------------------------------------------------------

class Coordinate(BaseModel):
    longitude: float
    latitude: float


class MapboxDirectionsInput(BaseModel):
    coordinates: list[Coordinate] = Field(
        description="Waypoints in order (minimum 2). Each has longitude and latitude.",
        min_length=2,
    )
    routing_profile: str = Field(
        default="driving-traffic",
        description="'walking', 'driving', 'cycling', or 'driving-traffic'",
    )


class MapboxDirectionsTool(BaseTool):
    name: str = "mapbox_directions"
    description: str = (
        "Calculate a route between two or more locations using DirectionsTool. "
        "Returns GeoJSON route geometry and travel duration. "
        "Pass all waypoints in a single call — do not split into pairs. "
        "Returns: {routeGeometry: GeoJSON LineString, legDurations: [seconds, ...]} "
        "legDurations[i] is the travel time in seconds for the segment between waypoint i and i+1."
    )
    args_schema: type[BaseModel] = MapboxDirectionsInput

    def _run(
        self,
        coordinates: list[Coordinate],
        routing_profile: str = "driving-traffic",
    ) -> str:
        coord_list = []
        for c in coordinates:
            if hasattr(c, "model_dump"):
                coord_list.append(c.model_dump())
            elif isinstance(c, dict):
                coord_list.append(c)
            else:
                coord_list.append({"longitude": c.longitude, "latitude": c.latitude})
        args = {
            "coordinates": coord_list,
            "routing_profile": routing_profile,
            "geometries": "geojson",
        }

        raw = _call_bridge("DirectionsTool", args)
        _log_output(f"[mapbox_directions] {len(coord_list)} waypoints, profile={routing_profile}")

        if len(coord_list) >= 2:
            _map_focus("DIRECTIONS", {
                "from": [coord_list[0]["longitude"], coord_list[0]["latitude"]],
                "to": [coord_list[-1]["longitude"], coord_list[-1]["latitude"]],
            })

        try:
            data = json.loads(raw)
            routes = data.get("routes", [])
            if routes:
                route = routes[0]
                geometry = route.get("geometry")
                leg_durations = [leg["duration"] for leg in route.get("legs", [])]
                result: dict = {"routeGeometry": geometry}
                if leg_durations:
                    result["legDurations"] = leg_durations
                return json.dumps(result)
        except (json.JSONDecodeError, KeyError):
            pass
        return raw

Pydantic スキーマを使う理由

CrewAI は各ツールの args_schema を LLM に渡してツール呼び出しの JSON を生成します。BaseModel でスキーマを明示することで:

  • LLM が正しい型・フィールド名でツールを呼び出せる
  • Optional フィールドで proximity(近傍バイアス)や country フィルターをオプション指定できる
  • Field(description=...) の文言がそのままエージェントへの指示になる

エージェント定義(agents.yaml)

3つのエージェントの役割・目標・バックストーリーを YAML で定義します。

# agents/config/agents.yaml

curator:
  role: Travel Experience Curator
  goal: >
    【重要】あなたはチャットボットではなく、データ生成パイプラインの一部である。
    挨拶・案内・質問・確認・待機メッセージは一切禁止。
    入力されたリクエストに対して即座にツールを実行し、指定フォーマットの JSON のみを出力せよ。
    ユーザーの要望を具体的なスポット(POI)に変換し、旅の「構成案」を作成する。
    ルートの詳細計算は行わず、最高の訪問先リストを作ることに集中する。
  backstory: >
    あなたは世界中の観光スポットや飲食店に精通したデータプロセッサーである。
    人間と会話するシステムではなく、入力を受け取り、出力 JSON を返す自動パイプラインである。
  verbose: true
  allow_delegation: false

planner:
  role: Travel Route Planner
  goal: >
    【重要】あなたはチャットボットではなく、データ生成パイプラインの一部である。
    挨拶・案内・質問・確認・待機メッセージは一切禁止。
    スポットリストを受け取り、mapbox_directions でルートと所要時間を確定させる。
    各スポットの zoom/pitch/bearing も推奨値として出力する。
    スケジュールに食事の空き時間があれば飲食店を自動挿入し、実現可能な行程表を完成させる。
  backstory: >
    あなたは地理データとロジスティクスのスペシャリストである。
    点と点を線で結び、移動時間を計算し、ランチやディナーの最適なタイミングを割り出す
    自動パイプラインの一部として動作する。
  verbose: true
  allow_delegation: false

writer:
  role: Cinematic Travel Director
  goal: >
    【重要】あなたはチャットボットではなく、データ生成パイプラインの一部である。
    Planner の JSON を受け取り、即座に description・lightPreset・cameraStyle を付加した
    完全な Trip JSON のみを返す。JSON 以外のテキストは一切出力禁止。
    映画のワンシーンのような没入感ある description を執筆し、
    Mapbox の 3D 演出(カメラアングル・光の質感)を最大限に引き出す設定を施す。
  backstory: >
    あなたはアカデミー賞受賞の撮影監督であり、旅のシネマティックな物語を描く
    データ変換エンジンである。"Cinematic tracking shot"、"Dramatic low-angle tilt"、
    "Elegant high-altitude orbit" などの演出語彙で各スポットを設計する。
  verbose: true
  allow_delegation: false

【重要】 プレフィックスを付ける理由
CrewAI は LLM にエージェントの goal をそのまま渡します。実際に動かすと、LLM が律儀に「了解しました!次のスポットを探します」などと返してパイプラインが止まるケースが頻発しました。「チャットボットではなくパイプラインの一部」と明示することで、余計な会話的テキストを抑制できます。


タスク定義(tasks.yaml)

# agents/config/tasks.yaml

search_pois:
  description: >
    以下の旅行リクエストに合う実在するスポットを検索せよ: {query}

    【パイプライン制約】挨拶・説明・質問は一切禁止。即座にツールを実行すること。

    mapbox_search と mapbox_poi_search を使い、4〜6 件の実在スポットを探せ。
    朝から夕方まで 1 日を通じて楽しめる多様な体験(寺社・市場・展望台・食事など)を含めること。

    【日本国内スポット検索の必須ルール】
    - q には日本語のキーワードのみ使用(例: "ラーメン" "居酒屋")。英語キーワードは精度が大幅低下
    - language="ja", country=["jp"], types=["poi"] の 3 点セットを必須とする
    - poi_category と types=["poi"] を同時に渡すと 0 件になるため絶対禁止
    - feature_type が neighborhood/locality/district/region/country の結果は除外(行政区域)

    【出力形式】JSON 配列のみ。説明文・マークダウン禁止。
    [
      {{
        "name": "スポット名(正式名称)",
        "type": "place",
        "coordinates": [経度, 緯度],
        "address": "住所",
        "description": "このスポットが面白い理由を 1 文で"
      }}
    ]

  expected_output: >
    4〜6 件の実在スポットを含む JSON 配列。マークダウン・説明文なし。
  agent: curator

plan_route:
  description: >
    Curator が提供したスポットリストをもとに、完全な旅程を構築せよ。

    【パイプライン制約】挨拶・説明・質問は一切禁止。即座にツールを実行すること。

    手順:
    1. スポットを地理的に最適な順序に並べる(移動距離を最小化)
    2. 全ウェイポイント(スポット1→スポット2→…→スポットN)を coordinates に渡して
       mapbox_directions を 1 回だけ呼び出してルートを一括取得する
       レスポンスには以下が含まれる:
       - routeGeometry: 全体ルートの LineString
       - legDurations: 区間ごとの所要秒数配列(legDurations[0] = スポット1→2の秒数、以降同様)
       legDurations を使って各チャプターの移動時間を正確に計算し、schedule に反映すること。
       各チャプターの routeGeometry は null のままで構わない(フロントエンドが自動補完する)。
    3. 9:00 AM 始まりでスケジュールを割り当てる(1 スポット 30〜90 分 + 移動時間)
    4. 11:30〜13:30(ランチ)または 18:00〜20:00(ディナー)に移動が重なる場合、
       飲食店チャプターを自動挿入する
    5. カメラ推奨値を設定する(zoom: 14〜17、pitch: 45〜75、bearing: -180〜180)

    【スポット検索の使い分け】
    - mapbox_search: 公園・通り・エリアなど面的な場所
    - mapbox_poi_search: 建物・飲食店・施設など点的な場所

    【mapboxId ルール】
    - mapbox_poi_search の結果から得られた生の Base64 ID のみ使用
    - "kiyomizu-dera-kyoto" など人間が読める文字列は禁止(400 Bad Request の原因)

    【セルフレビュー(出力前に必ず確認)】
    - 全スポットに実在する正式名称が使われているか("Local Lunch Spot" 等の一般名称は禁止)
    - mapboxId がある場合、生の Base64 文字列か
    - ランチまたはディナーの枠が存在するか

    【出力形式】JSON 配列のみ。説明文・マークダウン禁止。
    [
      {{
        "id": "slug-based-id",
        "title": "スポット名",
        "type": "place",
        "address": "住所",
        "coordinates": [経度, 緯度],
        "zoom": 16,
        "pitch": 60,
        "bearing": 20,
        "schedule": "9:00 AM",
        "routeGeometry": null
      }}
    ]

  expected_output: >
    ルート計算済みのチャプター配列(JSON)。マークダウン・説明文なし。
  agent: planner
  context:
    - search_pois

write_stories:
  description: >
    Planner のルートデータを、フロントエンドが描画できる完全な Trip JSON に変換せよ。

    【パイプライン制約】挨拶・説明・質問は一切禁止。JSON のみ出力すること。

    各チャプターに以下を付加する:

    description(必須):
    - 映画のト書きのような没入感ある 2〜3 文
    - 感覚的な描写(光・音・空気感・歴史的文脈)を含める
    - food チャプター: 料理の見た目・香り・食感を中心に

    lightPreset(時刻から機械的に決定):
    - 05:00〜08:59 → "dawn"
    - 09:00〜15:59 → "day"
    - 16:00〜18:59 → "dusk"
    - 19:00〜04:59 → "night"

    cameraStyle(場所の性質から選択):
    - "orbit"    : 城・寺院・タワー(中心を旋回)
    - "flyover"  : 橋・川沿い・大通り(低空直線飛行)
    - "street"   : 商店街・横丁・市場(路地這い)
    - "panorama" : 海岸・公園・展望台(広大景色)
    - food チャプター: "street" または "orbit" のみ

    【出力形式】以下の Trip JSON のみ。マークダウン・説明文は一切禁止。
    {{
      "title": "旅行タイトル(印象的な一言)",
      "description": "1〜2 文の旅行概要",
      "chapters": [
        {{
          "id": "slug-id",
          "title": "スポット名",
          "description": "映像的な 2〜3 文の描写...",
          "schedule": "9:00 AM",
          "location": {{
            "center": [経度, 緯度],
            "zoom": 16,
            "pitch": 60,
            "bearing": 20
          }},
          "lightPreset": "day",
          "cameraStyle": "orbit"
        }}
      ]
    }}

    【保持フィールド】id・location.center・zoom・pitch は Planner の値を変更禁止。
    【routeGeometry】出力トークン節約のため全チャプターで省略すること。

  expected_output: >
    フロントエンドが直接レンダリングできる完全な Trip JSON。マークダウンなし。
  agent: writer
  context:
    - plan_route

Writer が routeGeometry を省略する理由
LLM が大量の座標配列を正確に再現することは困難かつトークンの無駄です。routeGeometry はサーバーサイドで Mapbox Directions API から取得して後から注入します(第3回で解説)。


Crew の組み立て(crew.py)

# agents/src/crew.py
from pathlib import Path
import os
from crewai import Agent, Crew, LLM, Process, Task
from crewai.knowledge.source.string_knowledge_source import StringKnowledgeSource
from crewai.project import CrewBase, agent, crew, task
from src.tools import MapboxDirectionsTool, MapboxPoiSearchTool, MapboxSearchTool

_PROJECT_ROOT = Path(__file__).parent.parent.parent
_SKILLS_DIR = _PROJECT_ROOT / ".claude" / "skills"

_PLANNER_SKILLS = [
    "mapbox-search-patterns",       # 検索ツール選択・空間フィルター
    "mapbox-mcp-runtime-patterns",  # MCPツールのAPI コスト判断
]

_ONNX_EMBEDDER = {"provider": "onnx", "config": {}}


def _build_knowledge_sources() -> list[StringKnowledgeSource]:
    """インストール済みの Mapbox Agent Skills から KnowledgeSource を生成する。

    SKILL.md(ルール・判断基準)と AGENTS.md(実装コード例)の両方を読み込む。
    TextFileKnowledgeSource に絶対パスを渡すと CrewAI が先頭に knowledge/ を
    付加してしまうため、ファイルを自前で読み込んで StringKnowledgeSource に渡す。
    """
    import sys
    sources = []
    for skill_name in _PLANNER_SKILLS:
        for filename in ("SKILL.md", "AGENTS.md"):
            skill_path = _SKILLS_DIR / skill_name / filename
            if skill_path.exists():
                content = skill_path.read_text(encoding="utf-8")
                sources.append(StringKnowledgeSource(content=content))
            else:
                print(f"[crew] Warning: Skills file not found: {skill_path}", file=sys.stderr)
    return sources


@CrewBase
class TripPlannerCrew:
    """Sequential pipeline: Curator → Planner → Writer"""

    agents_config = "../config/agents.yaml"
    tasks_config = "../config/tasks.yaml"

    def _llm(self) -> LLM:
        return LLM(
            model="anthropic/claude-haiku-4-5-20251001",
            api_key=os.environ["ANTHROPIC_API_KEY"],
            max_tokens=8000,
        )

    @agent
    def curator(self) -> Agent:
        # Curator は広範な探索が目的のため Skills なし(速度優先)
        return Agent(
            config=self.agents_config["curator"],
            tools=[MapboxSearchTool(), MapboxPoiSearchTool()],
            llm=self._llm(),
            max_iter=20,
        )

    @agent
    def planner(self) -> Agent:
        # Planner は Mapbox Agent Skills(RAG)搭載
        knowledge_sources = _build_knowledge_sources()
        return Agent(
            config=self.agents_config["planner"],
            tools=[MapboxSearchTool(), MapboxPoiSearchTool(), MapboxDirectionsTool()],
            llm=self._llm(),
            max_iter=40,
            knowledge_sources=knowledge_sources or None,
            embedder=_ONNX_EMBEDDER if knowledge_sources else None,
        )

    @agent
    def writer(self) -> Agent:
        # Writer はツール不要(テキスト生成のみ)
        return Agent(
            config=self.agents_config["writer"],
            tools=[],
            llm=self._llm(),
        )

    # ------------------------------------------------------------------
    # Tasks
    # ------------------------------------------------------------------

    @task
    def search_pois(self) -> Task:
        return Task(config=self.tasks_config["search_pois"])

    @task
    def plan_route(self) -> Task:
        return Task(config=self.tasks_config["plan_route"])

    @task
    def write_stories(self) -> Task:
        return Task(config=self.tasks_config["write_stories"])

    # ------------------------------------------------------------------
    # Crew
    # ------------------------------------------------------------------

    @crew
    def crew(self) -> Crew:
        return Crew(
            agents=self.agents,
            tasks=self.tasks,
            process=Process.sequential,  # Curator → Planner → Writer の順に実行
            verbose=True,
        )

Mapbox Agent Skills とは

Agent Skills は、エージェントが Mapbox ツールをより効果的に使えるようにする RAG(検索拡張生成)ドキュメントです。各スキルには2種類のファイルが含まれます:

ファイル 内容
SKILL.md ツール選択基準・API コストの判断ルール
AGENTS.md 実装コード例・具体的なユースケース

_build_knowledge_sources() は両ファイルを読み込んで StringKnowledgeSource に変換します。ファイルが存在しない場合は stderr に警告を出力してスキップします。

Planner エージェントのみに適用している理由:

エージェント Skills の必要性
Curator 広範な探索が目的。速度が重要で RAG のオーバーヘッドは不要
Planner 精密なルート計算が必要。正しいツール選択が品質を左右する
Writer ツールを使わないため不要

RAG を使うにはドキュメントをベクトル化するエンベッダーが必要です。CrewAI のデフォルトは OpenAI の Embeddings API を使うため、通常は OpenAI API キーが別途必要になります。ここでは fastembed(Qdrant 製)を使った ONNX エンベッダーを採用し、埋め込み生成を ONNX Runtime でローカル実行します。これにより OpenAI API キーが不要となり、LLM(Claude Haiku)用の Anthropic API キーだけで動作します。


エントリーポイント(main.py)

# agents/src/main.py
import json, re, sys


def _extract_json(text: str) -> dict | list | None:
    """
    CrewAI の生出力から JSON を堅牢に抽出する。

    CrewAI はマークダウンフェンスで JSON を囲んだり、前置きテキストを付けたり、
    自身の verbose ログボックスを混入させることがある。
    優先順に3つの戦略で解析を試みる。
    """
    if not text or not text.strip():
        return None

    # Strategy 1: マークダウンフェンスを除去して直接パース
    cleaned = re.sub(r"```(?:json)?\s*", "", text)
    cleaned = re.sub(r"```", "", cleaned).strip()
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        pass

    # Strategy 2: 最初の { ... } または [ ... ] ブロックを探す
    for start_char, end_char in [('{', '}'), ('[', ']')]:
        start = text.find(start_char)
        if start == -1:
            continue
        depth = 0
        in_string = False
        escape_next = False
        for i, ch in enumerate(text[start:], start=start):
            if escape_next:
                escape_next = False
                continue
            if ch == '\\' and in_string:
                escape_next = True
                continue
            if ch == '"':
                in_string = not in_string
                continue
            if in_string:
                continue
            if ch == start_char:
                depth += 1
            elif ch == end_char:
                depth -= 1
                if depth == 0:
                    try:
                        return json.loads(text[start:i + 1])
                    except json.JSONDecodeError:
                        break

    # Strategy 3: ```json ... ``` コードブロックを明示的に探す
    code_block_match = re.search(r"```(?:json)?\s*(\{[\s\S]*?\}|\[[\s\S]*?\])\s*```", text)
    if code_block_match:
        try:
            return json.loads(code_block_match.group(1))
        except json.JSONDecodeError:
            pass

    return None


def _log(msg: str) -> None:
    """進捗メッセージを STDOUT_LOG: プレフィックスで stdout に出力する。
    
    Next.js 親プロセスがこの行を読み取り、SSE イベントとしてブラウザに転送する。
    """
    print(f"STDOUT_LOG: {msg}", flush=True)


def run(query: str) -> dict:
    from src.crew import TripPlannerCrew

    print(f"[main] Starting trip generation for: {query!r}", file=sys.stderr)

    _log("Searching for spots...")
    crew_instance = TripPlannerCrew().crew()

    # kickoff() 前に sys.stdout を sys.stderr にリダイレクトする。
    # CrewAI の verbose 出力(ボックス枠・エージェントログ)が stdout を汚染するのを防ぐ。
    # stdout は STDOUT_LOG: と TRIP_RESULT: の出力専用に保つ。
    real_stdout = sys.stdout
    sys.stdout = sys.stderr
    try:
        result = crew_instance.kickoff(inputs={"query": query})
    finally:
        sys.stdout = real_stdout

    _log("Planning route...")
    raw_output = result.raw if hasattr(result, "raw") else str(result)

    print(f"[main] Raw output length: {len(raw_output)} chars", file=sys.stderr)
    print(f"[main] Raw output preview: {raw_output[:300]}", file=sys.stderr)

    trip = _extract_json(raw_output)

    if trip is None:
        _log("Retrying JSON extraction...")
        print("[main] JSON extraction failed — retrying with a direct writer call", file=sys.stderr)
        trip = _retry_writer(query, raw_output)

    if trip is None or not isinstance(trip, dict):
        raise ValueError(f"Could not extract valid Trip JSON from output:\n{raw_output[:500]}")

    _log("Finalizing itinerary...")
    return trip


def _retry_writer(query: str, planner_output: str) -> dict | None:
    """
    CrewAI の最終出力が有効な JSON でない場合、
    Writer エージェントに直接変換を依頼するフォールバック。
    """
    import anthropic

    client = anthropic.Anthropic()
    prompt = f"""The following is a travel itinerary plan. Convert it to valid Trip JSON.

ITINERARY PLAN:
{planner_output}

OUTPUT FORMAT (JSON only, no markdown, no explanation):
{{
  "title": "...",
  "description": "...",
  "chapters": [
    {{
      "id": "slug-id",
      "title": "Place Name",
      "description": "2-3 vivid sentences...",
      "schedule": "9:00 AM",
      "location": {{
        "center": [longitude, latitude],
        "zoom": 16,
        "pitch": 60,
        "bearing": 0
      }},
      "lightPreset": "day",
      "cameraStyle": "orbit"
    }}
  ]
}}"""

    message = client.messages.create(
        model="claude-haiku-4-5-20251001",
        max_tokens=4096,
        messages=[{"role": "user", "content": prompt}],
    )
    raw = message.content[0].text if message.content else ""
    return _extract_json(raw)


if __name__ == "__main__":
    if len(sys.argv) < 2:
        print("Usage: python -m src.main <query>", file=sys.stderr)
        sys.exit(1)

    query = sys.argv[1]

    try:
        trip = run(query)
        # TRIP_RESULT: プレフィックスで最終 JSON を stdout に出力
        print(f"TRIP_RESULT: {json.dumps(trip, ensure_ascii=False)}")
    except Exception as e:
        print(f"[main] Fatal error: {e}", file=sys.stderr)
        sys.exit(1)

stdout/stderr の使い分け

出力先 内容
stdout STDOUT_LOG: 進捗メッセージ(Next.js がブラウザへ SSE 転送)
stdout TRIP_RESULT: 最終 Trip JSON
stderr CrewAI verbose ログ・デバッグ情報(kickoff() 中は stdout を stderr にリダイレクト)

この設計により、Next.js 親プロセスは stdout だけを解析すれば進捗と結果を取得でき、CrewAI の冗長なログに邪魔されません。

JSON 抽出の堅牢化(_extract_json

CrewAI エージェントは JSON を返すよう指示しても、マークダウンフェンスで囲んだり前置きテキストを付けたりすることがあります。_extract_json は3段階の戦略でこれに対処します:

  1. マークダウンフェンスを除去して直接 json.loads()
  2. ネスト深度を追跡して最初の {...} / [...] ブロックを抽出
  3. ```json ... ``` コードブロックを正規表現で検索

JSON 抽出失敗時のフォールバック(_retry_writer

3戦略すべて失敗した場合、_retry_writer が Planner の出力を Claude Haiku に直接渡し、「JSON のみで出力せよ」と明示して再変換を試みます。これにより、CrewAI パイプライン全体を再実行せずに済みます。


動作確認

第3回のフロントエンド統合を待たずに、Python エージェント単体で動作確認できます。

ターミナル1: Next.js を起動(MCP ブリッジを有効にする)

npm run dev

ターミナル2: Python エージェントを直接実行

cd agents
source .venv/bin/activate  # Windows: .venv\Scripts\activate
export ANTHROPIC_API_KEY=sk-ant-...  # .env.local は Python が自動読み込みしないため手動でエクスポート
python -m src.main "東京の寺社と食を巡る1日旅"

main.py__main__ エントリーポイントがコマンドライン引数を受け取り、パイプライン全体(Curator → Planner → Writer)を実行します。正常に動作すると stdout に以下が出力されます。

STDOUT_LOG: Searching for spots...
STDOUT_LOG: Planning route...
TRIP_RESULT: {"title": "...", "chapters": [...]}

注意: Python エージェントは MCP ブリッジ(http://localhost:3000/api/mcp-bridge)経由で Mapbox API を呼び出すため、Next.js の起動が必須です。/api/agent(第3回で実装する SSE ブリッジ)は不要です。


まとめ

第2回で構築したポイントを整理します。

マルチエージェントパイプライン

  • Curator(探索)→ Planner(ルート計算)→ Writer(説明文生成)の順次実行
  • 各エージェントが専門性を持つことで、単一 AI より高品質な結果を得られる

Mapbox MCP ブリッジ

  • Python エージェントは HTTP POST で /api/mcp-bridge を呼び出す
  • Next.js が @mapbox/mcp-server をシングルトンの stdio プロセスとして管理
  • MCP ツール名は PascalCase: ForwardGeocodeTool, PoiSearchTool, DirectionsTool

Mapbox Agent Skills

  • Planner エージェントのみに適用(Curator には不要)
  • ONNX エンベッダーを使った RAG でツール選択の精度を向上
  • StringKnowledgeSource で直接コンテンツを渡すことで CrewAI のパス問題を回避

次回予告

第3回では、このバックエンドをフロントエンドと繋げる統合を実装します。

第3回では /api/agent(Python エージェントを SSE でブリッジする Next.js ルート)、Mapbox Directions API によるルートジオメトリのサーバーサイド注入、フォールバック実装を加えてアプリを動かします。


参考・関連リンク

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?