到達点:Slackの team_id から 会社(company) と BigQuery接続先(bq_source) を解決し、
/sql → NL→SQL → BigQuery 実行までを テナント別設定 で通せるようにします。
署名検証は本番仕様(±5分の時刻検証、リプレイ対策)に更新します。
何を作るか(概観)
- 参照系テーブル:companies / slack_workspaces / bq_sources
- ルーティング:team_id → company_id → bq_sources(is_default) の一本道
- 署名検証の本番化:HMAC(X-Slack-Signature)、時刻乖離(±5分)、再送ヘッダ対応
- アプリ改修:Part 1 の run_query() / assert_safe_select() を テナントのDS/Project で動くよう拡張
ER 図
マイグレーション(PostgreSQL)
CREATE TABLE companies (
id BIGSERIAL PRIMARY KEY,
name TEXT NOT NULL,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE TABLE slack_workspaces (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES companies(id),
team_id TEXT NOT NULL UNIQUE,
team_name TEXT,
is_active BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX idx_slack_workspaces_company_id ON slack_workspaces(company_id);
CREATE TABLE bq_sources (
id BIGSERIAL PRIMARY KEY,
company_id BIGINT NOT NULL REFERENCES companies(id),
project_id TEXT NOT NULL,
dataset TEXT NOT NULL,
location TEXT NOT NULL DEFAULT 'asia-northeast1',
is_default BOOLEAN NOT NULL DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT now(),
updated_at TIMESTAMPTZ DEFAULT now(),
UNIQUE (company_id, project_id, dataset)
);
CREATE INDEX idx_bq_sources_company_id ON bq_sources(company_id);
CREATE INDEX idx_bq_sources_company_default ON bq_sources(company_id, is_default);
最小ダミーデータ
INSERT INTO companies (name) VALUES ('Wondercoms Inc.') RETURNING id; -- 例: 1
INSERT INTO slack_workspaces (company_id, team_id, team_name)
VALUES (1, 'T09GABUDW2X', 'Wondercoms2');
INSERT INTO bq_sources (company_id, project_id, dataset, location, is_default)
VALUES (1, 'your-project-id', 'ecommerce_sample', 'asia-northeast1', TRUE);
アプリの構成追加(Part 1 からの差分)
sql-bot/backend/
├─ app/
│ ├─ main.py
│ ├─ slack.py
│ ├─ handler.py
│ ├─ formatting.py
│ ├─ guards.py
│ ├─ tenancy.py
│ └─ db/
│ ├─ session.py
│ └─ models.py
├─ bq/
│ └─ client.py
└─ ...
SQLAlchemy セッション
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
DB_URI = os.getenv("DATABASE_URL") # postgresql+psycopg2://user:pass@host:5432/sqlbot
engine = create_engine(DB_URI, pool_pre_ping=True, future=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True)
# FastAPI依存性
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
参照系モデル
from sqlalchemy import Column, BigInteger, Boolean, Text, DateTime, ForeignKey, UniqueConstraint, func
from sqlalchemy.orm import declarative_base, relationship
Base = declarative_base()
class Company(Base):
__tablename__ = "companies"
id = Column(BigInteger, primary_key=True)
name = Column(Text, nullable=False)
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
workspaces = relationship("SlackWorkspace", back_populates="company")
bq_sources = relationship("BQSource", back_populates="company")
class SlackWorkspace(Base):
__tablename__ = "slack_workspaces"
id = Column(BigInteger, primary_key=True)
company_id = Column(BigInteger, ForeignKey("companies.id"), nullable=False)
team_id = Column(Text, nullable=False, unique=True)
team_name = Column(Text)
is_active = Column(Boolean, nullable=False, server_default="true")
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
company = relationship("Company", back_populates="workspaces")
class BQSource(Base):
__tablename__ = "bq_sources"
__table_args__ = (
UniqueConstraint("company_id", "project_id", "dataset"),
)
id = Column(BigInteger, primary_key=True)
company_id = Column(BigInteger, ForeignKey("companies.id"), nullable=False)
project_id = Column(Text, nullable=False)
dataset = Column(Text, nullable=False)
location = Column(Text, nullable=False, server_default="asia-northeast1")
is_default = Column(Boolean, nullable=False, server_default="true")
created_at = Column(DateTime(timezone=True), server_default=func.now())
updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
company = relationship("Company", back_populates="bq_sources")
テナント解決ロジック
from sqlalchemy.orm import Session
from .db.models import SlackWorkspace, BQSource
class TenantNotFound(Exception): ...
class WorkspaceInactive(Exception): ...
class BQSourceNotFound(Exception): ...
def resolve_tenant(db: Session, team_id: str):
ws = (
db.query(SlackWorkspace)
.filter(SlackWorkspace.team_id == team_id)
.first()
)
if not ws:
raise TenantNotFound(f"unknown team_id: {team_id}")
if not ws.is_active:
raise WorkspaceInactive(f"inactive workspace: {team_id}")
bq = (
db.query(BQSource)
.filter(BQSource.company_id == ws.company_id, BQSource.is_default.is_(True))
.first()
)
if not bq:
raise BQSourceNotFound(f"no default bq_source for company_id={ws.company_id}")
# ルーティング結果(BigQuery実行に必要な最小情報)
return {
"company_id": ws.company_id,
"team_id": team_id,
"project_id": bq.project_id,
"dataset": bq.dataset,
"location": bq.location,
}
署名検証の本番化(再送・リプレイ対策)
- ±5分以内の時刻検証(X-Slack-Request-Timestamp)
- HMACで署名検証(X-Slack-Signature)
- Slackの再送ヘッダを考慮:X-Slack-Retry-Num, X-Slack-Retry-Reason
追加の堅牢化として、重複受信の排除(idempotency) を入れると安全です(任意実装)。
例:request_dedup テーブルに sig_hash + timestamp を保存して5分間の重複を拒否。
不要な場合はrequest_dedup.sql削除して、migrationも実行不要です
CREATE TABLE request_dedup (
id BIGSERIAL PRIMARY KEY,
sig_hash TEXT NOT NULL,
ts BIGINT NOT NULL, -- Slack timestamp(秒)
created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX idx_request_dedup_sig_ts ON request_dedup(sig_hash, ts);
import hmac, hashlib, time, os
from urllib.parse import parse_qs
SLACK_SIGNING_SECRET = os.getenv("SLACK_SIGNING_SECRET", "")
MAX_SKEW_SEC = 60 * 5
def verify_signature(headers, body: bytes) -> tuple[bool, int, str]:
ts = int(headers.get("X-Slack-Request-Timestamp", "0"))
if abs(time.time() - ts) > MAX_SKEW_SEC:
return False, ts, ""
base = f"v0:{ts}:{body.decode()}".encode()
req_sig = headers.get("X-Slack-Signature", "")
my_sig = "v0=" + hmac.new(
key=SLACK_SIGNING_SECRET.encode(),
msg=base,
digestmod=hashlib.sha256,
).hexdigest()
return hmac.compare_digest(req_sig, my_sig), ts, my_sig
def parse_slash_payload(body: bytes) -> dict:
form = parse_qs(body.decode())
get = lambda k: (form.get(k, [""])[0] or "").strip()
return {
"team_id": get("team_id"),
"user_id": get("user_id"),
"channel_id": get("channel_id"),
"text": get("text"),
"response_url": get("response_url"),
"command": get("command"),
}
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, Depends
from sqlalchemy.orm import Session
from .slack import verify_signature
from .db.session import get_db
from .tenancy import resolve_tenant, TenantNotFound, WorkspaceInactive, BQSourceNotFound
from .handler import handle_sql
from sqlalchemy import text as sa_text
app = FastAPI(title="SQL Bot - Part2")
@app.post("/slack/cmd")
async def slack_cmd(req: Request, bg: BackgroundTasks, db: Session = Depends(get_db)):
body = await req.body()
headers = req.headers
ok, ts, sig = verify_signature(headers, body)
if not ok:
raise HTTPException(status_code=401, detail="invalid signature")
# 任意:重複排除(5分間)、不要な場合外して下さい
retry_num = headers.get("X-Slack-Retry-Num")
if retry_num is not None:
# Slackの再送は**同一イベント**。DBに記録していれば 200 OK でACKのみ返す、などの方針も可。
pass
# dedup(sig_hash + ts の組で5分の間に複数回来たら一度だけ処理)
db.execute(sa_text("""
DELETE FROM request_dedup WHERE created_at < now() - interval '10 minutes'
"""))
found = db.execute(sa_text("""
SELECT 1 FROM request_dedup WHERE sig_hash=:h AND ts=:t
"""), {"h": sig, "t": ts}).fetchone()
if found:
return {"response_type": "ephemeral", "text": "Duplicate (replayed) request ignored."}
db.execute(sa_text("""
INSERT INTO request_dedup(sig_hash, ts) VALUES(:h, :t)
"""), {"h": sig, "t": ts})
db.commit()
# team_id → tenant
from .slack import parse_slash_payload
payload = parse_slash_payload(body)
team_id = payload["team_id"]
try:
tenant = resolve_tenant(db, team_id)
except TenantNotFound:
raise HTTPException(status_code=403, detail="unknown team_id")
except WorkspaceInactive:
raise HTTPException(status_code=403, detail="workspace is inactive")
except BQSourceNotFound:
raise HTTPException(status_code=500, detail="no default BigQuery source")
# 以降の処理にtenant情報を渡す
return await handle_sql(body, bg, tenant=tenant)
BigQuery 実行をテナント対応に
1. クライアント側(project/location のオーバーライドを許可)
import os
from google.cloud import bigquery
DEFAULT_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
DEFAULT_LOCATION = os.getenv("BIGQUERY_LOCATION", "asia-northeast1")
def run_query(sql: str, project: str | None = None, location: str | None = None, max_rows: int = 50):
project = project or DEFAULT_PROJECT
location = location or DEFAULT_LOCATION
client = bigquery.Client(project=project, location=location)
job = client.query(sql, location=location)
it = job.result(max_results=max_rows) # RowIterator
headers = [f.name for f in it.schema] # ← ここがポイント
rows = list(it)
if not rows:
return headers, []
values = [tuple(row[h] for h in headers) for row in rows]
return headers, values
2. ガード側(デフォルトDSをテナントごとに補完)
import re
_FORBIDDEN = re.compile(r"\b(INSERT|UPDATE|DELETE|MERGE|TRUNCATE|DROP|ALTER|CREATE|GRANT|REVOKE)\b", re.I)
def assert_safe_select(sql: str, default_dataset: str | None):
if not re.match(r"(?s)^\s*SELECT\b", sql, re.I):
raise ValueError("Only SELECT is allowed.")
if _FORBIDDEN.search(sql):
raise ValueError("Dangerous SQL detected.")
if default_dataset:
def fix(m):
tbl = m.group(1)
if "." in tbl: # 既に修飾あり
return m.group(0)
return m.group(0).replace(tbl, f"{default_dataset}.{tbl}")
sql = re.sub(r"\bFROM\s+([a-zA-Z_][\w\.]*)\b", fix, sql, flags=re.I)
sql = re.sub(r"\bJOIN\s+([a-zA-Z_][\w\.]*)\b", fix, sql, flags=re.I)
return sql
3. ハンドラ(tenantを受け取って実行)
import os, requests
from .slack import parse_slash_payload
from ..langgraph.nl2sql import generate_sql
from ..bq.client import run_query
from .formatting import format_table
from .guards import assert_safe_select
from fastapi import BackgroundTasks
def post_to_slack(response_url: str, text: str, in_channel: bool = True):
payload = {"response_type": "in_channel" if in_channel else "ephemeral", "text": text}
requests.post(response_url, json=payload, timeout=10)
async def handle_sql(body: bytes, bg: BackgroundTasks, tenant: dict):
p = parse_slash_payload(body)
natural = p["text"]
response_url = p["response_url"]
sql_raw = generate_sql(natural)
try:
sql = assert_safe_select(sql_raw, default_dataset=tenant["dataset"])
except Exception as e:
return {"response_type": "ephemeral", "text": f"Error: {e}"}
inline = os.getenv("DEV_INLINE_REPLY", "false").lower() == "true" or not response_url
def _execute():
try:
headers, rows = run_query(
sql,
project=tenant["project_id"],
location=tenant["location"],
)
return f"*Company:* `{tenant['company_id']}` *Team:* `{tenant['team_id']}`\n" \
f"*SQL*\n```\n{sql}\n```\n*RESULT*\n{format_table(headers, rows)}"
except Exception as e:
return f":warning: 実行エラー: {e}"
if inline:
return {"response_type": "ephemeral", "text": _execute()}
bg.add_task(lambda: post_to_slack(response_url, _execute(), in_channel=True))
return {"response_type": "ephemeral", "text": "受け付けました(multi-tenant)。結果は追って返信します。"}
動作確認(curl)
1. 署名検証をスキップしてローカルで最短確認(開発時)
# .env
DEV_INLINE_REPLY=true
DEV_SKIP_SLACK_VERIFY=true
DATABASE_URL=postgresql+psycopg2://user:pass@localhost:5432/sqlbot
curl -X POST http://127.0.0.1:8000/slack/cmd \
-H "Content-Type: application/x-www-form-urlencoded" \
--data 'team_id=T09GABUDW2X&user_id=U123&response_url=&text=SQL: SELECT CURRENT_DATE() AS today, 1 AS ok'
RESULT が返る(DBの slack_workspaces / bq_sources を参照)
2. 本番署名検証(Slack から実送)
- Slack App に Slash コマンドを設定(URL:/slack/cmd)
- SLACK_SIGNING_SECRET を環境変数に設定
- 3秒以内のACKは Part 1 の実装通り(遅延返信)
本パートの落とし穴と対策
- is_default が複数:DB制約はかけていないので、アプリ側で最新レコードを優先する or 部分インデックスを使う(高度実装は Part 4 以降で調整)
- 未登録 team_id:403で返しつつ、登録手順(社内運用)を記事内・READMEに明記
- リージョン不一致:bq_sources.location を正しく設定(asia-northeast1 等)
- 重複再送:request_dedup で5〜10分のウィンドウで除外
- 権限不足:Part 1 の ADC/SA + jobUser & dataViewer を再確認(プロジェクト / DS)
まとめ
- DBはPostgreSQL に参照系テーブルを用意し、team_id → company → bq_source を 決定的に 解決
- 署名検証 は±5分+HMAC、再送は dedup で安全に
- BigQuery 実行は テナントごとの project/dataset/location を適用
追記:Dockerで起動する(PostgreSQL + API)
Cloud Run は必ずコンテナになるので、Part 2 以降は Docker を標準実行方法にしておくのがおすすめです。
ここでは PostgreSQL(テナントDB & dedup)+ API の2サービス構成を示します。
FROM python:3.11-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1 \
PIP_NO_CACHE_DIR=1
WORKDIR /app
RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates curl && \
rm -rf /var/lib/apt/lists/*
COPY backend/requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt
COPY backend /app
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
docker-compose.yml(API + Postgres / ADC優先・SAは代替)
version: "3.9"
services:
postgres:
image: postgres:15
container_name: sql-bot-postgres
environment:
POSTGRES_USER: sqlbot
POSTGRES_PASSWORD: sqlbot
POSTGRES_DB: sqlbot
ports:
- "5432:5432"
volumes:
- pgdata:/var/lib/postgresql/data
# 初回起動時にマイグレーション & シードを自動実行したい場合は以下を有効化
# - ./db/migrations:/docker-entrypoint-initdb.d/migrations:ro
# - ./db/seed:/docker-entrypoint-initdb.d/seed:ro
api:
build:
context: .
container_name: sql-bot-api
depends_on:
- postgres
ports:
- "8000:8000"
env_file:
- backend/.env
environment:
DATABASE_URL: postgresql+psycopg2://sqlbot:sqlbot@postgres:5432/sqlbot
# --- B) SA JSON を使う人だけ有効化(A: ADC の場合は不要) ---
# GOOGLE_APPLICATION_CREDENTIALS: /secrets/sa.json
volumes:
- ./backend:/app
# --- A) ADC をマウント(推奨・簡単) ---
- ~/.config/gcloud:/root/.config/gcloud:ro
# --- B) SA JSON を使う場合は上のADC行を外し、以下2行を有効に ---
# - ./secrets/sa.json:/secrets/sa.json:ro
restart: unless-stopped
volumes:
pgdata:
どちらか一方だけ(ADC か SA JSON)を有効化してください。
本番(Cloud Run)は サービスアカウント鍵レスで運用(Part 4 で解説)。
実行手順
- 起動
docker compose up --build
2. マイグレーション & シードを適用
docker compose exec -T postgres psql -U sqlbot -d sqlbot -f /docker-entrypoint-initdb.d/migrations/20250929_part2_multitenant.sql
docker compose exec -T postgres psql -U sqlbot -d sqlbot -f /docker-entrypoint-initdb.d/seed/tenants.sql
もしくはローカルファイルを直接渡す
docker compose exec -T postgres psql -U sqlbot -d sqlbot < db/migrations/20250929_part2_multitenant.sql
docker compose exec -T postgres psql -U sqlbot -d sqlbot < db/seed/tenants.sql
ランディングページのご案内
プロダクト概要・導入メリット・デモイメージは、こちらのLPにまとめています。
👉 ランディングページ
前回:Part 1 へ
Part 1:アーキテクチャとMVPを一気通貫で(/sql コマンドで1クエリ実行する)
次回: Part 3:LangGraphでNL→SQLを実装(チェックポイント・再試行)