1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Mapbox】第3回:SSE ストリーミング × ルートジオメトリ注入でフロントエンドとバックエンドを繋ぐ

1
Posted at

シリーズ:AI Trip Planner チュートリアル


はじめに

第1回でフロントエンド(Mapbox マップ + タイムラインサイドバー)を、第2回でバックエンド(CrewAI マルチエージェント + Mapbox MCP)を構築しました。第3回ではこれらを統合して、「目的地を入力すると AI が旅行を生成してマップに表示する」動作するアプリを仕上げます。

今回の実装内容

  1. /api/agent — Python エージェントをサブプロセスで起動し、Next.js 側で SSE に変換してブラウザに配信
  2. lib/route-geometry.ts — Mapbox Directions API でルートジオメトリをサーバーサイドから取得
  3. /api/trip — CrewAI が失敗した場合の Claude 直接呼び出しフォールバック
  4. TripApp.tsx — SSE ストリームを読んで進捗表示 → 旅行ビューに遷移

データフロー全体像

ブラウザ(TripApp)
  │ POST /api/agent { query }
  ↓
app/api/agent/route.ts
  │ spawn("python -m src.main", [query])
  │   → stdout: STDOUT_LOG: ... (進捗)
  │   → stdout: TRIP_RESULT: {...} (最終 Trip JSON)
  │
  │ SSE ストリーム ──────────────────────────────→ ブラウザ
  │   event: log    { message }  ← 進捗メッセージを表示
  │   event: result { trip }     ← 旅行ビューに遷移
  │   event: error  { message }  ← フォールバックへ
  │
  │ injectRouteGeometries()
  │   → Mapbox Directions API(chapter ごとに2点間ルート取得)
  │   → chapter.routeGeometry = GeoJSON LineString
  ↓
ブラウザ(MapView)
  └─ routeGeometry が存在すれば道路に沿ったルートライン
     存在しなければ中心点を直線でつなぐ(フォールバック)

SSE Agent API(/api/agent)

Python エージェントをサブプロセスとして起動し、stdout を SSE ストリームとしてブラウザに流します。

// app/api/agent/route.ts
import { NextRequest } from "next/server";
import { spawn } from "child_process";
import { existsSync } from "fs";
import { join } from "path";
import { injectRouteGeometries } from "@/lib/route-geometry";

export const maxDuration = 300; // 5分タイムアウト(エージェントは時間がかかる)

const LOG_PREFIX = "STDOUT_LOG: ";
const TRIP_PREFIX = "TRIP_RESULT: ";

function getPythonPath(): string {
  const agentsDir = join(process.cwd(), "agents");
  const venvBin = join(agentsDir, ".venv", "bin", "python");
  const venvBinWin = join(agentsDir, ".venv", "Scripts", "python.exe");
  if (existsSync(venvBin)) return venvBin;
  if (existsSync(venvBinWin)) return venvBinWin;
  return "python3";
}

export async function POST(request: NextRequest) {
  const { query } = await request.json();

  const encoder = new TextEncoder();

  const stream = new ReadableStream({
    start(controller) {
      const enqueue = (event: string, data: object) => {
        controller.enqueue(
          encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`)
        );
      };

      const pythonPath = getPythonPath();
      const agentsDir = join(process.cwd(), "agents");

      const child = spawn(pythonPath, ["-m", "src.main", query], {
        cwd: agentsDir,
        env: {
          ...process.env,
          MCP_BRIDGE_URL: process.env.MCP_BRIDGE_URL ?? "http://localhost:3000/api/mcp-bridge",
          ANTHROPIC_API_KEY: process.env.ANTHROPIC_API_KEY ?? "",
        },
      });

      let lineBuffer = "";
      let fullStdout = "";

      // stderr を drain してパイプバッファのオーバーフローを防ぐ
      // (main.py が kickoff() 中に sys.stdout → sys.stderr をリダイレクトするため
      //   CrewAI の verbose ログが大量に流れ込む)
      child.stderr.on("data", (chunk: Buffer) => {
        process.stderr.write(chunk.toString());
      });

      child.stdout.on("data", (chunk: Buffer) => {
        const text = chunk.toString();
        fullStdout += text;
        lineBuffer += text;

        // 完全な行を flush
        const lines = lineBuffer.split("\n");
        lineBuffer = lines.pop() ?? "";

        for (const line of lines) {
          if (line.startsWith(LOG_PREFIX)) {
            // STDOUT_LOG: → event: log としてブラウザに送信
            enqueue("log", { message: line.slice(LOG_PREFIX.length).trim() });
          }
          // TRIP_RESULT は close イベントでルートジオメトリ注入後に送信
        }
      });

      child.on("close", async (code) => {
        if (code !== 0) {
          enqueue("error", { message: `Agent failed (exit code ${code}).` });
          controller.close();
          return;
        }

        // TRIP_RESULT: 行を探す
        const tripLine = fullStdout
          .split("\n")
          .find((l) => l.startsWith(TRIP_PREFIX));

        if (!tripLine) {
          enqueue("error", { message: "Agent returned no result." });
          controller.close();
          return;
        }

        const trip = JSON.parse(tripLine.slice(TRIP_PREFIX.length).trim());

        // ルートジオメトリをサーバーサイドで注入
        if (Array.isArray(trip?.chapters)) {
          enqueue("log", { message: "Fetching route geometries..." });
          await injectRouteGeometries(trip.chapters);
        }

        // event: result で旅行データをブラウザに送信
        enqueue("result", { trip });
        controller.close();
      });
    },
  });

  return new Response(stream, {
    headers: {
      "Content-Type": "text/event-stream",
      "Cache-Control": "no-cache",
      Connection: "keep-alive",
    },
  });
}

設計のポイント

要素 理由
STDOUT_LOG: プレフィックス Python stdout から進捗行を識別する
TRIP_RESULT: プレフィックス 最終 JSON 行を他の stdout と区別する
close イベントでルート注入 全 stdout が揃ってから JSON をパースするため
stderr を drain する Python が大量ログを書き込むとパイプがブロックする

サーバーサイド ルートジオメトリ注入

LLM は大量の座標配列を正確に生成できません。そのため、ルートジオメトリはサーバーサイドで Mapbox Directions API から取得して注入します。

// lib/route-geometry.ts
export interface RouteGeometry {
  type: "LineString";
  coordinates: [number, number][];
}

export async function fetchRouteGeometry(
  from: [number, number],
  to: [number, number],
  token: string
): Promise<RouteGeometry | null> {
  const coords = `${from[0]},${from[1]};${to[0]},${to[1]}`;
  const url = `https://api.mapbox.com/directions/v5/mapbox/driving-traffic/${coords}`
    + `?access_token=${token}&geometries=geojson&overview=full`;

  try {
    const res = await fetch(url);
    if (!res.ok) return null;
    const data = await res.json();
    return (data.routes?.[0]?.geometry as RouteGeometry) ?? null;
  } catch {
    return null;
  }
}

/**
 * 各チャプターに routeGeometry を注入する。
 * 最初のチャプターは null(入口となるルートがない)。
 * エラーは無視 — MapView が直線でフォールバックする。
 */
export async function injectRouteGeometries(
  chapters: Array<{ location: { center: [number, number] }; routeGeometry?: unknown }>
): Promise<void> {
  const token = process.env.NEXT_PUBLIC_MAPBOX_ACCESS_TOKEN ?? "";

  chapters[0].routeGeometry = null;

  for (let i = 1; i < chapters.length; i++) {
    const from = chapters[i - 1].location.center;
    const to = chapters[i].location.center;
    const geometry = await fetchRouteGeometry(from, to, token);
    if (geometry) {
      chapters[i].routeGeometry = geometry;
    }
  }
}

この関数は /api/agent/api/trip の両方で呼ばれます。LLM の出力後に必ずサーバーサイドでルートを補完する設計です。


フォールバック API(/api/trip)

CrewAI パイプラインが失敗した場合(Python 未インストール、タイムアウト等)は、Claude Haiku への単純な1回の API 呼び出しにフォールバックします。

// app/api/trip/route.ts
import Anthropic from "@anthropic-ai/sdk";
import { injectRouteGeometries } from "@/lib/route-geometry";

const client = new Anthropic();

export async function POST(req: NextRequest) {
  const { query } = await req.json();

  const systemPrompt = `You are a trip planning assistant. Create a trip itinerary as JSON.
Return ONLY valid JSON with this exact structure:
{
  "title": "Trip title",
  "description": "Short trip description",
  "chapters": [
    {
      "id": "unique-id",
      "title": "Place name",
      "description": "2-3 sentence description of this place and why it's interesting",
      "schedule": "9:00 AM",
      "location": {
        "center": [longitude, latitude],
        "zoom": 15,
        "pitch": 60,
        "bearing": 0
      },
      "lightPreset": "day",
      "cameraStyle": "orbit"
    }
  ]
}
Rules:
- Include 3-5 chapters
- Use accurate coordinates for real places
- lightPreset must be one of: dawn, day, dusk, night (match the time of day)
- cameraStyle must be one of: orbit, flyover, street, panorama
- zoom: 14-17 for city spots, 12-14 for landmarks
- pitch: 45-75 for cinematic views
- Schedule the day from morning to evening`;

  const message = await client.messages.create({
    model: "claude-haiku-4-5-20251001",
    max_tokens: 2000,
    system: systemPrompt,
    messages: [{ role: "user", content: `Create a trip for: ${query}` }],
  });

  const content = message.content[0];
  if (content.type !== "text") {
    return NextResponse.json({ error: "Failed to generate trip" }, { status: 500 });
  }

  // Claude が JSON をマークダウンフェンスで囲む場合があるため正規表現で抽出
  const jsonMatch = content.text.match(/\{[\s\S]*\}/);
  if (!jsonMatch) {
    return NextResponse.json({ error: "Invalid response format" }, { status: 500 });
  }

  const trip = JSON.parse(jsonMatch[0]);
  if (Array.isArray(trip?.chapters)) {
    await injectRouteGeometries(trip.chapters);
  }
  return NextResponse.json(trip);
}

TripApp の SSE 読み取りロジック

フロントエンドは SSE ストリームを手動でパースして進捗表示と旅行データの受信を処理します。

// components/TripApp.tsx(handleCreateTrip の抜粋)
async function handleCreateTrip(e: React.FormEvent) {
  e.preventDefault();
  setIsCreating(true);
  setProgressMessage("Starting up agents...");

  try {
    // CrewAI マルチエージェントパイプライン(SSE ストリーム)
    const res = await fetch("/api/agent", {
      method: "POST",
      headers: { "Content-Type": "application/json" },
      body: JSON.stringify({ query }),
    });

    if (!res.ok || !res.body) throw new Error("Agent API error");

    const reader = res.body.getReader();
    const decoder = new TextDecoder();
    let buffer = "";
    let agentTrip: Trip | null = null;
    let agentError: string | null = null;

    outer: while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += decoder.decode(value, { stream: true });

      const lines = buffer.split("\n");
      buffer = lines.pop() ?? "";

      let eventType = "";
      for (const line of lines) {
        if (line.startsWith("event: ")) {
          eventType = line.slice("event: ".length).trim();
        } else if (line.startsWith("data: ")) {
          const payload = JSON.parse(line.slice("data: ".length));
          if (eventType === "log") {
            setProgressMessage(payload.message);  // 進捗をリアルタイム表示
          } else if (eventType === "result") {
            agentTrip = payload.trip;
            break outer;
          } else if (eventType === "error") {
            agentError = payload.message;
            break outer;
          }
        }
      }
    }

    if (agentTrip) {
      setTrip(agentTrip);
      setActiveChapterId(agentTrip.chapters[0]?.id ?? null);
      return;
    }

    if (agentError) throw new Error(agentError);

  } catch (agentErr) {
    // CrewAI 失敗 → /api/trip にフォールバック
    console.warn("CrewAI agent unavailable, falling back to /api/trip:", agentErr);
    setProgressMessage("Falling back to direct API...");

    try {
      const res = await fetch("/api/trip", {
        method: "POST",
        headers: { "Content-Type": "application/json" },
        body: JSON.stringify({ query }),
      });
      const data: Trip = await res.json();
      setTrip(data);
      setActiveChapterId(data.chapters[0]?.id ?? null);
    } catch {
      // 最終フォールバック: サンプルデータ
      setErrorMessage("Could not generate trip. Showing sample data.");
      setTrip(sampleTrip);
      setActiveChapterId(sampleTrip.chapters[0]?.id ?? null);
    }
  } finally {
    setIsCreating(false);
  }
}

フォールバック戦略

CrewAI(/api/agent)
  └─ 失敗 → Claude Haiku にプロンプトを渡して JSON 生成(/api/trip)
                └─ 失敗 → sampleTrip(data/sample-trip.ts)

どの段階でも最低限の動作を保証する堅牢な設計です。


MapView のルートジオメトリ対応

chapter.routeGeometry が存在すれば道路に沿ったルートを、なければ直線を描画します。

// components/MapView.tsx(map.on('load') 内の抜粋)
const coordinates: [number, number][] = [];
trip.chapters.forEach((chapter, i) => {
  const geom = chapter.routeGeometry as {
    type: string;
    coordinates: [number, number][];
  } | null | undefined;

  if (geom?.type === "LineString" && geom.coordinates.length > 0) {
    // 連続するセグメントの共有端点を重複させないように先頭を除く
    const segCoords = i === 0 ? geom.coordinates : geom.coordinates.slice(1);
    coordinates.push(...segCoords);
  } else {
    // ルートジオメトリなし: チャプターの中心点を使用(直線フォールバック)
    coordinates.push(chapter.location.center as [number, number]);
  }
});

map.addSource("route", {
  type: "geojson",
  data: {
    type: "Feature",
    geometry: { type: "LineString", coordinates },
    properties: {},
  },
});

また、旅行が生成された後(trip の props が変わった後)にルートを更新する useEffect も追加します。

// 旅行が変わったときにルートラインを更新(CrewAI 生成後など)
useEffect(() => {
  if (!mapRef.current || !mapLoadedRef.current) return;
  const map = mapRef.current;

  const coordinates: [number, number][] = [];
  trip.chapters.forEach((chapter, i) => {
    const geom = chapter.routeGeometry as { type: string; coordinates: [number, number][] } | null | undefined;
    if (geom?.type === "LineString" && geom.coordinates.length > 0) {
      const segCoords = i === 0 ? geom.coordinates : geom.coordinates.slice(1);
      coordinates.push(...segCoords);
    } else {
      coordinates.push(chapter.location.center as [number, number]);
    }
  });

  const source = map.getSource("route") as mapboxgl.GeoJSONSource | undefined;
  source?.setData({
    type: "Feature",
    geometry: { type: "LineString", coordinates },
    properties: {},
  });
}, [trip]);

環境変数まとめ

# .env.local

# ブラウザ公開(Mapbox GL JS が使用)
NEXT_PUBLIC_MAPBOX_ACCESS_TOKEN=pk.eyJ...

# サーバーサイド専用(Claude API が使用)
ANTHROPIC_API_KEY=sk-ant-...

# Python エージェントの MCP ブリッジ URL(デフォルト: localhost:3000)
MCP_BRIDGE_URL=http://localhost:3000/api/mcp-bridge

動作確認

完成コードはこちら
github.com/akiramur/trip-planner-ai-tutorial-blogpart-3/integration ブランチ

# 0. 依存パッケージをインストール(初回クローン時のみ)
npm install

# 1. Next.js 開発サーバーを起動(MCP ブリッジも含む)
npm run dev

# 2. Python 仮想環境をセットアップ(初回のみ)
cd agents
python -m venv .venv
source .venv/bin/activate
pip install crewai anthropic httpx fastembed

# 3. ブラウザで http://localhost:3000 を開く
# 「A day exploring temples in Kyoto」などと入力して「旅行プランを作成」をクリック

画面下部に進捗メッセージがリアルタイムに表示され(「Searching for spots...」「Planning route...」「Fetching route geometries...」)、完了すると旅行マップが表示されます。

Screenshot 2026-06-19 at 14.45.46.png


まとめ

第1〜3回で構築したアプリの全体像を振り返ります。

アーキテクチャ

page.tsx(サーバー)
  └─ TripApp.tsx(クライアント)
       ├─ MapView.tsx(Mapbox GL JS)
       └─ TimelineSidebar.tsx(チャプターリスト)

/api/agent → Python CrewAI(Curator → Planner → Writer)
/api/trip  → Claude Haiku 直接呼び出し(フォールバック)
/api/mcp-bridge → @mapbox/mcp-server(ForwardGeocodeTool, PoiSearchTool, DirectionsTool)
lib/route-geometry.ts → Mapbox Directions API(ルートジオメトリ注入)

第1回で学んだこと

  • 作成画面 → 旅行ビューの2画面構成
  • Mapbox GL JS と React の統合パターン(useRef、useEffect、クリーンアップ)
  • lightPreset で時刻帯の照明を表現

第2回で学んだこと

  • CrewAI マルチエージェントの分業設計
  • MCP ブリッジ経由で Python から Mapbox API を呼び出す
  • STDOUT_LOG: / TRIP_RESULT: プレフィックスによる通信プロトコル
  • Mapbox Agent Skills(RAG)で Planner の精度を向上

第3回で学んだこと

  • SSE ストリームで進捗をリアルタイム表示
  • サーバーサイドでルートジオメトリを注入(LLM に座標生成を任せない)
  • 3段階フォールバック戦略(CrewAI → Claude 直接 → sampleTrip)
  • MapView が routeGeometry の有無で道路追従 / 直線を切り替える

参考・関連リンク

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?