はじめに
マルチエージェントシステム内では複数のエージェントが動いています。
その中のAgent間のやり取りをブラックボックスにしたまま開発を進めると、
デバッグ/チューニングも手探りになってしまいますね。
Google A2A(Agent-to-Agent)プロトコル を使ってエージェントを3体実装し、
その通信をリアルタイムで可視化するモニタリングツールを作って中を確認します!
A2Aとは
まず、A2A(Agent-to-Agent)はGoogleが2025年に公開したオープンプロトコルで、異なるフレームワーク・言語で実装されたエージェント同士が標準的な方法で通信するための仕様のことです。
コアにあるのは以下の2つだと思います。
Agent Card
各エージェントが /.well-known/agent-card.json で自分の能力を宣言する。名前、スキル、サポートするプロトコルバインディング(JSON-RPC / REST / gRPC)、ストリーミング対応可否などが書かれていて、この情報を元にやり取りを行うわけですね。
{
"name": "Writer",
"supportedInterfaces": [{
"url": "http://localhost:8002/",
"protocolBinding": "JSONRPC",
"protocolVersion": "1.0"
}],
"capabilities": { "streaming": true },
"skills": [{ "id": "write", "name": "Writer" }]
}
メッセージング
話しかける前にAgent Cardを取得し(サービスディスカバリー)、その情報に基づいて SendMessage を投げて常に GET /.well-known/agent-card.json → POST / の2ステップになっています。
システム構成
今回はストーリーを計画する人、お話を考える人、お話を評価する人を作成し、
それぞれのやり取りを確認します。
Story Planner (8001)
│ story_plan
▼
Writer (8002) ◄──── feedback ────┐
│ chapter │
▼ │
Reviewer (8003) ──── feedback ───►│
最終ターン後: Writer → Story Planner (final_story)
用意するエージェントは3体。
Reviewer→Writer→Reviewerのループを任意ターン繰り返し、
最後にWriterが完成原稿をStory Plannerに返します。
技術スタック
| 役割 | 技術 |
|---|---|
| A2A SDK | a2a-sdk 1.1.0 |
| バックエンド | Python 3.12 + FastAPI 0.136 |
| HTTP クライアント | httpx 0.28 |
| フロントエンド | React 19 + TypeScript + Vite 8 |
| Sequence Diagram | mermaid 11 |
| リアルタイム通信 | SSE(Server-Sent Events) |
ファイル構成
backend/
agents.py # AgentExecutor 実装(Planner / Writer / Reviewer)
agent_servers.py # 3体のA2A準拠 FastAPI アプリをビルド
orchestrator.py # 通信ループ制御 + HTTP キャプチャ
message_log.py # 全通信ログ + SSE 配信
main.py # 4サーバーを1プロセスで起動(8000-8003)
frontend/src/
App.tsx
components/
AgentGraph.tsx # SVG グラフ + アニメーション
Timeline.tsx # 時系列リスト
SequenceDiagram.tsx # Mermaid 自動生成
MessageDetail.tsx # Content / Protocol(HTTP) タブ
Agent実装
A2A SDKの AgentExecutor を継承し、execute() メソッドに処理を書きます!
SDKのドキュメントには2つのパターンが示されています。
-
Immediate response —
Messageを1つエンキューして返す(同期的) -
Asynchronous —
Taskをエンキューして非同期にTaskStatusUpdateEvent/TaskArtifactUpdateEventを流す
今回はシンプルに Immediate response を使いました。
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events import EventQueue
from a2a.helpers.proto_helpers import new_text_message
class WriterExecutor(AgentExecutor):
async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
text = context.get_user_input() # 受け取ったメッセージ
reply = generate_text(text) # ここに LLM 呼び出しが入る想定
msg = new_text_message(
text=reply,
context_id=context.context_id,
task_id=context.task_id,
)
await event_queue.enqueue_event(msg)
async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
pass
Asynchronousパターンを使う場合、
start_work()を呼ぶ前に必ずTaskオブジェクト自体をキューに積む必要がある。ここを踏み外すとエラーになる。
サーバー登録
各エージェントを独立したFastAPIアプリとして起動します。
DefaultRequestHandlerV2 がJSON-RPCのルーティングを全部担います。
from a2a.server.request_handlers import DefaultRequestHandlerV2
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
from a2a.server.routes.fastapi_routes import add_a2a_routes_to_fastapi
def build_writer_app(port: int = 8002) -> FastAPI:
card = AgentCard(
name="Writer",
supported_interfaces=[AgentInterface(
protocol_binding=TransportProtocol.JSONRPC,
protocol_version=PROTOCOL_VERSION_1_0,
url=f"http://localhost:{port}/",
)],
capabilities=AgentCapabilities(streaming=True),
...
)
handler = DefaultRequestHandlerV2(
agent_executor=WriterExecutor(),
task_store=InMemoryTaskStore(),
agent_card=card,
)
app = FastAPI()
add_a2a_routes_to_fastapi(
app,
agent_card_routes=create_agent_card_routes(card),
jsonrpc_routes=create_jsonrpc_routes(handler, rpc_url="/"),
)
return app
4つのサーバーを1プロセスで動かすため、asyncio.gather で並列起動しました。
async def main():
await asyncio.gather(
serve(monitor, 8000), # モニタリング API
serve(planner_app, 8001),
serve(writer_app, 8002),
serve(reviewer_app, 8003),
)
HTTPキャプチャの仕組み
A2Aの通信を観測するために httpx.AsyncBaseTransport をラップするカスタムトランスポートを実装しました。
class CapturingTransport(httpx.AsyncBaseTransport):
def __init__(self, wrapped: httpx.AsyncBaseTransport):
self._wrapped = wrapped
self.last: ProtocolData | None = None
async def handle_async_request(self, request: httpx.Request) -> httpx.Response:
req_body = request.content.decode("utf-8", errors="replace")
response = await self._wrapped.handle_async_request(request)
# レスポンスボディはストリームなので一度読み切ってバッファに持つ
body_bytes = await response.aread()
self.last = ProtocolData(
http_method=request.method,
http_url=str(request.url),
request_headers=dict(request.headers),
request_body=req_body,
response_status=response.status_code,
response_headers=dict(response.headers),
response_body=body_bytes.decode("utf-8", errors="replace"),
)
# 読み切ったバイト列で Response を再構築して返す
return httpx.Response(
status_code=response.status_code,
headers=response.headers,
content=body_bytes,
request=request,
)
ストリームは一度読んだら終わりなので、そのまま渡すと呼び出し元でボディが空になります。
response.aread() でボディを消費した後、同じバイト列で httpx.Response を再構築して返します。
これにより各通信の実際のJSON-RPCペイロードがフロントエンドまで届くようになります。
作成物と実際に観測できる通信
UIは以下です。
左:通信を選択、
中:mermaidで全体の流れを可視化
右:左で選択した通信内容を詳細に表示
SSEエンドポイントは既存メッセージをリプレイしてから新着をストリーミングするようになってます。
| エリア | 役割 |
|---|---|
| Agent Graph | SVGでエージェントをノード表示。通信発生時にエッジをハイライト+アニメーション |
| Timeline | 全通信を時系列で表示。最新が上。SSEでリアルタイム更新 |
| Sequence Diagram | Mermaidで自動生成。ターンごとに更新 |
| Message Detail | TimelineをクリックするとContentタブ(内容)とProtocol(HTTP)タブ(生のJSON-RPC)を表示 |
実行ログから見えるA2Aの通信構造
フェーズ1:サービスディスカバリー(ループ開始前に1回)
最初にAgent Cardを取得し(サービスディスカバリー)、エージェントの能力を確認しています。
GET http://localhost:8001/.well-known/agent-card.json → 200
GET http://localhost:8002/.well-known/agent-card.json → 200
GET http://localhost:8003/.well-known/agent-card.json → 200
フェーズ2:実行(ターンごとに POST)
お互いに情報をやり取りしています。
POST http://localhost:8002/ → 200 # Story Planner → Writer
POST http://localhost:8003/ → 200 # Writer → Reviewer
POST http://localhost:8002/ → 200 # Reviewer → Writer
...(3ターン繰り返し)
POST http://localhost:8001/ → 200 # Writer → Story Planner(最終)
1つ抜粋すると
{
"method": "SendMessage",
"params": {
"message": {
"messageId": "672541dc-...",
"role": "ROLE_USER",
"parts": [{ "text": "騎士ナイトは夜の森で奇妙な歌声を聞いた。" }]
}
},
"jsonrpc": "2.0",
"id": "1"
}
{
"result": {
"message": {
"messageId": "5c830d9a-...",
"role": "ROLE_AGENT",
"parts": [{ "text": "危機感を高めて。" }]
}
},
"jsonrpc": "2.0",
"id": "1"
}
JSON-RPC 2.0の標準フォーマットの上に、A2Aのメッセージ構造(role / parts / messageId)が乗っている形になっています。
ログから読み取れること
1. Agent Cardの capabilities.streaming: true と実際の動作は別物
Agent Cardには "streaming": true と宣言しています。
def _make_card(name: str, description: str, port: int, skill_id: str) -> AgentCard:
...
capabilities=AgentCapabilities(streaming=True),
...
ただ、クライアント側で ClientConfig(streaming=False) を設定しています。
実際はSendMessage(非ストリーミング)で通信していてこのことから、
クライアント側の設定が優先され、能力の申告と実際の使い方は切り離されていることがわかります。
2. 制御はエージェントではなくオーケストレーターが持つ
各エージェントは来たリクエストに返答するだけで、「次に誰に送る」を知りません。
通信の順序とルーティングは orchestrator.py が全部制御しており、
A2Aは通信の標準化プロトコルであって、エージェントの自律性を保証するものではないということですね。
3. 通信は直列
POSTが返るまで次を投げない設計なので今回の8メッセージは完全にシーケンシャルに実行されましたが、
オーケストレーター側の変更で並列化も可能で、プロトコルレベルでは並列通信を妨げるものはありませんでした。
4. A2Aはあくまで通信レイヤー
現状のWriterはフィードバックの内容を無視してランダムに文章を生成しています。
これはLLMを使っていないからですが、実際のシステムでは context.get_user_input() で
受け取ったテキストをLLMのプロンプトに渡します。A2Aは通信レイヤーの責務を担うだけで、
エージェントのロジックは完全に実装者に委ねられているわけですね。
まとめ
A2AはHTTPの上に乗るプロトコルレイヤーで、
エージェントが「何を考えるか」ではなく「どう話すか」を標準化するものです。
今回はモニタリングツールを作ってその話し方を可視化しました。
Protocol(HTTP)を見る事でAgent Cardの取得からSendMessageのJSON-RPCペイロードまで
「A2Aとは何か」をコードを読まずに体感できました。
LLMを組み込んで本当のエージェントループにすると、
エージェントが何をどう判断したかという別の観点が出てくるのでそこも確認したいですね。





