到達点:LangGraphでNL→SQLフローを構築し、
- PostgreSQLチェックポイントで状態を保存
- エラー処理(SELECT限定・データセット固定・多文拒否)
- 自動再試行(エラーメッセージを使ってSQLをリペア)
までを実装します。
(Part 1/2のMVPを正式フローに差し替え)
追加依存関係の追加
langgraph>=0.1.0
langgraph-checkpoint-postgres>=0.1.0
psycopg[binary,pool]>=3.2
pydantic>=2.7
openai>=1.40.0
環境変数(例):
OPENAI_API_KEY=sk-***
OPENAI_MODEL=gpt-4o-mini # 任意のモデル名を指定
CHECKPOINT_NS=text2sql # 既出(Part 1/2と同じ命名でOK)
1. チェックポイント(PostgreSQL)を用意する
LangGraphは**状態(State)**をDBに保存できます。最初にテーブルを自動作成します。
from __future__ import annotations
import os
from langgraph.checkpoint.postgres import PostgresSaver
from psycopg import connect
from psycopg.rows import dict_row
DB_URI = os.getenv("DATABASE_URL") # postgresql://user:pass@host:5432/sqlbot
def build_checkpointer() -> PostgresSaver:
with connect(DB_URI, autocommit=True, row_factory=dict_row) as conn:
saver = PostgresSaver(conn)
saver.setup() # 初回テーブル作成
return saver
代表テーブル(バージョンで多少異なることあり):checkpoint_migrations, checkpoint_writes, checkpoint_blobs など。
ここに スレッド(thread_id) ごとの進行状況が保存され、再実行・再試行が安定します。
2. スキーマキャッシュをDBに保存
NL→SQLの精度を上げるため、INFORMATION_SCHEMAから取得したテーブル/カラム情報をJSONBでキャッシュします(TTLつき)。
CREATE TABLE IF NOT EXISTS bq_schema_cache (
project_id TEXT NOT NULL,
dataset TEXT NOT NULL,
table_name TEXT NOT NULL,
columns JSONB NOT NULL, -- [{name,type,description?}, ...]
fetched_at TIMESTAMPTZ NOT NULL DEFAULT now(),
PRIMARY KEY (project_id, dataset, table_name)
);
from __future__ import annotations
import json, time
from sqlalchemy import text
from bq.client import run_query # ← 絶対インポートに統一
TTL_SEC = 6 * 3600 # 6時間
def get_table_schema(db, project: str, dataset: str, table: str):
row = db.execute(
text("""SELECT columns, fetched_at
FROM bq_schema_cache
WHERE project_id=:p AND dataset=:d AND table_name=:t"""),
{"p": project, "d": dataset, "t": table},
).fetchone()
if row and (time.time() - row.fetched_at.timestamp()) < TTL_SEC:
return row.columns
headers, rows = run_query(f"""
SELECT column_name, data_type
FROM `{project}.{dataset}.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = '{table}'
ORDER BY ordinal_position
""", project=project)
cols = [{"name": r[0], "type": r[1]} for r in rows]
db.execute(
text("""INSERT INTO bq_schema_cache(project_id,dataset,table_name,columns,fetched_at)
VALUES (:p,:d,:t,:c, now())
ON CONFLICT (project_id,dataset,table_name)
DO UPDATE SET columns=:c, fetched_at=now()"""),
{"p": project, "d": dataset, "t": table, "c": json.dumps(cols)},
)
db.commit()
return cols
最初は必要テーブルだけ(例:orders)で十分。後で自動収集・pgvector拡張も可能。
全テーブルを自動読み込みする場合はschema_loader.pyなどを実装して読み込み専用処理を定期実行するようにして対応する。
3. LLMクライアント(最小)
from __future__ import annotations
import os
from openai import OpenAI
_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")
_client = OpenAI()
def complete_system_user(system: str, user: str) -> str:
resp = _client.chat.completions.create(
model=_MODEL,
messages=[{"role": "system", "content": system},
{"role": "user", "content": user}],
temperature=0.2,
)
return resp.choices[0].message.content or ""
4. エラー処理(ガード強化)
- SELECT限定
- 多文(;区切り)拒否
- データセット外アクセス拒否(project.dataset.table が許可DSかチェック)
- LIMIT補完(存在しなければ LIMIT 100 を末尾に付与)
from __future__ import annotations
import re
_FORBIDDEN = re.compile(r"\b(INSERT|UPDATE|DELETE|MERGE|TRUNCATE|DROP|ALTER|CREATE|GRANT|REVOKE)\b", re.I)
def enforce_sql_policy(sql: str, allowed_dataset: str) -> str:
s = sql.strip().rstrip(";")
if not re.match(r"(?s)^\s*SELECT\b", s, re.I):
raise ValueError("Only SELECT is allowed.")
if _FORBIDDEN.search(s):
raise ValueError("Dangerous SQL detected.")
if ";" in sql:
raise ValueError("Multiple statements are not allowed.")
def _qualify(m):
full = m.group(1)
parts = full.split(".")
if len(parts) == 1:
return m.group(0).replace(full, f"{allowed_dataset}.{full}")
ds = parts[-2] if len(parts) >= 2 else ""
if ds.lower() != allowed_dataset.lower():
raise ValueError(f"Access to dataset '{ds}' is not allowed.")
return m.group(0)
s = re.sub(r"\bFROM\s+([a-zA-Z_][\w\.]*)\b", _qualify, s, flags=re.I)
s = re.sub(r"\bJOIN\s+([a-zA-Z_][\w\.]*)\b", _qualify, s, flags=re.I)
if not re.search(r"\bLIMIT\s+\d+\b", s, re.I):
s = f"{s}\nLIMIT 100"
return s
5. グラフの状態とノードを定義する
5-1. State
from __future__ import annotations
from pydantic import BaseModel, Field
from typing import List, Optional, Tuple
class NL2SQLState(BaseModel):
question: str
project_id: str
dataset: str
location: str
schema_hint: Optional[str] = None
sql: Optional[str] = None
headers: Optional[List[str]] = None
rows: Optional[List[Tuple]] = None
error: Optional[str] = None
retries: int = Field(default=0, ge=0)
5-2. プロンプト
SYSTEM_PROMPT = """あなたはBigQuery専用のSQLアシスタントです。
- 返答はSQLのみ(説明文なし)
- DML/DDLは禁止(SELECTのみ)
- 必ず `{dataset}.table` の形式で参照
- 末尾に LIMIT を付ける(多すぎる結果を避ける)
- タイムゾーンはUTC基準で良い(必要ならDATE()で日付化)
"""
USER_TEMPLATE = """自然文:
{question}
利用可能なテーブルと主なカラム(参考):
{schema_hint}
制約:
- dataset={dataset}
- SELECTのみ、1文のみ
- 結果はSQLだけを返す
"""
5-3. ノード(promptの {dataset} を format して渡す)
from __future__ import annotations
from sqlalchemy.orm import Session
from .state import NL2SQLState
from .prompt import SYSTEM_PROMPT, USER_TEMPLATE
from .llm import complete_system_user
from bq.client import run_query
from .guards import enforce_sql_policy
from bq.schema_cache import get_table_schema
def build_schema_hint(db: Session, project: str, dataset: str, examples=("orders",)) -> str:
lines = []
for tbl in examples:
cols = get_table_schema(db, project, dataset, tbl)
if not cols:
continue
left = ", ".join(f"{c['name']}:{c['type']}" for c in cols[:8])
tail = " ..." if len(cols) > 8 else ""
lines.append(f"- {tbl}({left}{tail})")
return "\n".join(lines) if lines else "(スキーマ情報は最小です)"
def node_collect_schema(db: Session):
def _fn(state: NL2SQLState) -> NL2SQLState:
state.schema_hint = build_schema_hint(db, state.project_id, state.dataset, examples=("orders",))
return state
return _fn
def node_llm_generate_sql(state: NL2SQLState) -> NL2SQLState:
system = SYSTEM_PROMPT.format(dataset=state.dataset) # ← ここ重要
user = USER_TEMPLATE.format(
question=state.question,
schema_hint=state.schema_hint or "(なし)",
dataset=state.dataset
)
sql = complete_system_user(system, user).strip()
state.sql = sql
state.error = None
return state
def node_validate_and_execute(state: NL2SQLState) -> NL2SQLState:
try:
safe_sql = enforce_sql_policy(state.sql or "", state.dataset)
headers, rows = run_query(
safe_sql,
project=state.project_id,
location=state.location
)
state.headers, state.rows = headers, rows
state.error = None
state.sql = safe_sql
return state
except Exception as e:
state.error = str(e)
return state
def node_llm_repair_sql(state: NL2SQLState) -> NL2SQLState:
user = f"""次のSQLを修正して。エラー:\n{state.error}\n\nSQL:\n{state.sql}"""
fixed = complete_system_user(
"あなたはSQL修復アシスタントです。BigQuery方言で1文のSELECTだけを返す。",
user
).strip()
state.sql = fixed
return state
6. グラフ(DBセッション生成の簡素化)
from __future__ import annotations
from langgraph.graph import StateGraph, START, END
from .state import NL2SQLState
from .nodes import node_collect_schema, node_llm_generate_sql, node_validate_and_execute, node_llm_repair_sql
from .checkpointer import build_checkpointer
MAX_RETRIES = 2
def build_graph(db_session_factory):
builder = StateGraph(NL2SQLState)
def _collect(state: NL2SQLState):
with db_session_factory() as db:
return node_collect_schema(db)(state)
builder.add_node("collect_schema", _collect)
builder.add_node("draft_sql", node_llm_generate_sql)
builder.add_node("exec", node_validate_and_execute)
builder.add_node("repair", node_llm_repair_sql)
builder.add_edge(START, "collect_schema")
builder.add_edge("collect_schema", "draft_sql")
builder.add_edge("draft_sql", "exec")
def _route_on_error(state: NL2SQLState):
if state.error is None:
return END
if state.retries >= MAX_RETRIES:
return END
state.retries += 1
return "repair"
builder.add_conditional_edges("exec", _route_on_error)
builder.add_edge("repair", "exec")
checkpointer = build_checkpointer()
return builder.compile(checkpointer=checkpointer)
7. ハンドラから呼び出す(Part 2差し替え)
Part 2の handle_sql() を LangGraph呼び出しに置き換えます。
thread_id と checkpoint_ns を指定するのがポイント(Slackスレッド単位で状態が残る)。
from __future__ import annotations
import os, requests
from fastapi import BackgroundTasks
from .slack import parse_slash_payload
from .formatting import format_table
from ..langgraph.graph import build_graph
from .db.session import SessionLocal # ← ここを修正
CHECKPOINT_NS = os.getenv("CHECKPOINT_NS", "text2sql")
_graph = None
def get_graph():
global _graph
if _graph is None:
_graph = build_graph(SessionLocal)
return _graph
def post_to_slack(response_url: str, text: str, in_channel: bool = True):
payload = {"response_type": "in_channel" if in_channel else "ephemeral", "text": text}
requests.post(response_url, json=payload, timeout=10)
async def handle_sql_with_langgraph(body: bytes, bg: BackgroundTasks, tenant: dict):
p = parse_slash_payload(body)
question = p["text"]
response_url = p["response_url"]
state = {
"question": question,
"project_id": tenant["project_id"],
"dataset": tenant["dataset"],
"location": tenant["location"],
}
thread_id = f"{p['channel_id']}:{p['user_id']}"
config = {"configurable": {"thread_id": thread_id, "checkpoint_ns": CHECKPOINT_NS}}
def _run():
graph = get_graph()
out = graph.invoke(state, config=config)
if out.error:
return f":warning: 失敗({out.retries}回試行): {out.error}\n```\n{out.sql or ''}\n```"
table = format_table(out.headers or [], out.rows or [])
return f"*SQL*\n```\n{out.sql}\n```\n*RESULT*\n{table}"
inline = os.getenv("DEV_INLINE_REPLY", "false").lower() == "true" or not response_url
if inline:
return {"response_type": "ephemeral", "text": _run()}
bg.add_task(lambda: post_to_slack(response_url, _run(), in_channel=True))
return {"response_type": "ephemeral", "text": "受け付けました(LangGraph正式版)。結果は追って返信します。"}
main.py 側は、Part 2 の handle_sql をこちらに差し替えるだけです。
# from .handler import handle_sql # 旧
from .handler_langgraph import handle_sql_with_langgraph as handle_sql # 新
8. 動作確認(ローカル)
curl -X POST http://127.0.0.1:8000/slack/cmd \
-H "Content-Type: application/x-www-form-urlencoded" \
--data 'team_id=T09GABUDW2X&user_id=U1&channel_id=C1&response_url=&text=今月の売上を日次で、7日移動平均も'
- 1回目:スキーマ収集 → LLMがSQL生成 → 実行 → 表で返る
- 失敗時:エラーを食わせてリペア → 再実行(最大2回)
9. 実装の意図と運用Tips
チェックポイント:Thread単位で履歴が残るので、再送や途中失敗にも強い。
禁則処理:
- SELECT以外は即拒否
- 多文禁止(;)でインジェクション耐性
- データセット外拒否でテナント越境を防止
- LIMIT補完でコスト・UI崩れを防ぐ
再試行(リペア): - Unrecognized name や Syntax error をLLMに渡して修正
- Not found: Table 系はschema_cacheの更新で解消されることが多い
拡張余地: - bq_schema_cache にdescription/サンプル行を入れる
- pgvectorで列説明の意味検索を追加
10. まとめ
• LangGraphで正式な自然言語問い合わせ→SQLのパイプラインを構築
• PostgreSQLチェックポイントで状態/再試行を堅牢化
• エラー処理で安全性を担保しつつ、自動リペアで実用性を底上げ
追記:Dockerでの実行(Part 3 版)
version: "3.9"
services:
postgres:
image: postgres:15
environment:
POSTGRES_USER: sqlbot
POSTGRES_PASSWORD: sqlbot
POSTGRES_DB: sqlbot
ports: ["5432:5432"]
volumes: [pgdata:/var/lib/postgresql/data]
api:
build:
context: .
dockerfile: backend/Dockerfile
depends_on: [postgres]
ports: ["8000:8000"]
env_file:
- backend/.env
environment:
DATABASE_URL: postgresql+psycopg2://sqlbot:sqlbot@postgres:5432/sqlbot
OPENAI_API_KEY: ${OPENAI_API_KEY} # ← シェル or .env から渡す
# GOOGLE_APPLICATION_CREDENTIALS: /secrets/sa.json # ADCが使えない人向け(代替)
volumes:
- ./backend:/app
- ~/.config/gcloud:/root/.config/gcloud:ro # ADC(推奨)
# - ./secrets/sa.json:/secrets/sa.json:ro # 代替: SA JSON を使う場合
restart: unless-stopped
volumes:
pgdata:
# 起動
docker compose up --build -d
# マイグレーション(Part 2 + Part 3)
docker compose exec -T postgres psql -U sqlbot -d sqlbot < db/migrations/20250929_part3_schema_cache.sql
ランディングページのご案内
プロダクト概要・導入メリット・デモイメージは、こちらのLPにまとめています。
👉 ランディングページ
前回:Part 2 へ
Part 2:Slack多テナント用の参照系テーブルとルーティング実装
次回: Part 4:Cloud Run × BigQuery の最小権限とコスト設計を体系化し、運用で詰まりがちな領域(ロケーション不一致、定額/オンデマンド、結果キャッシュ)を開設する