3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

【Vol.2 実装編】 Gemini × BigQueryでつくる「自律型データ分析エージェント」 〜LangChain活用編〜

Posted at

前回の 【Vol.1 設計編】 では、GeminiとBigQueryを組み合わせた「自律型データ分析エージェント」のアーキテクチャ設計や要件定義、そしてLLMに正しく意図を伝えるためのプランニングの重要性について解説しました。

今回は続きとなる 【Vol.2 実装編】 です。 設計編で描いた「設計図」を基に、実際にPythonコードを用いてエージェントを構築する手順を解説します。

Vol.1ではGoogleのAgent Development Kit (ADK) にも触れましたが、今回のVol.2では、まずはLLMアプリ開発のデファクトスタンダードである LangChain を用いた実装方法を紹介します。LangChainのエコシステムを活用することで、ReActパターンやTool利用の仕組みを効率的に構築・理解することができます。

なお、ADK(Agent Development Kit)を用いた実装 についても、別途記事化を予定しています。

それでは、LangChainとGeminiを使って、自然言語でデータを分析できるエージェントをハンズオン形式で作っていきましょう。

LangChainとは?

特徴:

  • 汎用性: OpenAI, Anthropic, Google等、あらゆるLLMに対応
  • エコシステム: 豊富なプラグインとコミュニティツール
  • 仕組み: @tool デコレータ等で独自ロジックを構築

適しているケース:

  • 複数のLLMを比較・検証したい
  • 高度なカスタマイズが必要なプロトタイプ開発

Reference Links:

Google ADK (Vertex AI Agent Builder)とは?

特徴:

  • Google特化: Vertex AI / Google Cloud に完全最適化
  • ネイティブ統合: GCPサービス連携がスムーズ
  • エンタープライズ: セキュリティ、監査、SLAが充実

適しているケース:

  • Google Cloud エコシステムで完結させたい
  • 本番環境での安定性と保守性を最優先する

Reference Links:

徹底比較:メリットとデメリット

項目 LangChain (自由度) Google ADK (安定性)
メリット ✅ 柔軟なツール作成 ✅ マルチプロバイダー ✅ 巨大なコミュニティ ✅ GCP完全最適化 ✅ 管理コスト低 ✅ 標準化されたAPI
デメリット ❌ 設定・保守が複雑 ❌ 他社LLM利用不可 ❌ カスタマイズ制限あり

Reference Links:

【実例】BigQuery AI Agent の構築

LangChain × Vertex AI を使用した自然言語データ分析。

システムのゴール:

  1. ユーザーが日本語で質問
    • 「2025年7月の上昇銘柄は?」
  2. AIが自律的にBigQueryへクエリ実行
  3. 分析結果を要約して回答

Reference Links:

システムアーキテクチャ

  1. 頭脳 (LLM): ChatVertexAI (Gemini-2.5-pro)
    • 思考と判断を担当
  2. 手足 (Tools): LangChain @tool
    • get_table_schema: テーブル構造を確認
    • run_query: SQLを実行してデータ取得
  3. 制御 (Executor): AgentExecutor
    • 頭脳と手足を連携させる司令塔

Reference Links:

コード全文

"""
BigQuery AI Agent サンプル
LangChainとVertex AIを使用してBigQueryデータを分析するエージェント
"""

import json
import logging
import os
from typing import Any, Dict

from google.cloud import bigquery

# LangChain関連のインポート
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import tool
from langchain_google_vertexai import ChatVertexAI

# 定数定義
LOCATION = "us-central1"
MODEL_NAME = "gemini-2.5-pro"

# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class BigQueryConnection:
    """BigQuery接続を管理するクラス"""

    def __init__(self, project_id: str = None):
        self.project_id = project_id or os.getenv("GCP_PROJECT_ID", "")
        self.client = bigquery.Client(project=self.project_id)
        logger.info(f"BigQuery接続初期化完了 - Project: {self.project_id}")

    def get_table_schema(self, dataset_id: str, table_id: str) -> Dict[str, Any]:
        """テーブルスキーマを取得"""
        try:
            table_ref = f"{self.project_id}.{dataset_id}.{table_id}"
            table = self.client.get_table(table_ref)

            schema_info = []
            for field in table.schema:
                schema_info.append(
                    {
                        "name": field.name,
                        "type": field.field_type,
                        "mode": field.mode,
                        "description": field.description or "",
                    }
                )

            return {"success": True, "table_ref": table_ref, "schema": schema_info, "num_rows": table.num_rows}
        except Exception as e:
            return {"success": False, "error": str(e)}

    def execute_query(self, query: str) -> Dict[str, Any]:
        """SQLクエリを実行"""
        try:
            query_job = self.client.query(query)
            results = list(query_job.result())

            return {"success": True, "row_count": len(results), "data": [dict(row) for row in results]}
        except Exception as e:
            return {"success": False, "error": str(e), "query": query}

# BigQuery接続インスタンスを作成
bq_connection = BigQueryConnection("xxx")

@tool
def get_table_schema(dataset_id: str, table_id: str) -> str:
    """
    BigQueryテーブルのスキーマ情報を取得します。

    Args:
        dataset_id: データセット名
        table_id: テーブル名

    Returns:
        テーブルのスキーマ情報(JSON形式の文字列)
    """
    result = bq_connection.get_table_schema(dataset_id, table_id)
    return json.dumps(result, ensure_ascii=False, indent=2)

@tool
def run_query(query: str) -> str:
    """
    BigQueryでSQLクエリを実行します。

    Args:
        query: 実行するSQLクエリ(BigQuery Standard SQL)

    Returns:
        クエリ実行結果(JSON形式の文字列)
    """
    result = bq_connection.execute_query(query)
    return json.dumps(result, ensure_ascii=False, indent=2)

def get_agent_executor():
    """AI AgentのExecutorを作成"""
    llm = ChatVertexAI(model_name=MODEL_NAME, location=LOCATION, temperature=0)
    tools = [get_table_schema, run_query]

    system_instruction = """
    あなたはBigQuery分析エージェントです。

    1. `get_table_schema` でテーブル構造を確認する。
    2. `run_query` でSQLを実行する。(BigQuery Standard SQL)
    3. 結果が得られたら、**以下のJSONフォーマットのみ**を出力して終了する。

    ```json
    {{
        "answer": "分析結果の要約テキスト",
        "data": [ ...SQLの結果データ... ]
    }}
    ```
    """

    # プロンプトテンプレートの修正
    prompt = ChatPromptTemplate.from_messages(
        [
            ("system", system_instruction),
            ("user", "{input}"),
            MessagesPlaceholder(variable_name="agent_scratchpad"),
        ]
    )

    agent = create_tool_calling_agent(llm, tools, prompt)
    return AgentExecutor(agent=agent, tools=tools, verbose=True)

def main():
    """メイン実行関数"""
    try:
        print("=== BigQuery AI Agent サンプル ===")

        # AIエージェントの初期化
        agent_executor = get_agent_executor()

        # ユーザーからの質問
        # question = """
        # xxx プロジェクトの xxx データセットにある
        # stocks_daily テーブルを使って、
        # 2025年の月別の登録数を集計して教えてください。
        # """

        question = """
        xxx プロジェクトの xxx データセットにある
        stocks_daily テーブルを使って、
        2025年7月の月初から月末までで上昇率が一番高い銘柄を教えてください。
        """

        print(f"Question: {question.strip()}")
        print("-" * 50)

        # AIエージェントに質問を投げる
        response = agent_executor.invoke({"input": question})

        # 結果を表示
        print("=== 結果 ===")
        print(response["output"])

    except Exception as e:
        logger.error(f"エラーが発生しました: {e}")
        print(f"エラー: {e}")

if __name__ == "__main__":
    main()

コード解説

1. 初期設定・インポート部分


"""
BigQuery AI Agent サンプル
LangChainとVertex AIを使用してBigQueryデータを分析するエージェント
"""

import json
import logging
import os
from typing import Any, Dict

from google.cloud import bigquery

# LangChain関連のインポート
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain.tools import tool
from langchain_google_vertexai import ChatVertexAI

やっていること:

  • 必要なライブラリをインポート
  • BigQueryとの接続用ライブラリ
  • LangChainのエージェント機能
  • Vertex AIのGeminiモデル接続機能

2. 設定・定数定義

# 定数定義
LOCATION = "us-central1"
MODEL_NAME = "gemini-2.5-pro"

# ログ設定
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

やっていること:

  • Vertex AIのリージョンとモデル名を定義
  • ログ出力の設定(処理の経過を記録)

3. BigQuery接続クラス

class BigQueryConnection:
    """BigQuery接続を管理するクラス"""

    def __init__(self, project_id: str = None):
        self.project_id = project_id or os.getenv("GCP_PROJECT_ID", "")
        self.client = bigquery.Client(project=self.project_id)
        logger.info(f"BigQuery接続初期化完了 - Project: {self.project_id}")

やっていること:

  • BigQueryデータベースへの接続を管理
  • プロジェクトID「xxx」を指定
  • 実際のデータベース操作の準備

3-1. テーブル構造取得メソッド

def get_table_schema(self, dataset_id: str, table_id: str) -> Dict[str, Any]:
    """テーブルスキーマを取得"""
    try:
        table_ref = f"{self.project_id}.{dataset_id}.{table_id}"
        table = self.client.get_table(table_ref)

        schema_info = []
        for field in table.schema:
            schema_info.append(
                {
                    "name": field.name,
                    "type": field.field_type,
                    "mode": field.mode,
                    "description": field.description or "",
                }
            )

        return {"success": True, "table_ref": table_ref, "schema": schema_info, "num_rows": table.num_rows}
    except Exception as e:
        return {"success": False, "error": str(e)}

やっていること:

  • 指定されたテーブルの構造情報を取得
  • カラム名、データ型、説明などを収集
  • エラーハンドリングで安全な処理

3-2. クエリ実行メソッド

def execute_query(self, query: str) -> Dict[str, Any]:
    """SQLクエリを実行"""
    try:
        query_job = self.client.query(query)
        results = list(query_job.result())

        return {"success": True, "row_count": len(results), "data": [dict(row) for row in results]}
    except Exception as e:
        return {"success": False, "error": str(e), "query": query}
    

やっていること:

  • AIが生成したSQLクエリを実際に実行
  • 結果をPythonの辞書形式に変換
  • エラー情報も含めて結果を返す

4. AIが使用するツールの定義

@tool
def get_table_schema(dataset_id: str, table_id: str) -> str:
    """
    BigQueryテーブルのスキーマ情報を取得します。

    Args:
        dataset_id: データセット名
        table_id: テーブル名

    Returns:
        テーブルのスキーマ情報(JSON形式の文字列)
    """
    result = bq_connection.get_table_schema(dataset_id, table_id)
    return json.dumps(result, ensure_ascii=False, indent=2)

やっていること:

  • @toolデコレータでAIが使える「道具」として定義
  • テーブル構造を調べるツール
  • AIに返す形式はJSON文字列
@tool
def run_query(query: str) -> str:
    """
    BigQueryでSQLクエリを実行します。

    Args:
        query: 実行するSQLクエリ(BigQuery Standard SQL)

    Returns:
        クエリ実行結果(JSON形式の文字列)
    """
    result = bq_connection.execute_query(query)
    return json.dumps(result, ensure_ascii=False, indent=2)

やっていること:

  • SQLを実行するツール
  • AIが生成したクエリを実際にデータベースで実行

5. AIエージェントの作成

def get_agent_executor():
    """AI AgentのExecutorを作成"""
    llm = ChatVertexAI(model_name=MODEL_NAME, location=LOCATION, temperature=0)
    tools = [get_table_schema, run_query]

やっていること:

  • Gemini 2.5-Proモデルを初期化
  • temperature=0で回答の一貫性を重視
  • 2つのツールをAIに提供

5-1. AIへの指示設定

system_instruction = """
あなたはBigQuery分析エージェントです。

1. `get_table_schema` でテーブル構造を確認する。
2. `run_query` でSQLを実行する。(BigQuery Standard SQL)
3. 結果が得られたら、**以下のJSONフォーマットのみ**を出力して終了する。

```json
{{
    "answer": "分析結果の要約テキスト",
    "data": [ ...SQLの結果データ... ]
}}
\```
"""

やっていること:

  • AIの役割と動作手順を明確に指示
  • 出力フォーマットを統一
  • BigQuery専用SQLの使用を指定

5-2. プロンプトテンプレート作成

# プロンプトテンプレートの修正
prompt = ChatPromptTemplate.from_messages(
    [
        ("system", system_instruction),
        ("user", "{input}"),
        MessagesPlaceholder(variable_name="agent_scratchpad"),
    ]
)

agent = create_tool_calling_agent(llm, tools, prompt)
return AgentExecutor(agent=agent, tools=tools, verbose=True)

やっていること:

  • システム指示、ユーザー入力、AI思考過程の構造化
  • エージェントと実行環境を作成
  • verbose=Trueで処理過程を表示

6. メイン実行処理

def main():
    """メイン実行関数"""
    try:
        print("=== BigQuery AI Agent サンプル ===")

        # AIエージェントの初期化
        agent_executor = get_agent_executor()

やっていること:

  • エージェントシステムを起動
  • エラーハンドリングで安全な実行

6-1. 質問の設定

question = """
xxx プロジェクトの xxx データセットにある
stocks_daily テーブルを使って、
2025年7月の月初から月末までで上昇率が一番高い銘柄を教えてください。
"""

やっていること:

  • 具体的な分析質問を設定
  • プロジェクト、データセット、テーブル名を指定
  • 期間と分析内容を明確化

6-2. AI分析の実行

# AIエージェントに質問を投げる
response = agent_executor.invoke({"input": question})

# 結果を表示
print("=== 結果 ===")
print(response["output"])

やっていること:

  • 質問をAIエージェントに送信
  • AIが自動的にツールを使って分析
  • 結果を表示

実際の処理フロー

ステップ1: テーブル構造の確認

  1. AIが「まずテーブル構造を確認しよう」と判断
  2. get_table_schema("xxx", "stocks_daily")を実行
  3. カラム情報を取得

ステップ2: SQL生成と実行

  1. テーブル構造を基にSQLクエリを生成
  2. run_query(generated_sql)を実行
  3. データを取得

ステップ3: 結果の分析と回答

  1. 取得したデータを分析
  2. JSON形式で結果をまとめ
  3. ユーザーに回答

技術的な特徴

自動思考プロセス

  • AIが段階的に問題を解決
  • 必要に応じてツールを使い分け
  • エラー時の自動対応

安全性

  • エラーハンドリングが各段階に組み込まれている

  • SQLインジェクション対策

  • 接続エラー時の適切な処理

    このシステムにより、SQLを知らないユーザーでも、自然言語でBigQueryデータを分析できる革新的なAIアシスタントが実現されています。


動作フロー(思考プロセス)

Q: 「2025年7月の上昇率No.1銘柄を教えて」

  1. 思考: テーブル構造を知る必要がある
    Action: get_table_schema 実行
  2. 思考: 日付と価格列を使ってSQLを書こう
    Action: run_query (SELECT ... ORDER BY DESC) 実行
  3. 思考: 結果を要約しよう
    Final Answer: JSON形式で回答

Reference Links:

まとめ

今回は LangChain を用いて、BigQuery上のデータを自然言語で分析するエージェントを実装しました。

「先週の売上要因は?」といった曖昧な質問に対し、エージェントが自律的にテーブルを探し、適切なSQLを生成・実行するプロセスが、わずかなコード量で実現できることを体感いただけたかと思います。LangChainのエコシステムを活用することで、こうした高度な推論パイプラインも素早く構築可能です。

次回について
Vol.1でも触れましたが、Google純正の Agent Development Kit (ADK) を用いたアプローチも残っています。 LangChainが汎用的なフレームワークであるのに対し、ADKはよりGoogle Cloud環境やGeminiの特性に最適化されたエージェント開発が可能であると期待されます。

次回は、この ADKを使った実装 に挑戦し、今回のLangChain版とのコードの書き味の違いや、機能的なメリット・デメリットについても検証していく予定です。

「データ分析の民主化」に向けた第一歩として、ぜひ今回のコードを手元で動かし、AIエージェントの可能性を感じてみてください。

次回もお楽しみに!

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?