0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

LangGraphでAIエージェント対応SQLボットを作る Part 1:アーキテクチャとMVPを一気通貫で(/sql コマンドで1クエリ実行する)

Last updated at Posted at 2025-09-30

到達点(MVP): Slackの/sqlに自然文を投げると、1本のクエリ実行後結果が返る。
本稿では FastAPI +(簡易)NL→SQL + BigQuery 実行 + Slack返信 を“最短動線”でつなぎます。LangGraphの本格運用(チェックポイント等)は Part 3 で置き換えます。

つくるもの(最短ルート)

  • /slack/cmd を受け取る FastAPI エンドポイント
  • 署名検証(開発時はスキップ可能)
  • 簡易NL→SQL(テンプレ/フォールバック):まずは1本通す
  • BigQuery実行(SELECTのみ/危険なSQLは拒否)
  • SlackにACK→遅延返信(開発用に即時返信モードも用意)

ディレクトリ構成(Part 1|MVP最小)

sql-bot/
  └─ backend/
      ├─ app/
      │   ├─ __init__.py
      │   ├─ main.py               # FastAPIエントリ
      │   ├─ slack.py              # 署名検証/フォームパース
      │   ├─ handler.py            # MVP: /sql 受け付け→実行→返信
      │   ├─ formatting.py         # Slack表整形
      │   └─ guards.py             # 危険SQLガード(SELECT限定/DS固定)
      ├─ bq/
      │   ├─ __init__.py
      │   └─ client.py             # BigQueryクエリ実行
      ├─ langgraph/
      │   ├─ __init__.py
      │   └─ nl2sql.py             # 簡易NL→SQL(Part 3で置換)
      ├─ requirements.txt
      └─ .env.example

依存関係と環境変数

requirements.txt
fastapi==0.115.5
uvicorn[standard]==0.30.6
google-cloud-bigquery==3.25.0
python-dotenv==1.0.1
requests==2.32.3
sqlalchemy>=2.0
.env
# Slack
SLACK_SIGNING_SECRET=replace_me
SLACK_BOT_TOKEN=xoxb-xxxxxxxx

# GCP / BigQuery
GOOGLE_CLOUD_PROJECT=your-project-id
BIGQUERY_LOCATION=asia-northeast1
BQ_DEFAULT_DATASET=your_dataset   # 例: ecommerce_sample

# 開発オプション
DEV_SKIP_SLACK_VERIFY=true        # 開発中は署名検証スキップ可
DEV_INLINE_REPLY=false            # trueにすると即時返信(Slackのresponse_urlを使わない)

インストールと起動方法

cd sql-bot/backend
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env   # 値を編集
uvicorn app.main:app --reload

アプリケーションコード

1. FastAPIエントリ

app/main.py
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from .slack import verify_signature
from .handler import handle_sql
import os

app = FastAPI(title="SQL Bot MVP")

@app.post("/slack/cmd")
async def slack_cmd(req: Request, bg: BackgroundTasks):
    body = await req.body()
    headers = req.headers

    # 開発時はスキップ可
    if os.getenv("DEV_SKIP_SLACK_VERIFY", "false").lower() != "true":
        if not verify_signature(headers, body):
            raise HTTPException(status_code=401, detail="invalid signature")

    return await handle_sql(body, bg)

# 開発用のダミー受け口(response_urlに投げる先がない時の確認用)
@app.post("/dev/echo")
async def dev_echo(req: Request):
    payload = await req.json()
    print("DEV ECHO:", payload)  # ログで確認
    return {"ok": True}

2. Slackユーティリティ(署名検証 & フォームパース)

app/slack.py
import hmac, hashlib, time, os
from urllib.parse import parse_qs

SLACK_SIGNING_SECRET = os.getenv("SLACK_SIGNING_SECRET", "")
MAX_SKEW_SEC = 60 * 5

def verify_signature(headers, body: bytes) -> bool:
    ts = int(headers.get("X-Slack-Request-Timestamp", "0"))
    if abs(time.time() - ts) > MAX_SKEW_SEC:
        return False
    base = f"v0:{ts}:{body.decode()}".encode()
    req_sig = headers.get("X-Slack-Signature", "")
    my_sig = "v0=" + hmac.new(
        key=SLACK_SIGNING_SECRET.encode(),
        msg=base,
        digestmod=hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(req_sig, my_sig)

def parse_slash_payload(body: bytes) -> dict:
    form = parse_qs(body.decode())
    get = lambda k: (form.get(k, [""])[0] or "").strip()
    return {
        "team_id": get("team_id"),
        "user_id": get("user_id"),
        "channel_id": get("channel_id"),
        "text": get("text"),
        "response_url": get("response_url"),
        "command": get("command"),
    }

3. BigQueryクライアント

bq/client.py
import os
from google.cloud import bigquery

PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
LOCATION = os.getenv("BIGQUERY_LOCATION", "asia-northeast1")

def run_query(sql: str, max_rows: int = 50) -> tuple[list[str], list[tuple]]:
    client = bigquery.Client(project=PROJECT, location=LOCATION)
    job = client.query(sql, location=LOCATION)
    it = job.result(max_results=max_rows)     # RowIterator
    schema_fields = [f.name for f in it.schema]
    rows = list(it)
    if not rows:
        return schema_fields, []
    values = [tuple(row[name] for name in schema_fields) for row in rows]
    return schema_fields, values

4. Slack返信の整形(表をコードブロックに)

app/formatting.py
def format_table(headers: list[str], rows: list[tuple], max_rows_display: int = 20) -> str:
    rows = rows[:max_rows_display]
    cols = [headers] + [list(map(lambda x: "" if x is None else str(x), r)) for r in rows]
    widths = [max(len(str(col[i])) for col in cols) for i in range(len(headers))]

    def fmt_line(values):
        return " | ".join(str(v).ljust(widths[i]) for i, v in enumerate(values))

    lines = []
    lines.append(fmt_line(headers))
    lines.append("-+-".join("-" * w for w in widths))
    for r in rows:
        lines.append(fmt_line(r))
    return "```\n" + "\n".join(lines) + "\n```"

5. 危険なSQLをガード(SELECT限定 + DS固定)

app/guards.py
import re, os
BQ_DEFAULT_DATASET = os.getenv("BQ_DEFAULT_DATASET", "")

_FORBIDDEN = re.compile(r"\b(INSERT|UPDATE|DELETE|MERGE|TRUNCATE|DROP|ALTER|CREATE|GRANT|REVOKE)\b", re.I)

def assert_safe_select(sql: str):
    if not re.match(r"(?s)^\s*SELECT\b", sql, re.I):
        raise ValueError("Only SELECT is allowed in MVP.")
    if _FORBIDDEN.search(sql):
        raise ValueError("Dangerous SQL detected.")
    if BQ_DEFAULT_DATASET:
        def repl(m):
            tbl = m.group(1)
            if "." in tbl:
                return m.group(0)
            return m.group(0).replace(tbl, f"{BQ_DEFAULT_DATASET}.{tbl}")
        sql = re.sub(r"\bFROM\s+([a-zA-Z_][\w]*)\b", repl, sql, flags=re.I)
    return sql

6. “まず1リクエスト通す”ための簡易 NL→SQL(テンプレ + フォールバック)

本稿では簡易版です。Part 3でLangGraphの正式なNL→SQLに差し替えます。

langgraph/nl2sql.py
import os
BQ_DEFAULT_DATASET = os.getenv("BQ_DEFAULT_DATASET", "your_dataset")

def generate_sql(natural: str) -> str:
    q = natural.strip()
    if q.lower().startswith("sql:"):
        return q[4:].strip()
    if ("今月" in q or "当月" in q) and ("売上" in q or "売上高" in q) and ("日次" in q or "日別" in q):
        return f"""
WITH daily_sales AS (
  SELECT
    DATE(order_date) AS order_day,
    SUM(total_amount) AS daily_total
  FROM `{BQ_DEFAULT_DATASET}.orders`
  WHERE EXTRACT(YEAR FROM order_date) = EXTRACT(YEAR FROM CURRENT_DATE())
    AND EXTRACT(MONTH FROM order_date) = EXTRACT(MONTH FROM CURRENT_DATE())
  GROUP BY order_day
),
moving AS (
  SELECT
    order_day,
    daily_total,
    AVG(daily_total) OVER (ORDER BY order_day ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS seven_day_moving_avg
  FROM daily_sales
)
SELECT order_day, daily_total, seven_day_moving_avg
FROM moving
ORDER BY order_day
LIMIT 100
""".strip()
    return "SELECT CURRENT_TIMESTAMP() AS now"

7. ハンドラ:受け付け→ACK→遅延返信(開発は即時返信も可)

app/handler.py
import os, requests
from .slack import parse_slash_payload
from langgraph.nl2sql import generate_sql      
from bq.client import run_query               
from .formatting import format_table
from .guards import assert_safe_select        
from fastapi import BackgroundTasks

def post_to_slack(response_url: str, text: str, in_channel: bool = True):
    payload = {"response_type": "in_channel" if in_channel else "ephemeral", "text": text}
    requests.post(response_url, json=payload, timeout=10)

async def handle_sql(body: bytes, bg: BackgroundTasks):
    p = parse_slash_payload(body)
    natural = p["text"]
    response_url = p["response_url"]

    # NL→SQL
    sql_raw = generate_sql(natural)

    # ガード&DS固定
    try:
        sql = assert_safe_select(sql_raw)
    except Exception as e:
        # 即時エラー返却
        return {"response_type": "ephemeral", "text": f"Error: {e}"}

    # 開発用:即時返信モード(Slack不要)
    if os.getenv("DEV_INLINE_REPLY", "false").lower() == "true" or not response_url:
        headers, rows = run_query(sql)
        return {"response_type": "ephemeral", "text": f"*SQL*\n```\n{sql}\n```\n*RESULT*\n{format_table(headers, rows)}"}

    # 本番想定:ACK → バックグラウンドで実行・遅延返信
    def _run_and_reply():
        try:
            headers, rows = run_query(sql)
            msg = f"*SQL*\n```\n{sql}\n```\n*RESULT*\n{format_table(headers, rows)}"
        except Exception as e:
            msg = f":warning: 実行エラー: {e}"
        post_to_slack(response_url, msg, in_channel=True)

    bg.add_task(_run_and_reply)
    return {"response_type": "ephemeral", "text": "受け付けました。結果を後ほど返信します。"}

Bigqueryへの権限付与など

1. 認証とプロジェクト指定

# ブラウザでGoogleアカウントにログインしてADCを発行(クライアントライブラリが使用)
gcloud auth application-default login

# 使うプロジェクトを明示
gcloud config set project your-project-id

gcloud auth login は gcloud CLI自身の認証、
gcloud auth application-default login は クライアントライブラリ用のADC。
BigQueryのPython/Go/Nodeクライアントは後者(ADC)を使います。

2. 最小権限(あなたのユーザー)を付与

Googleアカウントに以下の権限が必要です。

  • プロジェクト:roles/bigquery.jobUser(クエリ実行/ジョブ作成)
  • データセット:roles/bigquery.dataViewer(読み取り) ← データセット単位で付与
PROJECT_ID=your-project-id
DATASET=ecommerce_sample
USER=you@example.com   # あなたのGoogleアカウント

# ジョブ実行(プロジェクト)
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
  --member="user:${USER}" \
  --role="roles/bigquery.jobUser"

# データ閲覧(データセット単位)
bq --project_id="$PROJECT_ID" add-iam-policy-binding "${PROJECT_ID}:${DATASET}" \
  --member="user:${USER}" \
  --role="roles/bigquery.dataViewer"

bqコマンドでの付与ができない場合はGCPのBigquery管理画面の対象データベースを選択し、共有、権限付与より追加して下さい。
権限は管理者から付けてもらうか、自分が管理権限を持っていれば以下で付与できます。

動作確認

1. 開発モード(署名検証スキップ + 即時返信)

.env を以下のようにして起動します。

DEV_SKIP_SLACK_VERIFY=true
DEV_INLINE_REPLY=true
BQ_DEFAULT_DATASET=your_dataset   # あらかじめordersテーブル等を用意しておく

起動後、擬似的にSlashコマンドを叩きます(URLエンコードされたbody)。

curl -X POST http://127.0.0.1:8000/slack/cmd \
  -H "Content-Type: application/x-www-form-urlencoded" \
  --data 'team_id=T123&user_id=U123&response_url=&text=今月の売上を日次で、7日移動平均も表示'

期待結果:result として表形式のコードブロックがJSONのtextです。
(orders(order_date, total_amount) のモック/実表が必要です)

2. 遅延返信の確認(ダミーecho)

DEV_INLINE_REPLY=false にし、response_url にローカルの /dev/echo を指定して確認します。

curl -X POST http://127.0.0.1:8000/slack/cmd \
  -H "Content-Type: application/x-www-form-urlencoded" \
  --data 'team_id=T123&user_id=U123&response_url=http://127.0.0.1:8000/dev/echo&text=SQL: SELECT 1 AS ok'

サーバ側ログに DEV ECHO: {...} が出れば、ACK→遅延返信フローが通っています。
Slack本番では response_url をSlack提供のURLに自動で向けます(Part 2で本設定)。


補足:orders がない場合の最小モック

最小検証だけ行うなら、自然文ではなく SQL直書き でOKです。

curl -X POST http://127.0.0.1:8000/slack/cmd \
  -H "Content-Type: application/x-www-form-urlencoded" \
  --data 'team_id=T123&user_id=U123&response_url=&text=SQL: SELECT CURRENT_DATE() AS today, 42 AS answer'

ここまでのポイント(運用を見据えたMVPの型)

  • 3秒制限対策:ACK→遅延返信(BackgroundTasks + response_url)
  • 安全性:SELECT限定 + 危険語句ブロック + 既定DS固定で誤爆回避
  • 段階的導入:SQL:直書き → 簡易テンプレ → LangGraph(Part 3)
  • ローカル検証用フラグ:DEV_SKIP_SLACK_VERIFY, DEV_INLINE_REPLY で開発を速く

本稿のコードはMVPです。本番運用の実装を簡略化しています。実データに接続する前に、アクセス権限・マスキング・監査ログを必ず確認してください。

補足:Docker で動かす(ADC優先/SA JSONは代替)

認証は2通り
A) ADC(Application Default Credentials)をコンテナにマウント(推奨・簡単)
B) サービスアカウントJSONを環境変数で指定(ADCが使えない環境の代替)
本番運用では JSON 鍵ではなく サービスアカウント(鍵レス) を推奨(Part 4 参照)。

追加ファイル

backend/Dockerfile
docker-compose.yml
backend/Dockerfile
# syntax=docker/dockerfile:1
FROM python:3.11-slim

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    ca-certificates curl && \
    rm -rf /var/lib/apt/lists/*

COPY backend/requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt

COPY backend /app
EXPOSE 8000

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

docker-compose.yml(A or B を選択)

docker-compose.yml
version: "3.9"

services:
  api:
    build:
      context: .
      dockerfile: backend/Dockerfile
    container_name: sql-bot-api
    ports:
      - "8000:8000"
    env_file:
      - backend/.env
    # ABどちらか一方だけ有効化してください
    volumes:
      - ./backend:/app                     
      # --- A) ADC をコンテナへマウント(推奨) ---
      # macOS/Linux:
      - ~/.config/gcloud:/root/.config/gcloud:ro
      # Windows(PowerShell)例:
      # - "C:/Users/%USERNAME%/AppData/Roaming/gcloud:/root/.config/gcloud:ro"
      # ------------------------------------------------
      # --- B) 代替:サービスアカウントJSONを使用 ---
      # 使う場合は上の ADC 行をコメントアウトし、以下2行のコメントを外す:
      # - ./secrets/sa.json:/secrets/sa.json:ro
    environment:
      # Bを使う場合のみ有効化(Aでは不要)
      # GOOGLE_APPLICATION_CREDENTIALS: /secrets/sa.json
    restart: unless-stopped

サービスアカウントJSON(B)を使う場合の注意
• secrets/sa.json は リポジトリにコミットしない(.gitignore で除外)。
• 付与権限は最小限:roles/bigquery.jobUser(プロジェクト)+ roles/bigquery.dataViewer(データセット)。
• 期限・ローテーション・保管場所に注意。本番は鍵レス(Cloud Run のSA)推奨。

実行手順

  1. Aを使う人はホストでADC発行(初回のみ)

  2. Bを使う人は JSON を用意
    ./secrets/sa.json を置き、compose の SA 行と GOOGLE_APPLICATION_CREDENTIALS を有効化。
    付与権限は上記の最小権限のみ。

  3. 起動

docker compose up --build

4. 疎通テスト

curl -X POST http://127.0.0.1:8000/slack/cmd \
  -H "Content-Type: application/x-www-form-urlencoded" \
  --data 'team_id=T123&user_id=U123&response_url=&text=SQL: SELECT CURRENT_DATE() AS today, 42 AS answer'

ランディングページのご案内

プロダクト概要・導入メリット・デモイメージは、こちらのLPにまとめています。
👉 ランディングページ


前回:Part 0 へ

Part 0:全体像と目次

次回: Part 2:Slack署名検証の本番化、team_id → Workspace → company_id ルーティング、多テナント設計

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?