0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【超入門】ADK Webで「情報収集→推論→意思決定」をサブエージェントで段階実行するマルチAIエージェント作成

Last updated at Posted at 2025-09-15

1. はじめに

Google が提供する ADK(Agent Development Kit) を使うと、LLM(大規模言語モデル)が外部ツールを呼び出しながら課題解決する AIエージェント を、少ないコードで実装できます。本記事では、ADK Web の最小構成をベースに、ルート+サブ3体(情報収集/推論/意思決定) という分かりやすい構成で、実務に近いミニ分析フローを作ってみます。
データは Google Cloud Storage(以下、GCS)に置いた CSV(sales.csv を用います。

1.1 最短ゴール(TL;DR)

  1. 下記の agent.py丸ごとコピペし、<YOUR_BUCKET><YOUR_PREFIX> を自分の環境に置換します。
  2. サンプルの sales.csvgs:////sales.csv にアップロードします。
  3. 端末で adk web を実行し、ブラウザのチャットに 「最新の売上状況と次に打つ手を教えて」 と入力します。
    → エージェントが 情報収集 → 推論 → 意思決定 の順に自動で進め、最後に 【概況】【リスク】【次の一手】 の要約を返します。

1.2 この記事で学べること**

  • AIエージェントの基本概念(LLM+ツール=“行動できる”アプリ)
  • 設計のコツ:責務分離(情報収集/推論/意思決定)シンプルなツールI/O
  • ツール直呼びサブエージェント連携 のアーキテクチャの違い
  • ADK Web での サブエージェント構成実行の流れ
  • GCS に CSV を配置し、安全に読み込むための手順動作確認の方法

2. 全体像と実行の流れ

本記事のエージェントは、次の役割に分割します。

  • ルート(オーケストレーター):全体進行(順序の制御と最終回答の整形)
  • 情報収集エージェント:GCS の sales.csv を読み、KPI(総売上・地域別売上など)を集計
  • 推論エージェント:KPI から 集中リスク(特定地域への売上偏在)や 目標未達 を判定
  • 意思決定エージェント:リスクを踏まえ、次に取るべき施策を日本語で提案

このように責務を分けることで、読みやすさ/テストしやすさ/拡張しやすさが大きく高まります。将来的に「推論ルールを強化する」「データ収集先を BigQuery に変える」といった拡張も、該当エージェントだけを差し替えれば対応できます。

3. アーキテクチャ比較:ツール直呼びサブエージェント連携

本記事では サブエージェント連携 を採用しますが、ツール直呼びサブエージェント連携 のアーキテクチャの違いについてまず整理します。

3.1 仕組みの違い

  • ツール直呼び(ルートがツールを持つ)
    ADK における「ツール」は、外部データ取得や処理を行う モジュール化された関数です。LLM が会話履歴と指示をもとに、どのツールをいつ呼ぶか・引数は何かを決めて実行し、その出力を観察して次の手を考えます。したがって、プロンプトに「この条件でこのツールを使う」「失敗時はこう振る舞う」 を具体的に書くことが重要です。
    image.png

  • サブエージェント連携(Multi-Agent)
    親子関係(sub_agents)を定義して 階層構造を作ると、複数のエージェントが協調してゴールを目指せます。これはモジュール性や再利用性を高め、ワークフロー(順序制御) も設計できます。Workflow 系(例:SequentialAgent)を使えば、LLM に依存せず サブエージェントを 決められた順序で確実に実行できます。
    image.png
    image.png

3.2 メリット / デメリット(実運用視点)

観点 ツール直呼び(ルートのみ) サブエージェント連携
実装スピード 最短(PoC 向け) 初期コストあり
責務分離 ルートに処理が集中し肥大化しやすい 明確(収集/推論/意思決定)で保守性・再利用性◎
観測性/デバッグ 1 箇所にまとまるがステップの区別が曖昧 段ごとに成果物を確認・分岐点を特定しやすい
ガバナンス プロンプト次第で逸脱の余地 各サブに役割を限定でき 越権防止しやすい
呼び出し回数/コスト 少なめで安価になりやすい サブ間移譲・段階実行で増えやすい
連携の安定性 注意点:LLM 主導の移譲では、意図した「連続多段の自動移譲」が行われず、次のエージェントにつながらない場合があります。ワークアラウンドとして 「ルートへ一度戻す」「SequentialAgent で順序固定」「ルートが AgentTool を明示呼び出し」 などを検討します。

3.3 選び方のガイド

  • まず動かす:要件が固まっていない PoC では ツール直呼びで最小構成を素早く作ります。
  • 運用を見据える:順序の保証・監査・再利用が重要なら サブエージェント+Workflow に寄せます。
  • 連携が不安定なとき
    • 順序の固定SequentialAgent を使う。
    • 明示的な呼び分け → ルートが AgentTool/FunctionTool を直接呼ぶ戦略に変更する。
    • 都度の人手確認 → ステップごとに結果を提示し「次へ進む?」の UX を入れて進行を安定化させます。

まとめ

  • ツール直呼びは最短で“動く”が、順序や連携の安定性はプロンプト頼みになります。
  • サブエージェント連携は構造化・再利用・監査に強く、Workflow Agentで順序を保証できます。
  • 実運用では、段階表示+ユーザー確認(HITL)や SequentialAgent での決定的フロー化を組み合わせると安定します。

4. 事前準備

4.1 GCP の有効化と権限

  • GCP プロジェクト:課金が有効であることを確認します。
  • Cloud Storage API:有効化します。
  • 権限(IAM):実行主体(サービスアカウントや実行ユーザー)に roles/storage.objectViewer を付与します。
    ※ 読み取りのみの最小権限にしておくと安全です。

4.2 認証(ローカル開発例)

ローカルから GCS にアクセスさせるには ADC(Application Default Credentials) を用いるのが簡単です。
サービスアカウント鍵(JSON)を用いる場合は、以下の環境変数を設定します。

# PowerShell
$env:GOOGLE_APPLICATION_CREDENTIALS="C:\path\to\sa.json"

# bash
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/sa.json

本番環境(Cloud Run 等)では Workload Identity を使い、JSON鍵を配布しない方式を推奨します。

4.3 依存パッケージのインストール

pip install google-adk google-cloud-storage

5. サンプルデータ(sales.csv)をGCSへアップロード

サンプルデータ(sales.csv)の例:

date,region,product,unit_price,quantity,revenue
2025-09-01,APAC,Notebook,1200,5,5400
2025-09-01,APAC,Pen,100,20,2000
2025-09-02,NA,Notebook,1300,3,3900
2025-09-02,EMEA,Stapler,800,2,1600
2025-09-03,APAC,Pen,100,100,10000
2025-09-03,EMEA,Notebook,1250,4,5000
2025-09-04,NA,Pen,120,50,6000
2025-09-05,EMEA,Notebook,1150,2,2300
2025-09-05,APAC,Stapler,850,3,2550
2025-09-06,NA,Notebook,1350,1,1350

アップロード例(gcloud storage を利用):

gcloud storage cp ./sales.csv gs://<YOUR_BUCKET>/<YOUR_PREFIX>/sales.csv

補足revenue が空欄の行があっても、コード側で unit_price * quantity から補完します。
文字化けする場合は、CSV を UTF-8 で保存するか、後述の ENCODING を変更してください。


6. 実装(ルート+サブ3体で段階実行)

  • agent.py をプロジェクトに作成し、以下を そのまま貼り付けてください。
  • 置換ポイント<YOUR_BUCKET><YOUR_PREFIX> を必ず自分の値に変えてください。
  • モデル名 gemini-2.5-flash は、利用可能なモデルに合わせて変更いただけますが、今回のソースコードでは、 gemini-2.0-flashよりも gemini-2.5-flashの方が想定した結果が返ってきやすいです。
# agent.py
# 依存: pip install google-adk google-cloud-storage
# 認証(ローカル例):
#   PowerShell: $env:GOOGLE_APPLICATION_CREDENTIALS="C:\\path\\to\\sa.json"
#   bash      : export GOOGLE_APPLICATION_CREDENTIALS=/path/to/sa.json
#
# 概要:
# - 情報収集エージェント: gather_sales_kpi(KPI集計)
# - 推論エージェント    : infer_sales_risk(集中リスク/未達判定)
# - 意思決定エージェント: decide_sales_action(次の一手の提案)
# - ルートエージェント  : 上記3つを順に呼び出し、最終回答を整形して返す

import csv
import io
from collections import defaultdict

from google.cloud import storage
from google.adk.agents import Agent

# ==========================
# 設定(※あなたの環境に置換)
# ==========================
GCS_BUCKET = "<YOUR_BUCKET>"            # 例: my-demo-gcs-2025
GCS_PATH   = "<YOUR_PREFIX>/sales.csv"  # 例: data/demo/sales.csv
ENCODING   = "utf-8"

# -----------------------
# 共通: CSV 読み取り関数
# -----------------------
def _read_sales_rows(bucket: str, path: str, encoding: str = ENCODING):
    """GCS から sales.csv を読み込み、1行=辞書の配列で返す。"""
    text = storage.Client().bucket(bucket).blob(path).download_as_text(encoding=encoding)
    rows = list(csv.DictReader(io.StringIO(text)))

    # revenue が空なら unit_price * quantity で補完(最小実装)
    for r in rows:
        if not (r.get("revenue") or "").strip():
            try:
                up = float(r.get("unit_price", 0) or 0)
                q  = float(r.get("quantity", 0) or 0)
                r["revenue"] = str(up * q)
            except Exception:
                r["revenue"] = "0"
    return rows

# ==========================
# ツール関数(各サブエージェントが利用)
# ==========================
def gather_sales_kpi(bucket: str = GCS_BUCKET, path: str = GCS_PATH) -> dict:
    """
    情報収集: CSV を読み、総売上・件数・地域別売上・トップ地域などを返す。
    """
    try:
        rows = _read_sales_rows(bucket, path)
        total_revenue = 0.0
        by_region = defaultdict(float)
        for r in rows:
            rev = float(r.get("revenue", 0) or 0)
            total_revenue += rev
            by_region[(r.get("region") or "UNKNOWN").strip()] += rev

        orders = len(rows)
        top_region, top_rev = ("", 0.0)
        if by_region:
            top_region, top_rev = max(by_region.items(), key=lambda kv: kv[1])
        share = (top_rev / total_revenue) if total_revenue > 0 else 0.0

        return {
            "status": "success",
            "kpi": {
                "orders": orders,
                "total_revenue": round(total_revenue, 2),
                "by_region": [{"region": k, "revenue": round(v, 2)} for k, v in by_region.items()],
                "top_region": top_region,
                "top_region_share": round(share, 4),
            },
            "source": f"gs://{bucket}/{path}",
        }
    except Exception as e:
        return {"status": "error", "error_message": f"gather_sales_kpi failed: {e}"}

def infer_sales_risk(bucket: str = GCS_BUCKET, path: str = GCS_PATH,
                     concentration_threshold: float = 0.7,
                     min_revenue_goal: float = 10_000.0) -> dict:
    """
    推論: KPI を基に簡単なリスクを判定(集中リスク・売上目標未達)。
    ※ 手軽さ重視のため内部で gather_sales_kpi を呼んで再読します。
    """
    try:
        metrics = gather_sales_kpi(bucket, path)
        if metrics.get("status") != "success":
            return metrics
        kpi = metrics["kpi"]
        risks = {}

        warn = []
        if kpi["top_region_share"] >= concentration_threshold:
            warn.append(f"売上の {int(concentration_threshold*100)}% 以上が {kpi['top_region']} に集中")
        if kpi["total_revenue"] < min_revenue_goal:
            warn.append(f"総売上が目標({min_revenue_goal})に未達")

        return {"status": "success", "insight": {"risks": warn, "kpi": kpi}}
    except Exception as e:
        return {"status": "error", "error_message": f"infer_sales_risk failed: {e}"}

def decide_sales_action(bucket: str = GCS_BUCKET, path: str = GCS_PATH,
                        concentration_threshold: float = 0.7,
                        min_revenue_goal: float = 10_000.0) -> dict:
    """
    意思決定: 簡易ルールで「次に取るべきアクション」を提案。
    ※ 手軽さ重視のため内部で infer_sales_risk を呼びます。
    """
    try:
        insight = infer_sales_risk(bucket, path, concentration_threshold, min_revenue_goal)
        if insight.get("status") != "success":
            return insight
        kpi   = insight["insight"]["kpi"]
        risks = insight["insight"]["risks"]

        recs = []
        if any("集中" in r for r in risks):
            recs.append("トップ地域以外での販促を強化(価格/在庫/広告の見直し)")
        if any("未達" in r for r in risks):
            recs.append("高粗利商品の施策(バンドル・まとめ買い割引)を実施")
        if not recs:
            recs.append("現状維持。小規模な A/B テストで改善余地を探索")

        return {"status": "success", "recommendation": {"summary": "".join(recs), "kpi": kpi, "risks": risks}}
    except Exception as e:
        return {"status": "error", "error_message": f"decide_sales_action failed: {e}"}

# ==========================
# サブエージェント定義
# ==========================
# 情報収集エージェント: KPI の収集に専念(gather_sales_kpi のみをツール登録)
information_agent = Agent(
    name="情報収集エージェント",
    model="gemini-2.5-flash",
    description="GCS の sales.csv から売上KPIを収集・要約する専門エージェント。",
    instruction=(
        "あなたはデータ収集専門です。必ず gather_sales_kpi を呼び出してKPIを取得し、"
        "日本語で簡潔に要約してください。余計な推測や助言はせず、収集結果のみを報告します。"
        "要約が終わったら必ず営業分析オーケストレーターに返してください。"
    ),
    tools=[gather_sales_kpi],
)

# 推論エージェント: リスク判定に専念(infer_sales_risk のみ)
inference_agent = Agent(
    name="推論エージェント",
    model="gemini-2.5-flash",
    description="売上KPIに基づいて集中リスクや目標未達を判定するエージェント。",
    instruction=(
        "あなたは推論専門です。必ず infer_sales_risk を呼び出し、"
        "リスクの有無と理由を日本語で簡潔に説明してください。助言や意思決定は行いません。"
        "説明が終わったら必ず営業分析オーケストレーターに返してください。"
    ),
    tools=[infer_sales_risk],
)

# 意思決定エージェント: アクション提案に専念(decide_sales_action のみ)
decision_agent = Agent(
    name="意思決定エージェント",
    model="gemini-2.5-flash",
    description="推論結果に基づき、次に取るべき具体的な施策を提案するエージェント。",
    instruction=(
        "あなたは意思決定専門です。必ず decide_sales_action を呼び出し、"
        "日本語で実行可能な施策を提案してください。KPIとリスクを短く併記し、結論を明確に示します。"
        "提案が終わったら必ず営業分析オーケストレーターに返してください。"
    ),
    tools=[decide_sales_action],
)

# ==========================
# ルート(オーケストレーター)エージェント
# ==========================
root_agent = Agent(
    name="営業分析オーケストレーター",
    model="gemini-2.5-flash",
    description="情報収集→推論→意思決定を段階的に進め、最終回答を組み立てる窓口エージェント。",
    instruction=(
        "ユーザーの問いに対し、次の順序で必ず進めてください:\n"
        "1) 情報収集エージェントを呼び出し、KPIを収集する。\n"
        "2) 推論エージェントを呼び出し、リスクを判定する。\n"
        "3) 意思決定エージェントを呼び出し、具体的な施策を提案する。\n\n"
        "最終的な回答は日本語の自然文のみで、上記3段階の結果を統合して必ず次の3セクションで出力します:\n"
        "【概況】…(総売上・トップ地域など)\n"
        "【リスク】…(集中・未達など)\n"
        "【次の一手】…(実行可能な施策)\n"
        "function_call などのシステムメタ情報は出力に含めないでください。"
    ),
    tools=[],
    sub_agents=[information_agent, inference_agent, decision_agent],
)

7. 実行と確認手順

  1. 上記 agent.py を保存します。

  2. 端末でプロジェクトのディレクトリに移動し、以下を実行します。

    adk web
    
  3. コンソールに表示される URL(通常は http://127.0.0.1:8000)をブラウザで開きます。

  4. チャットに 「最新の売上状況と次に打つ手を教えて」 と入力します。

    • 内部では、情報収集 → 推論 → 意思決定 の順にサブエージェントが呼び出され、
      最後に 【概況】【リスク】【次の一手】 の要約が返ります。
      スクリーンショット 2025-09-15 204119.png
      スクリーンショット 2025-09-15 204212.png
      スクリーンショット 2025-09-15 204232.png
  5. 左のタブからEvents>>Traceを開くと、どのサブエージェントがどのツールをいつ呼んだかが時系列で確認できます。
    image.png

image.png

6.チャット画面から対象のエージェント、アクションを選択するとその時にどのエージェントが何のアクションをしているのかを確認することができます。

image.png


8. まとめ

本記事では、ADK Web を用いて ルート+サブ3体 のエージェント構成を作成し、
GCS の CSV を読み → KPI を集計 → リスクを推論 → 次の一手を提案 する一連の流れを、最小コードで体験いただきました。
この構成は、今後の拡張(ルール強化・データソース変更・権限分離)にも対応しやすく、実務へ将来的に展開しやすい構造です。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?