0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangGraphでAIエージェント対応SQLボットを作る Part 3:LangGraphで自然言語問い合わせ→SQLを実装(チェックポイント・再試行)

Posted at

到達点:LangGraphでNL→SQLフローを構築し、

  • PostgreSQLチェックポイントで状態を保存
  • エラー処理(SELECT限定・データセット固定・多文拒否)
  • 自動再試行(エラーメッセージを使ってSQLをリペア)
    までを実装します。
    (Part 1/2のMVPを正式フローに差し替え)

追加依存関係の追加

backend/requirements.txt
langgraph>=0.1.0
langgraph-checkpoint-postgres>=0.1.0
psycopg[binary,pool]>=3.2
pydantic>=2.7
openai>=1.40.0

環境変数(例):

.env
OPENAI_API_KEY=sk-***
OPENAI_MODEL=gpt-4o-mini        # 任意のモデル名を指定
CHECKPOINT_NS=text2sql          # 既出(Part 1/2と同じ命名でOK)

1. チェックポイント(PostgreSQL)を用意する

LangGraphは**状態(State)**をDBに保存できます。最初にテーブルを自動作成します。

langgraph/checkpointer.py
from __future__ import annotations
import os
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import connect
from psycopg.rows import dict_row

DB_URI = os.getenv("DATABASE_URL")  # postgresql://user:pass@host:5432/sqlbot

def build_checkpointer() -> PostgresSaver:
    with connect(DB_URI, autocommit=True, row_factory=dict_row) as conn:
        saver = PostgresSaver(conn)
        saver.setup()  # 初回テーブル作成
        return saver

代表テーブル(バージョンで多少異なることあり):checkpoint_migrations, checkpoint_writes, checkpoint_blobs など。
ここに スレッド(thread_id) ごとの進行状況が保存され、再実行・再試行が安定します。

2. スキーマキャッシュをDBに保存

NL→SQLの精度を上げるため、INFORMATION_SCHEMAから取得したテーブル/カラム情報をJSONBでキャッシュします(TTLつき)。

db/migrations/schema_cache.sql
CREATE TABLE IF NOT EXISTS bq_schema_cache (
  project_id TEXT NOT NULL,
  dataset    TEXT NOT NULL,
  table_name TEXT NOT NULL,
  columns    JSONB NOT NULL,     -- [{name,type,description?}, ...]
  fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
  PRIMARY KEY (project_id, dataset, table_name)
);
bq/schema_cache.py
from __future__ import annotations
import json, time
from sqlalchemy import text
from bq.client import run_query   # ← 絶対インポートに統一

TTL_SEC = 6 * 3600  # 6時間

def get_table_schema(db, project: str, dataset: str, table: str):
    row = db.execute(
        text("""SELECT columns, fetched_at
                FROM bq_schema_cache
                WHERE project_id=:p AND dataset=:d AND table_name=:t"""),
        {"p": project, "d": dataset, "t": table},
    ).fetchone()

    if row and (time.time() - row.fetched_at.timestamp()) < TTL_SEC:
        return row.columns

    headers, rows = run_query(f"""
      SELECT column_name, data_type
      FROM `{project}.{dataset}.INFORMATION_SCHEMA.COLUMNS`
      WHERE table_name = '{table}'
      ORDER BY ordinal_position
    """, project=project)
    cols = [{"name": r[0], "type": r[1]} for r in rows]

    db.execute(
        text("""INSERT INTO bq_schema_cache(project_id,dataset,table_name,columns,fetched_at)
                VALUES (:p,:d,:t,:c, now())
                ON CONFLICT (project_id,dataset,table_name)
                DO UPDATE SET columns=:c, fetched_at=now()"""),
        {"p": project, "d": dataset, "t": table, "c": json.dumps(cols)},
    )
    db.commit()
    return cols

最初は必要テーブルだけ(例:orders)で十分。後で自動収集・pgvector拡張も可能。
全テーブルを自動読み込みする場合はschema_loader.pyなどを実装して読み込み専用処理を定期実行するようにして対応する。

3. LLMクライアント(最小)

langgraph/llm.py
from __future__ import annotations
import os
from openai import OpenAI

_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
_client = OpenAI()

def complete_system_user(system: str, user: str) -> str:
    resp = _client.chat.completions.create(
        model=_MODEL,
        messages=[{"role": "system", "content": system},
                  {"role": "user", "content": user}],
        temperature=0.2,
    )
    return resp.choices[0].message.content or ""

4. エラー処理(ガード強化)

  • SELECT限定
  • 多文(;区切り)拒否
  • データセット外アクセス拒否(project.dataset.table が許可DSかチェック)
  • LIMIT補完(存在しなければ LIMIT 100 を末尾に付与)
langgraph/guards.py
from __future__ import annotations
import re

_FORBIDDEN = re.compile(r"\b(INSERT|UPDATE|DELETE|MERGE|TRUNCATE|DROP|ALTER|CREATE|GRANT|REVOKE)\b", re.I)

def enforce_sql_policy(sql: str, allowed_dataset: str) -> str:
    s = sql.strip().rstrip(";")
    if not re.match(r"(?s)^\s*SELECT\b", s, re.I):
        raise ValueError("Only SELECT is allowed.")
    if _FORBIDDEN.search(s):
        raise ValueError("Dangerous SQL detected.")
    if ";" in sql:
        raise ValueError("Multiple statements are not allowed.")

    def _qualify(m):
        full = m.group(1)
        parts = full.split(".")
        if len(parts) == 1:
            return m.group(0).replace(full, f"{allowed_dataset}.{full}")
        ds = parts[-2] if len(parts) >= 2 else ""
        if ds.lower() != allowed_dataset.lower():
            raise ValueError(f"Access to dataset '{ds}' is not allowed.")
        return m.group(0)

    s = re.sub(r"\bFROM\s+([a-zA-Z_][\w\.]*)\b", _qualify, s, flags=re.I)
    s = re.sub(r"\bJOIN\s+([a-zA-Z_][\w\.]*)\b", _qualify, s, flags=re.I)

    if not re.search(r"\bLIMIT\s+\d+\b", s, re.I):
        s = f"{s}\nLIMIT 100"
    return s

5. グラフの状態とノードを定義する

5-1. State

langgraph/state.py
from __future__ import annotations
from pydantic import BaseModel, Field
from typing import List, Optional, Tuple

class NL2SQLState(BaseModel):
    question: str
    project_id: str
    dataset: str
    location: str

    schema_hint: Optional[str] = None
    sql: Optional[str] = None
    headers: Optional[List[str]] = None
    rows: Optional[List[Tuple]] = None
    error: Optional[str] = None
    retries: int = Field(default=0, ge=0)

5-2. プロンプト

langgraph/prompt.py
SYSTEM_PROMPT = """あなたはBigQuery専用のSQLアシスタントです。
- 返答はSQLのみ(説明文なし)
- DML/DDLは禁止(SELECTのみ)
- 必ず `{dataset}.table` の形式で参照
- 末尾に LIMIT を付ける(多すぎる結果を避ける)
- タイムゾーンはUTC基準で良い(必要ならDATE()で日付化)
"""

USER_TEMPLATE = """自然文:
{question}

利用可能なテーブルと主なカラム(参考):
{schema_hint}

制約:
- dataset={dataset}
- SELECTのみ、1文のみ
- 結果はSQLだけを返す
"""

5-3. ノード(promptの {dataset} を format して渡す)

langgraph/nodes.py
from __future__ import annotations
from sqlalchemy.orm import Session
from .state import NL2SQLState
from .prompt import SYSTEM_PROMPT, USER_TEMPLATE
from .llm import complete_system_user
from bq.client import run_query
from .guards import enforce_sql_policy
from bq.schema_cache import get_table_schema

def build_schema_hint(db: Session, project: str, dataset: str, examples=("orders",)) -> str:
    lines = []
    for tbl in examples:
        cols = get_table_schema(db, project, dataset, tbl)
        if not cols:
            continue
        left = ", ".join(f"{c['name']}:{c['type']}" for c in cols[:8])
        tail = " ..." if len(cols) > 8 else ""
        lines.append(f"- {tbl}({left}{tail})")
    return "\n".join(lines) if lines else "(スキーマ情報は最小です)"

def node_collect_schema(db: Session):
    def _fn(state: NL2SQLState) -> NL2SQLState:
        state.schema_hint = build_schema_hint(db, state.project_id, state.dataset, examples=("orders",))
        return state
    return _fn

def node_llm_generate_sql(state: NL2SQLState) -> NL2SQLState:
    system = SYSTEM_PROMPT.format(dataset=state.dataset)            # ← ここ重要
    user = USER_TEMPLATE.format(
        question=state.question,
        schema_hint=state.schema_hint or "(なし)",
        dataset=state.dataset
    )
    sql = complete_system_user(system, user).strip()
    state.sql = sql
    state.error = None
    return state

def node_validate_and_execute(state: NL2SQLState) -> NL2SQLState:
    try:
        safe_sql = enforce_sql_policy(state.sql or "", state.dataset)
        headers, rows = run_query(
            safe_sql,
            project=state.project_id,
            location=state.location
        )
        state.headers, state.rows = headers, rows
        state.error = None
        state.sql = safe_sql
        return state
    except Exception as e:
        state.error = str(e)
        return state

def node_llm_repair_sql(state: NL2SQLState) -> NL2SQLState:
    user = f"""次のSQLを修正して。エラー:\n{state.error}\n\nSQL:\n{state.sql}"""
    fixed = complete_system_user(
        "あなたはSQL修復アシスタントです。BigQuery方言で1文のSELECTだけを返す。",
        user
    ).strip()
    state.sql = fixed
    return state

6. グラフ(DBセッション生成の簡素化)

langgraph/graph.py
from __future__ import annotations
from langgraph.graph import StateGraph, START, END
from .state import NL2SQLState
from .nodes import node_collect_schema, node_llm_generate_sql, node_validate_and_execute, node_llm_repair_sql
from .checkpointer import build_checkpointer

MAX_RETRIES = 2

def build_graph(db_session_factory):
    builder = StateGraph(NL2SQLState)

    def _collect(state: NL2SQLState):
        with db_session_factory() as db:
            return node_collect_schema(db)(state)

    builder.add_node("collect_schema", _collect)
    builder.add_node("draft_sql", node_llm_generate_sql)
    builder.add_node("exec", node_validate_and_execute)
    builder.add_node("repair", node_llm_repair_sql)

    builder.add_edge(START, "collect_schema")
    builder.add_edge("collect_schema", "draft_sql")
    builder.add_edge("draft_sql", "exec")

    def _route_on_error(state: NL2SQLState):
        if state.error is None:
            return END
        if state.retries >= MAX_RETRIES:
            return END
        state.retries += 1
        return "repair"

    builder.add_conditional_edges("exec", _route_on_error)
    builder.add_edge("repair", "exec")

    checkpointer = build_checkpointer()
    return builder.compile(checkpointer=checkpointer)

7. ハンドラから呼び出す(Part 2差し替え)

Part 2の handle_sql() を LangGraph呼び出しに置き換えます。
thread_id と checkpoint_ns を指定するのがポイント(Slackスレッド単位で状態が残る)。

app/handler_langgraph.py
from __future__ import annotations
import os, requests
from fastapi import BackgroundTasks
from .slack import parse_slash_payload
from .formatting import format_table
from ..langgraph.graph import build_graph
from .db.session import SessionLocal      # ← ここを修正

CHECKPOINT_NS = os.getenv("CHECKPOINT_NS", "text2sql")

_graph = None
def get_graph():
    global _graph
    if _graph is None:
        _graph = build_graph(SessionLocal)
    return _graph

def post_to_slack(response_url: str, text: str, in_channel: bool = True):
    payload = {"response_type": "in_channel" if in_channel else "ephemeral", "text": text}
    requests.post(response_url, json=payload, timeout=10)

async def handle_sql_with_langgraph(body: bytes, bg: BackgroundTasks, tenant: dict):
    p = parse_slash_payload(body)
    question = p["text"]
    response_url = p["response_url"]

    state = {
        "question": question,
        "project_id": tenant["project_id"],
        "dataset": tenant["dataset"],
        "location": tenant["location"],
    }

    thread_id = f"{p['channel_id']}:{p['user_id']}"
    config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": CHECKPOINT_NS}}

    def _run():
        graph = get_graph()
        out = graph.invoke(state, config=config)
        if out.error:
            return f":warning: 失敗({out.retries}回試行): {out.error}\n```\n{out.sql or ''}\n```"
        table = format_table(out.headers or [], out.rows or [])
        return f"*SQL*\n```\n{out.sql}\n```\n*RESULT*\n{table}"

    inline = os.getenv("DEV_INLINE_REPLY", "false").lower() == "true" or not response_url
    if inline:
        return {"response_type": "ephemeral", "text": _run()}

    bg.add_task(lambda: post_to_slack(response_url, _run(), in_channel=True))
    return {"response_type": "ephemeral", "text": "受け付けました(LangGraph正式版)。結果は追って返信します。"}

main.py 側は、Part 2 の handle_sql をこちらに差し替えるだけです。

app/main.py
# from .handler import handle_sql  # 旧
from .handler_langgraph import handle_sql_with_langgraph as handle_sql  # 新

8. 動作確認(ローカル)

curl -X POST http://127.0.0.1:8000/slack/cmd \
  -H "Content-Type: application/x-www-form-urlencoded" \
  --data 'team_id=T09GABUDW2X&user_id=U1&channel_id=C1&response_url=&text=今月の売上を日次で、7日移動平均も'
  • 1回目:スキーマ収集 → LLMがSQL生成 → 実行 → 表で返る
  • 失敗時:エラーを食わせてリペア → 再実行(最大2回)

9. 実装の意図と運用Tips

チェックポイント:Thread単位で履歴が残るので、再送や途中失敗にも強い。
禁則処理:

  • SELECT以外は即拒否
  • 多文禁止(;)でインジェクション耐性
  • データセット外拒否でテナント越境を防止
  • LIMIT補完でコスト・UI崩れを防ぐ
    再試行(リペア):
  • Unrecognized name や Syntax error をLLMに渡して修正
  • Not found: Table 系はschema_cacheの更新で解消されることが多い
    拡張余地:
  • bq_schema_cache にdescription/サンプル行を入れる
  • pgvectorで列説明の意味検索を追加

10. まとめ

• LangGraphで正式な自然言語問い合わせ→SQLのパイプラインを構築
• PostgreSQLチェックポイントで状態/再試行を堅牢化
• エラー処理で安全性を担保しつつ、自動リペアで実用性を底上げ

追記:Dockerでの実行(Part 3 版)

docker-compose.yml
version: "3.9"
services:
  postgres:
    image: postgres:15
    environment:
      POSTGRES_USER: sqlbot
      POSTGRES_PASSWORD: sqlbot
      POSTGRES_DB: sqlbot
    ports: ["5432:5432"]
    volumes: [pgdata:/var/lib/postgresql/data]

  api:
    build:
      context: .
      dockerfile: backend/Dockerfile
    depends_on: [postgres]
    ports: ["8000:8000"]
    env_file:
      - backend/.env
    environment:
      DATABASE_URL: postgresql+psycopg2://sqlbot:sqlbot@postgres:5432/sqlbot
      OPENAI_API_KEY: ${OPENAI_API_KEY}        # ← シェル or .env から渡す
      # GOOGLE_APPLICATION_CREDENTIALS: /secrets/sa.json  # ADCが使えない人向け(代替)
    volumes:
      - ./backend:/app
      - ~/.config/gcloud:/root/.config/gcloud:ro # ADC(推奨)
      # - ./secrets/sa.json:/secrets/sa.json:ro  # 代替: SA JSON を使う場合
    restart: unless-stopped

volumes:
  pgdata:
# 起動
docker compose up --build -d

# マイグレーション(Part 2 + Part 3)
docker compose exec -T postgres psql -U sqlbot -d sqlbot < db/migrations/20250929_part3_schema_cache.sql

ランディングページのご案内

プロダクト概要・導入メリット・デモイメージは、こちらのLPにまとめています。
👉 ランディングページ


前回:Part 2 へ

Part 2:Slack多テナント用の参照系テーブルとルーティング実装

次回: Part 4:Cloud Run × BigQuery の最小権限とコスト設計を体系化し、運用で詰まりがちな領域(ロケーション不一致、定額/オンデマンド、結果キャッシュ)を開設する

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?