到達点(MVP): Slackの/sqlに自然文を投げると、1本のクエリ実行後結果が返る。
本稿では FastAPI +(簡易)NL→SQL + BigQuery 実行 + Slack返信 を“最短動線”でつなぎます。LangGraphの本格運用(チェックポイント等)は Part 3 で置き換えます。
つくるもの(最短ルート)
- /slack/cmd を受け取る FastAPI エンドポイント
- 署名検証(開発時はスキップ可能)
- 簡易NL→SQL(テンプレ/フォールバック):まずは1本通す
- BigQuery実行(SELECTのみ/危険なSQLは拒否)
- SlackにACK→遅延返信(開発用に即時返信モードも用意)
ディレクトリ構成(Part 1|MVP最小)
sql-bot/
└─ backend/
├─ app/
│ ├─ __init__.py
│ ├─ main.py # FastAPIエントリ
│ ├─ slack.py # 署名検証/フォームパース
│ ├─ handler.py # MVP: /sql 受け付け→実行→返信
│ ├─ formatting.py # Slack表整形
│ └─ guards.py # 危険SQLガード(SELECT限定/DS固定)
├─ bq/
│ ├─ __init__.py
│ └─ client.py # BigQueryクエリ実行
├─ langgraph/
│ ├─ __init__.py
│ └─ nl2sql.py # 簡易NL→SQL(Part 3で置換)
├─ requirements.txt
└─ .env.example
依存関係と環境変数
fastapi==0.115.5
uvicorn[standard]==0.30.6
google-cloud-bigquery==3.25.0
python-dotenv==1.0.1
requests==2.32.3
sqlalchemy>=2.0
# Slack
SLACK_SIGNING_SECRET=replace_me
SLACK_BOT_TOKEN=xoxb-xxxxxxxx
# GCP / BigQuery
GOOGLE_CLOUD_PROJECT=your-project-id
BIGQUERY_LOCATION=asia-northeast1
BQ_DEFAULT_DATASET=your_dataset # 例: ecommerce_sample
# 開発オプション
DEV_SKIP_SLACK_VERIFY=true # 開発中は署名検証スキップ可
DEV_INLINE_REPLY=false # trueにすると即時返信(Slackのresponse_urlを使わない)
インストールと起動方法
cd sql-bot/backend
python -m venv .venv && source .venv/bin/activate
pip install -r requirements.txt
cp .env.example .env # 値を編集
uvicorn app.main:app --reload
アプリケーションコード
1. FastAPIエントリ
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks
from .slack import verify_signature
from .handler import handle_sql
import os
app = FastAPI(title="SQL Bot MVP")
@app.post("/slack/cmd")
async def slack_cmd(req: Request, bg: BackgroundTasks):
body = await req.body()
headers = req.headers
# 開発時はスキップ可
if os.getenv("DEV_SKIP_SLACK_VERIFY", "false").lower() != "true":
if not verify_signature(headers, body):
raise HTTPException(status_code=401, detail="invalid signature")
return await handle_sql(body, bg)
# 開発用のダミー受け口(response_urlに投げる先がない時の確認用)
@app.post("/dev/echo")
async def dev_echo(req: Request):
payload = await req.json()
print("DEV ECHO:", payload) # ログで確認
return {"ok": True}
2. Slackユーティリティ(署名検証 & フォームパース)
import hmac, hashlib, time, os
from urllib.parse import parse_qs
SLACK_SIGNING_SECRET = os.getenv("SLACK_SIGNING_SECRET", "")
MAX_SKEW_SEC = 60 * 5
def verify_signature(headers, body: bytes) -> bool:
ts = int(headers.get("X-Slack-Request-Timestamp", "0"))
if abs(time.time() - ts) > MAX_SKEW_SEC:
return False
base = f"v0:{ts}:{body.decode()}".encode()
req_sig = headers.get("X-Slack-Signature", "")
my_sig = "v0=" + hmac.new(
key=SLACK_SIGNING_SECRET.encode(),
msg=base,
digestmod=hashlib.sha256,
).hexdigest()
return hmac.compare_digest(req_sig, my_sig)
def parse_slash_payload(body: bytes) -> dict:
form = parse_qs(body.decode())
get = lambda k: (form.get(k, [""])[0] or "").strip()
return {
"team_id": get("team_id"),
"user_id": get("user_id"),
"channel_id": get("channel_id"),
"text": get("text"),
"response_url": get("response_url"),
"command": get("command"),
}
3. BigQueryクライアント
import os
from google.cloud import bigquery
PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
LOCATION = os.getenv("BIGQUERY_LOCATION", "asia-northeast1")
def run_query(sql: str, max_rows: int = 50) -> tuple[list[str], list[tuple]]:
client = bigquery.Client(project=PROJECT, location=LOCATION)
job = client.query(sql, location=LOCATION)
it = job.result(max_results=max_rows) # RowIterator
schema_fields = [f.name for f in it.schema]
rows = list(it)
if not rows:
return schema_fields, []
values = [tuple(row[name] for name in schema_fields) for row in rows]
return schema_fields, values
4. Slack返信の整形(表をコードブロックに)
def format_table(headers: list[str], rows: list[tuple], max_rows_display: int = 20) -> str:
rows = rows[:max_rows_display]
cols = [headers] + [list(map(lambda x: "" if x is None else str(x), r)) for r in rows]
widths = [max(len(str(col[i])) for col in cols) for i in range(len(headers))]
def fmt_line(values):
return " | ".join(str(v).ljust(widths[i]) for i, v in enumerate(values))
lines = []
lines.append(fmt_line(headers))
lines.append("-+-".join("-" * w for w in widths))
for r in rows:
lines.append(fmt_line(r))
return "```\n" + "\n".join(lines) + "\n```"
5. 危険なSQLをガード(SELECT限定 + DS固定)
import re, os
BQ_DEFAULT_DATASET = os.getenv("BQ_DEFAULT_DATASET", "")
_FORBIDDEN = re.compile(r"\b(INSERT|UPDATE|DELETE|MERGE|TRUNCATE|DROP|ALTER|CREATE|GRANT|REVOKE)\b", re.I)
def assert_safe_select(sql: str):
if not re.match(r"(?s)^\s*SELECT\b", sql, re.I):
raise ValueError("Only SELECT is allowed in MVP.")
if _FORBIDDEN.search(sql):
raise ValueError("Dangerous SQL detected.")
if BQ_DEFAULT_DATASET:
def repl(m):
tbl = m.group(1)
if "." in tbl:
return m.group(0)
return m.group(0).replace(tbl, f"{BQ_DEFAULT_DATASET}.{tbl}")
sql = re.sub(r"\bFROM\s+([a-zA-Z_][\w]*)\b", repl, sql, flags=re.I)
return sql
6. “まず1リクエスト通す”ための簡易 NL→SQL(テンプレ + フォールバック)
本稿では簡易版です。Part 3でLangGraphの正式なNL→SQLに差し替えます。
import os
BQ_DEFAULT_DATASET = os.getenv("BQ_DEFAULT_DATASET", "your_dataset")
def generate_sql(natural: str) -> str:
q = natural.strip()
if q.lower().startswith("sql:"):
return q[4:].strip()
if ("今月" in q or "当月" in q) and ("売上" in q or "売上高" in q) and ("日次" in q or "日別" in q):
return f"""
WITH daily_sales AS (
SELECT
DATE(order_date) AS order_day,
SUM(total_amount) AS daily_total
FROM `{BQ_DEFAULT_DATASET}.orders`
WHERE EXTRACT(YEAR FROM order_date) = EXTRACT(YEAR FROM CURRENT_DATE())
AND EXTRACT(MONTH FROM order_date) = EXTRACT(MONTH FROM CURRENT_DATE())
GROUP BY order_day
),
moving AS (
SELECT
order_day,
daily_total,
AVG(daily_total) OVER (ORDER BY order_day ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS seven_day_moving_avg
FROM daily_sales
)
SELECT order_day, daily_total, seven_day_moving_avg
FROM moving
ORDER BY order_day
LIMIT 100
""".strip()
return "SELECT CURRENT_TIMESTAMP() AS now"
7. ハンドラ:受け付け→ACK→遅延返信(開発は即時返信も可)
import os, requests
from .slack import parse_slash_payload
from langgraph.nl2sql import generate_sql
from bq.client import run_query
from .formatting import format_table
from .guards import assert_safe_select
from fastapi import BackgroundTasks
def post_to_slack(response_url: str, text: str, in_channel: bool = True):
payload = {"response_type": "in_channel" if in_channel else "ephemeral", "text": text}
requests.post(response_url, json=payload, timeout=10)
async def handle_sql(body: bytes, bg: BackgroundTasks):
p = parse_slash_payload(body)
natural = p["text"]
response_url = p["response_url"]
# NL→SQL
sql_raw = generate_sql(natural)
# ガード&DS固定
try:
sql = assert_safe_select(sql_raw)
except Exception as e:
# 即時エラー返却
return {"response_type": "ephemeral", "text": f"Error: {e}"}
# 開発用:即時返信モード(Slack不要)
if os.getenv("DEV_INLINE_REPLY", "false").lower() == "true" or not response_url:
headers, rows = run_query(sql)
return {"response_type": "ephemeral", "text": f"*SQL*\n```\n{sql}\n```\n*RESULT*\n{format_table(headers, rows)}"}
# 本番想定:ACK → バックグラウンドで実行・遅延返信
def _run_and_reply():
try:
headers, rows = run_query(sql)
msg = f"*SQL*\n```\n{sql}\n```\n*RESULT*\n{format_table(headers, rows)}"
except Exception as e:
msg = f":warning: 実行エラー: {e}"
post_to_slack(response_url, msg, in_channel=True)
bg.add_task(_run_and_reply)
return {"response_type": "ephemeral", "text": "受け付けました。結果を後ほど返信します。"}
Bigqueryへの権限付与など
1. 認証とプロジェクト指定
# ブラウザでGoogleアカウントにログインしてADCを発行(クライアントライブラリが使用)
gcloud auth application-default login
# 使うプロジェクトを明示
gcloud config set project your-project-id
gcloud auth login は gcloud CLI自身の認証、
gcloud auth application-default login は クライアントライブラリ用のADC。
BigQueryのPython/Go/Nodeクライアントは後者(ADC)を使います。
2. 最小権限(あなたのユーザー)を付与
Googleアカウントに以下の権限が必要です。
- プロジェクト:roles/bigquery.jobUser(クエリ実行/ジョブ作成)
- データセット:roles/bigquery.dataViewer(読み取り) ← データセット単位で付与
PROJECT_ID=your-project-id
DATASET=ecommerce_sample
USER=you@example.com # あなたのGoogleアカウント
# ジョブ実行(プロジェクト)
gcloud projects add-iam-policy-binding "$PROJECT_ID" \
--member="user:${USER}" \
--role="roles/bigquery.jobUser"
# データ閲覧(データセット単位)
bq --project_id="$PROJECT_ID" add-iam-policy-binding "${PROJECT_ID}:${DATASET}" \
--member="user:${USER}" \
--role="roles/bigquery.dataViewer"
bqコマンドでの付与ができない場合はGCPのBigquery管理画面の対象データベースを選択し、共有、権限付与より追加して下さい。
権限は管理者から付けてもらうか、自分が管理権限を持っていれば以下で付与できます。
動作確認
1. 開発モード(署名検証スキップ + 即時返信)
.env を以下のようにして起動します。
DEV_SKIP_SLACK_VERIFY=true
DEV_INLINE_REPLY=true
BQ_DEFAULT_DATASET=your_dataset # あらかじめordersテーブル等を用意しておく
起動後、擬似的にSlashコマンドを叩きます(URLエンコードされたbody)。
curl -X POST http://127.0.0.1:8000/slack/cmd \
-H "Content-Type: application/x-www-form-urlencoded" \
--data 'team_id=T123&user_id=U123&response_url=&text=今月の売上を日次で、7日移動平均も表示'
期待結果:result として表形式のコードブロックがJSONのtextです。
(orders(order_date, total_amount) のモック/実表が必要です)
2. 遅延返信の確認(ダミーecho)
DEV_INLINE_REPLY=false にし、response_url にローカルの /dev/echo を指定して確認します。
curl -X POST http://127.0.0.1:8000/slack/cmd \
-H "Content-Type: application/x-www-form-urlencoded" \
--data 'team_id=T123&user_id=U123&response_url=http://127.0.0.1:8000/dev/echo&text=SQL: SELECT 1 AS ok'
サーバ側ログに DEV ECHO: {...} が出れば、ACK→遅延返信フローが通っています。
Slack本番では response_url をSlack提供のURLに自動で向けます(Part 2で本設定)。
補足:orders がない場合の最小モック
最小検証だけ行うなら、自然文ではなく SQL直書き でOKです。
curl -X POST http://127.0.0.1:8000/slack/cmd \
-H "Content-Type: application/x-www-form-urlencoded" \
--data 'team_id=T123&user_id=U123&response_url=&text=SQL: SELECT CURRENT_DATE() AS today, 42 AS answer'
ここまでのポイント(運用を見据えたMVPの型)
- 3秒制限対策:ACK→遅延返信(BackgroundTasks + response_url)
- 安全性:SELECT限定 + 危険語句ブロック + 既定DS固定で誤爆回避
- 段階的導入:SQL:直書き → 簡易テンプレ → LangGraph(Part 3)
- ローカル検証用フラグ:DEV_SKIP_SLACK_VERIFY, DEV_INLINE_REPLY で開発を速く
本稿のコードはMVPです。本番運用の実装を簡略化しています。実データに接続する前に、アクセス権限・マスキング・監査ログを必ず確認してください。
補足:Docker で動かす(ADC優先/SA JSONは代替)
認証は2通り
A) ADC(Application Default Credentials)をコンテナにマウント(推奨・簡単)
B) サービスアカウントJSONを環境変数で指定(ADCが使えない環境の代替)
本番運用では JSON 鍵ではなく サービスアカウント(鍵レス) を推奨(Part 4 参照)。
追加ファイル
backend/Dockerfile
docker-compose.yml
# syntax=docker/dockerfile:1
FROM python:3.11-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates curl && \
rm -rf /var/lib/apt/lists/*
COPY backend/requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt
COPY backend /app
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
docker-compose.yml(A or B を選択)
version: "3.9"
services:
api:
build:
context: .
dockerfile: backend/Dockerfile
container_name: sql-bot-api
ports:
- "8000:8000"
env_file:
- backend/.env
# ABどちらか一方だけ有効化してください
volumes:
- ./backend:/app
# --- A) ADC をコンテナへマウント(推奨) ---
# macOS/Linux:
- ~/.config/gcloud:/root/.config/gcloud:ro
# Windows(PowerShell)例:
# - "C:/Users/%USERNAME%/AppData/Roaming/gcloud:/root/.config/gcloud:ro"
# ------------------------------------------------
# --- B) 代替:サービスアカウントJSONを使用 ---
# 使う場合は上の ADC 行をコメントアウトし、以下2行のコメントを外す:
# - ./secrets/sa.json:/secrets/sa.json:ro
environment:
# Bを使う場合のみ有効化(Aでは不要)
# GOOGLE_APPLICATION_CREDENTIALS: /secrets/sa.json
restart: unless-stopped
サービスアカウントJSON(B)を使う場合の注意
• secrets/sa.json は リポジトリにコミットしない(.gitignore で除外)。
• 付与権限は最小限:roles/bigquery.jobUser(プロジェクト)+ roles/bigquery.dataViewer(データセット)。
• 期限・ローテーション・保管場所に注意。本番は鍵レス(Cloud Run のSA)推奨。
実行手順
-
Aを使う人はホストでADC発行(初回のみ)
-
Bを使う人は JSON を用意
./secrets/sa.json を置き、compose の SA 行と GOOGLE_APPLICATION_CREDENTIALS を有効化。
付与権限は上記の最小権限のみ。 -
起動
docker compose up --build
4. 疎通テスト
curl -X POST http://127.0.0.1:8000/slack/cmd \
-H "Content-Type: application/x-www-form-urlencoded" \
--data 'team_id=T123&user_id=U123&response_url=&text=SQL: SELECT CURRENT_DATE() AS today, 42 AS answer'
ランディングページのご案内
プロダクト概要・導入メリット・デモイメージは、こちらのLPにまとめています。
👉 ランディングページ
前回:Part 0 へ
Part 0:全体像と目次
次回: Part 2:Slack署名検証の本番化、team_id → Workspace → company_id ルーティング、多テナント設計