1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめに

マルチエージェントシステム内では複数のエージェントが動いています。
その中のAgent間のやり取りをブラックボックスにしたまま開発を進めると、
デバッグ/チューニングも手探りになってしまいますね。

Google A2A(Agent-to-Agent)プロトコル を使ってエージェントを3体実装し、
その通信をリアルタイムで可視化するモニタリングツールを作って中を確認します!

A2Aとは

まず、A2A(Agent-to-Agent)はGoogleが2025年に公開したオープンプロトコルで、異なるフレームワーク・言語で実装されたエージェント同士が標準的な方法で通信するための仕様のことです。

コアにあるのは以下の2つだと思います。

Agent Card

各エージェントが /.well-known/agent-card.json で自分の能力を宣言する。名前、スキル、サポートするプロトコルバインディング(JSON-RPC / REST / gRPC)、ストリーミング対応可否などが書かれていて、この情報を元にやり取りを行うわけですね。

image.png

{
  "name": "Writer",
  "supportedInterfaces": [{
    "url": "http://localhost:8002/",
    "protocolBinding": "JSONRPC",
    "protocolVersion": "1.0"
  }],
  "capabilities": { "streaming": true },
  "skills": [{ "id": "write", "name": "Writer" }]
}

メッセージング

話しかける前にAgent Cardを取得し(サービスディスカバリー)、その情報に基づいて SendMessage を投げて常に GET /.well-known/agent-card.jsonPOST / の2ステップになっています。

image.png

システム構成

今回はストーリーを計画する人、お話を考える人、お話を評価する人を作成し、
それぞれのやり取りを確認します。

Story Planner (8001)
       │  story_plan
       ▼
   Writer (8002) ◄──── feedback ────┐
       │  chapter                   │
       ▼                            │
  Reviewer (8003) ──── feedback ───►│

  最終ターン後: Writer → Story Planner (final_story)

用意するエージェントは3体。
Reviewer→Writer→Reviewerのループを任意ターン繰り返し、
最後にWriterが完成原稿をStory Plannerに返します。

技術スタック

役割 技術
A2A SDK a2a-sdk 1.1.0
バックエンド Python 3.12 + FastAPI 0.136
HTTP クライアント httpx 0.28
フロントエンド React 19 + TypeScript + Vite 8
Sequence Diagram mermaid 11
リアルタイム通信 SSE(Server-Sent Events)

ファイル構成

backend/
  agents.py        # AgentExecutor 実装(Planner / Writer / Reviewer)
  agent_servers.py # 3体のA2A準拠 FastAPI アプリをビルド
  orchestrator.py  # 通信ループ制御 + HTTP キャプチャ
  message_log.py   # 全通信ログ + SSE 配信
  main.py          # 4サーバーを1プロセスで起動(8000-8003)

frontend/src/
  App.tsx
  components/
    AgentGraph.tsx       # SVG グラフ + アニメーション
    Timeline.tsx         # 時系列リスト
    SequenceDiagram.tsx  # Mermaid 自動生成
    MessageDetail.tsx    # Content / Protocol(HTTP) タブ

Agent実装

A2A SDKの AgentExecutor を継承し、execute() メソッドに処理を書きます!
SDKのドキュメントには2つのパターンが示されています。

  • Immediate responseMessage を1つエンキューして返す(同期的)
  • AsynchronousTask をエンキューして非同期に TaskStatusUpdateEvent / TaskArtifactUpdateEvent を流す

今回はシンプルに Immediate response を使いました。

from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.helpers.proto_helpers import new_text_message

class WriterExecutor(AgentExecutor):
    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        text = context.get_user_input()  # 受け取ったメッセージ
        reply = generate_text(text)      # ここに LLM 呼び出しが入る想定
        msg = new_text_message(
            text=reply,
            context_id=context.context_id,
            task_id=context.task_id,
        )
        await event_queue.enqueue_event(msg)

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        pass

Asynchronousパターンを使う場合、start_work() を呼ぶ前に必ず Task オブジェクト自体をキューに積む必要がある。ここを踏み外すとエラーになる。

サーバー登録

各エージェントを独立したFastAPIアプリとして起動します。
DefaultRequestHandlerV2 がJSON-RPCのルーティングを全部担います。

from a2a.server.request_handlers import DefaultRequestHandlerV2
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
from a2a.server.routes.fastapi_routes import add_a2a_routes_to_fastapi

def build_writer_app(port: int = 8002) -> FastAPI:
    card = AgentCard(
        name="Writer",
        supported_interfaces=[AgentInterface(
            protocol_binding=TransportProtocol.JSONRPC,
            protocol_version=PROTOCOL_VERSION_1_0,
            url=f"http://localhost:{port}/",
        )],
        capabilities=AgentCapabilities(streaming=True),
        ...
    )
    handler = DefaultRequestHandlerV2(
        agent_executor=WriterExecutor(),
        task_store=InMemoryTaskStore(),
        agent_card=card,
    )
    app = FastAPI()
    add_a2a_routes_to_fastapi(
        app,
        agent_card_routes=create_agent_card_routes(card),
        jsonrpc_routes=create_jsonrpc_routes(handler, rpc_url="/"),
    )
    return app

4つのサーバーを1プロセスで動かすため、asyncio.gather で並列起動しました。

async def main():
    await asyncio.gather(
        serve(monitor, 8000),    # モニタリング API
        serve(planner_app, 8001),
        serve(writer_app, 8002),
        serve(reviewer_app, 8003),
    )

HTTPキャプチャの仕組み

A2Aの通信を観測するために httpx.AsyncBaseTransport をラップするカスタムトランスポートを実装しました。

class CapturingTransport(httpx.AsyncBaseTransport):
    def __init__(self, wrapped: httpx.AsyncBaseTransport):
        self._wrapped = wrapped
        self.last: ProtocolData | None = None

    async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
        req_body = request.content.decode("utf-8", errors="replace")

        response = await self._wrapped.handle_async_request(request)

        # レスポンスボディはストリームなので一度読み切ってバッファに持つ
        body_bytes = await response.aread()

        self.last = ProtocolData(
            http_method=request.method,
            http_url=str(request.url),
            request_headers=dict(request.headers),
            request_body=req_body,
            response_status=response.status_code,
            response_headers=dict(response.headers),
            response_body=body_bytes.decode("utf-8", errors="replace"),
        )

        # 読み切ったバイト列で Response を再構築して返す
        return httpx.Response(
            status_code=response.status_code,
            headers=response.headers,
            content=body_bytes,
            request=request,
        )

ストリームは一度読んだら終わりなので、そのまま渡すと呼び出し元でボディが空になります。
response.aread() でボディを消費した後、同じバイト列で httpx.Response を再構築して返します。
これにより各通信の実際のJSON-RPCペイロードがフロントエンドまで届くようになります。

作成物と実際に観測できる通信

UIは以下です。
左:通信を選択、
中:mermaidで全体の流れを可視化
右:左で選択した通信内容を詳細に表示
SSEエンドポイントは既存メッセージをリプレイしてから新着をストリーミングするようになってます。

image.png

エリア 役割
Agent Graph SVGでエージェントをノード表示。通信発生時にエッジをハイライト+アニメーション
Timeline 全通信を時系列で表示。最新が上。SSEでリアルタイム更新
Sequence Diagram Mermaidで自動生成。ターンごとに更新
Message Detail TimelineをクリックするとContentタブ(内容)とProtocol(HTTP)タブ(生のJSON-RPC)を表示

実行ログから見えるA2Aの通信構造

フェーズ1:サービスディスカバリー(ループ開始前に1回)

最初にAgent Cardを取得し(サービスディスカバリー)、エージェントの能力を確認しています。

GET http://localhost:8001/.well-known/agent-card.json → 200
GET http://localhost:8002/.well-known/agent-card.json → 200
GET http://localhost:8003/.well-known/agent-card.json → 200

フェーズ2:実行(ターンごとに POST)

お互いに情報をやり取りしています。

POST http://localhost:8002/  → 200  # Story Planner → Writer
POST http://localhost:8003/  → 200  # Writer → Reviewer
POST http://localhost:8002/  → 200  # Reviewer → Writer
  ...(3ターン繰り返し)
POST http://localhost:8001/  → 200  # Writer → Story Planner(最終)

1つ抜粋すると

①ストーリーを書く人から評価する人に対して通信を行い、
image.png

②自分が考えたストーリーをtext partで送り、
image.png

{
  "method": "SendMessage",
  "params": {
    "message": {
      "messageId": "672541dc-...",
      "role": "ROLE_USER",
      "parts": [{ "text": "騎士ナイトは夜の森で奇妙な歌声を聞いた。" }]
    }
  },
  "jsonrpc": "2.0",
  "id": "1"
}

③ストーリを評価する人はそのレビューを応答しています。
image.png

{
  "result": {
    "message": {
      "messageId": "5c830d9a-...",
      "role": "ROLE_AGENT",
      "parts": [{ "text": "危機感を高めて。" }]
    }
  },
  "jsonrpc": "2.0",
  "id": "1"
}

JSON-RPC 2.0の標準フォーマットの上に、A2Aのメッセージ構造(role / parts / messageId)が乗っている形になっています。

ログから読み取れること

1. Agent Cardの capabilities.streaming: true と実際の動作は別物

Agent Cardには "streaming": true と宣言しています。

def _make_card(name: str, description: str, port: int, skill_id: str) -> AgentCard:
    ...
        capabilities=AgentCapabilities(streaming=True),
    ...

ただ、クライアント側で ClientConfig(streaming=False) を設定しています。
実際はSendMessage(非ストリーミング)で通信していてこのことから、
クライアント側の設定が優先され、能力の申告と実際の使い方は切り離されていることがわかります。

2. 制御はエージェントではなくオーケストレーターが持つ

各エージェントは来たリクエストに返答するだけで、「次に誰に送る」を知りません。
通信の順序とルーティングは orchestrator.py が全部制御しており、
A2Aは通信の標準化プロトコルであって、エージェントの自律性を保証するものではないということですね。

3. 通信は直列

POSTが返るまで次を投げない設計なので今回の8メッセージは完全にシーケンシャルに実行されましたが、
オーケストレーター側の変更で並列化も可能で、プロトコルレベルでは並列通信を妨げるものはありませんでした。

4. A2Aはあくまで通信レイヤー

現状のWriterはフィードバックの内容を無視してランダムに文章を生成しています。
これはLLMを使っていないからですが、実際のシステムでは context.get_user_input()
受け取ったテキストをLLMのプロンプトに渡します。A2Aは通信レイヤーの責務を担うだけで、
エージェントのロジックは完全に実装者に委ねられているわけですね。

まとめ

A2AはHTTPの上に乗るプロトコルレイヤーで、
エージェントが「何を考えるか」ではなく「どう話すか」を標準化するものです。
今回はモニタリングツールを作ってその話し方を可視化しました。

Protocol(HTTP)を見る事でAgent Cardの取得からSendMessageのJSON-RPCペイロードまで
「A2Aとは何か」をコードを読まずに体感できました。

LLMを組み込んで本当のエージェントループにすると、
エージェントが何をどう判断したかという別の観点が出てくるのでそこも確認したいですね。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?