Building an AI-Agent-Ready SQL Bot with LangGraph, Part 2: Lookup Tables and Routing for Slack Multi-Tenancy

Posted at 2025-10-01

Goal: resolve the company and the BigQuery target (bq_source) from the Slack team_id, and run the whole /sql → NL→SQL → BigQuery pipeline with per-tenant settings.
Signature verification is upgraded to production grade (±5-minute timestamp check, replay protection).

What we're building (overview)

  • Lookup tables: companies / slack_workspaces / bq_sources
  • Routing: a single path from team_id → company_id → bq_sources (is_default)
  • Production-grade signature verification: HMAC (X-Slack-Signature), timestamp skew check (±5 min), retry-header handling
  • App changes: extend Part 1's run_query() / assert_safe_select() to work with each tenant's dataset/project
ER Diagram (companies 1─N slack_workspaces, companies 1─N bq_sources)

Migration (PostgreSQL)

db/migrations/20250929_part2_multitenant.sql
CREATE TABLE companies (
  id BIGSERIAL PRIMARY KEY,
  name TEXT NOT NULL,
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now()
);

CREATE TABLE slack_workspaces (
  id BIGSERIAL PRIMARY KEY,
  company_id BIGINT NOT NULL REFERENCES companies(id),
  team_id TEXT NOT NULL UNIQUE,
  team_name TEXT,
  is_active BOOLEAN NOT NULL DEFAULT TRUE,
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX idx_slack_workspaces_company_id ON slack_workspaces(company_id);

CREATE TABLE bq_sources (
  id BIGSERIAL PRIMARY KEY,
  company_id BIGINT NOT NULL REFERENCES companies(id),
  project_id TEXT NOT NULL,
  dataset TEXT NOT NULL,
  location TEXT NOT NULL DEFAULT 'asia-northeast1',
  is_default BOOLEAN NOT NULL DEFAULT TRUE,
  created_at TIMESTAMPTZ DEFAULT now(),
  updated_at TIMESTAMPTZ DEFAULT now(),
  UNIQUE (company_id, project_id, dataset)
);
CREATE INDEX idx_bq_sources_company_id ON bq_sources(company_id);
CREATE INDEX idx_bq_sources_company_default ON bq_sources(company_id, is_default);

Minimal seed data

db/seed/tenants.sql
INSERT INTO companies (name) VALUES ('Wondercoms Inc.') RETURNING id; -- e.g. returns 1 (used below)

INSERT INTO slack_workspaces (company_id, team_id, team_name)
VALUES (1, 'T09GABUDW2X', 'Wondercoms2');

INSERT INTO bq_sources (company_id, project_id, dataset, location, is_default)
VALUES (1, 'your-project-id', 'ecommerce_sample', 'asia-northeast1', TRUE);

App layout additions (diff from Part 1)

sql-bot/backend/
  ├─ app/
  │   ├─ main.py
  │   ├─ slack.py
  │   ├─ handler.py
  │   ├─ formatting.py
  │   ├─ guards.py
  │   ├─ tenancy.py        # new
  │   └─ db/
  │       ├─ session.py     # new
  │       └─ models.py      # new
  ├─ bq/
  │   └─ client.py
  └─ ...

SQLAlchemy session

app/db/session.py
import os
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

DB_URI = os.getenv("DATABASE_URL")  # postgresql+psycopg2://user:pass@host:5432/sqlbot
engine = create_engine(DB_URI, pool_pre_ping=True, future=True)
SessionLocal = sessionmaker(bind=engine, autoflush=False, autocommit=False, future=True)

# FastAPI dependency (yields one session per request)
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()
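FastAPI drives get_db as a generator dependency: it advances the generator to obtain the session, runs the route handler, then finalizes the generator so the finally block always closes the session. A dependency-free sketch of that lifecycle (FakeSession is a made-up stand-in, not SQLAlchemy):

```python
class FakeSession:
    """Stand-in for a SQLAlchemy Session, for illustration only."""
    def __init__(self):
        self.closed = False
    def close(self):
        self.closed = True

def get_db():
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()  # always runs, even if the request handler raised

gen = get_db()
db = next(gen)      # FastAPI injects this value into the route
print(db.closed)    # False: the session stays open while the request is handled
gen.close()         # after the response, FastAPI finalizes the generator
print(db.closed)    # True: the finally block closed the session
```

The same guarantee holds when the route raises: the finally block still runs, so sessions never leak.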

Lookup models

app/db/models.py
from sqlalchemy import Column, BigInteger, Boolean, Text, DateTime, ForeignKey, UniqueConstraint, func
from sqlalchemy.orm import declarative_base, relationship

Base = declarative_base()

class Company(Base):
    __tablename__ = "companies"
    id = Column(BigInteger, primary_key=True)
    name = Column(Text, nullable=False)
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
    workspaces = relationship("SlackWorkspace", back_populates="company")
    bq_sources = relationship("BQSource", back_populates="company")

class SlackWorkspace(Base):
    __tablename__ = "slack_workspaces"
    id = Column(BigInteger, primary_key=True)
    company_id = Column(BigInteger, ForeignKey("companies.id"), nullable=False)
    team_id = Column(Text, nullable=False, unique=True)
    team_name = Column(Text)
    is_active = Column(Boolean, nullable=False, server_default="true")
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
    company = relationship("Company", back_populates="workspaces")

class BQSource(Base):
    __tablename__ = "bq_sources"
    __table_args__ = (
        UniqueConstraint("company_id", "project_id", "dataset"),
    )
    id = Column(BigInteger, primary_key=True)
    company_id = Column(BigInteger, ForeignKey("companies.id"), nullable=False)
    project_id = Column(Text, nullable=False)
    dataset = Column(Text, nullable=False)
    location = Column(Text, nullable=False, server_default="asia-northeast1")
    is_default = Column(Boolean, nullable=False, server_default="true")
    created_at = Column(DateTime(timezone=True), server_default=func.now())
    updated_at = Column(DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
    company = relationship("Company", back_populates="bq_sources")

Tenant resolution logic

app/tenancy.py
from sqlalchemy.orm import Session
from .db.models import SlackWorkspace, BQSource

class TenantNotFound(Exception): ...
class WorkspaceInactive(Exception): ...
class BQSourceNotFound(Exception): ...

def resolve_tenant(db: Session, team_id: str):
    ws = (
        db.query(SlackWorkspace)
          .filter(SlackWorkspace.team_id == team_id)
          .first()
    )
    if not ws:
        raise TenantNotFound(f"unknown team_id: {team_id}")
    if not ws.is_active:
        raise WorkspaceInactive(f"inactive workspace: {team_id}")

    bq = (
        db.query(BQSource)
          .filter(BQSource.company_id == ws.company_id, BQSource.is_default.is_(True))
          .first()
    )
    if not bq:
        raise BQSourceNotFound(f"no default bq_source for company_id={ws.company_id}")

    # Routing result (the minimum info needed to run BigQuery)
    return {
        "company_id": ws.company_id,
        "team_id": team_id,
        "project_id": bq.project_id,
        "dataset": bq.dataset,
        "location": bq.location,
    }
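The resolution itself is just two lookups. A dependency-free sketch of the same flow using plain dicts and built-in exceptions in place of the custom ones (the sample rows mirror the seed data above):

```python
# Hypothetical in-memory stand-ins for slack_workspaces / bq_sources rows.
workspaces = {"T09GABUDW2X": {"company_id": 1, "is_active": True}}
bq_defaults = {1: {"project_id": "your-project-id",
                   "dataset": "ecommerce_sample",
                   "location": "asia-northeast1"}}

def resolve_tenant_dict(team_id: str) -> dict:
    # Lookup 1: team_id → workspace (must exist and be active)
    ws = workspaces.get(team_id)
    if ws is None:
        raise KeyError(f"unknown team_id: {team_id}")
    if not ws["is_active"]:
        raise RuntimeError(f"inactive workspace: {team_id}")
    # Lookup 2: company_id → default BigQuery source
    bq = bq_defaults.get(ws["company_id"])
    if bq is None:
        raise LookupError(f"no default bq_source for company_id={ws['company_id']}")
    return {"company_id": ws["company_id"], "team_id": team_id, **bq}

print(resolve_tenant_dict("T09GABUDW2X")["dataset"])  # ecommerce_sample
```

The real implementation above does the same thing, but against PostgreSQL, so new tenants can be added without a deploy.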

Production-grade signature verification (retries and replays)

  • Timestamp check within ±5 minutes (X-Slack-Request-Timestamp)
  • HMAC signature verification (X-Slack-Signature)
  • Handle Slack's retry headers: X-Slack-Retry-Num, X-Slack-Retry-Reason

As additional hardening, rejecting duplicate deliveries (idempotency) makes this safer (optional).
Example: store sig_hash + timestamp in a request_dedup table and reject duplicates within a 5-minute window.
If you don't need this, delete request_dedup.sql and skip that migration.

db/migrations/request_dedup.sql
CREATE TABLE request_dedup (
  id BIGSERIAL PRIMARY KEY,
  sig_hash TEXT NOT NULL,
  ts BIGINT NOT NULL,         -- Slack timestamp (seconds)
  created_at TIMESTAMPTZ DEFAULT now()
);
CREATE INDEX idx_request_dedup_sig_ts ON request_dedup(sig_hash, ts);
app/slack.py
import hmac, hashlib, time, os
from urllib.parse import parse_qs

SLACK_SIGNING_SECRET = os.getenv("SLACK_SIGNING_SECRET", "")
MAX_SKEW_SEC = 60 * 5

def verify_signature(headers, body: bytes) -> tuple[bool, int, str]:
    try:
        ts = int(headers.get("X-Slack-Request-Timestamp", "0"))
    except ValueError:
        return False, 0, ""
    if abs(time.time() - ts) > MAX_SKEW_SEC:
        return False, ts, ""
    base = f"v0:{ts}:{body.decode()}".encode()
    req_sig = headers.get("X-Slack-Signature", "")
    my_sig = "v0=" + hmac.new(
        key=SLACK_SIGNING_SECRET.encode(),
        msg=base,
        digestmod=hashlib.sha256,
    ).hexdigest()
    return hmac.compare_digest(req_sig, my_sig), ts, my_sig

def parse_slash_payload(body: bytes) -> dict:
    form = parse_qs(body.decode())
    get = lambda k: (form.get(k, [""])[0] or "").strip()
    return {
        "team_id": get("team_id"),
        "user_id": get("user_id"),
        "channel_id": get("channel_id"),
        "text": get("text"),
        "response_url": get("response_url"),
        "command": get("command"),
    }
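To see exactly what verify_signature checks, here is a self-contained round trip of Slack's v0 signing scheme (the secret, timestamp, and body are made-up values): sign the base string, then recompute and compare in constant time.

```python
import hmac, hashlib

# Hypothetical signing secret and request, for illustration only.
secret = "test-signing-secret"
ts = 1700000000
body = b"team_id=T09GABUDW2X&command=%2Fsql&text=top+products"

# Slack signs "v0:<timestamp>:<raw request body>" with HMAC-SHA256.
base = f"v0:{ts}:{body.decode()}".encode()
signature = "v0=" + hmac.new(secret.encode(), base, hashlib.sha256).hexdigest()

# The server recomputes the same digest and compares in constant time.
recomputed = "v0=" + hmac.new(secret.encode(), base, hashlib.sha256).hexdigest()
print(hmac.compare_digest(signature, recomputed))  # True

# Any change to the body (or timestamp) invalidates the signature.
tampered = f"v0:{ts}:{body.decode()}x".encode()
forged = "v0=" + hmac.new(secret.encode(), tampered, hashlib.sha256).hexdigest()
print(hmac.compare_digest(signature, forged))  # False
```

hmac.compare_digest is used instead of == to avoid leaking where the comparison diverges (a timing side channel).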
app/main.py
import os
from fastapi import FastAPI, Request, HTTPException, BackgroundTasks, Depends
from sqlalchemy.orm import Session
from .slack import verify_signature
from .db.session import get_db
from .tenancy import resolve_tenant, TenantNotFound, WorkspaceInactive, BQSourceNotFound
from .handler import handle_sql
from sqlalchemy import text as sa_text

app = FastAPI(title="SQL Bot - Part2")

@app.post("/slack/cmd")
async def slack_cmd(req: Request, bg: BackgroundTasks, db: Session = Depends(get_db)):
    body = await req.body()
    headers = req.headers

    ok, ts, sig = verify_signature(headers, body)
    # For local development, verification can be skipped with DEV_SKIP_SLACK_VERIFY=true.
    if not ok and os.getenv("DEV_SKIP_SLACK_VERIFY", "false").lower() != "true":
        raise HTTPException(status_code=401, detail="invalid signature")

    # Optional: Slack retry headers indicate a re-delivery of the same event.
    retry_num = headers.get("X-Slack-Retry-Num")
    if retry_num is not None:
        # One policy is to ACK retries with a bare 200 OK if the original request
        # was already recorded; here we simply fall through to the dedup check.
        pass

    # Dedup: if the same sig_hash + ts arrives again within the window, process it only once.
    db.execute(sa_text("""
      DELETE FROM request_dedup WHERE created_at < now() - interval '10 minutes'
    """))
    found = db.execute(sa_text("""
      SELECT 1 FROM request_dedup WHERE sig_hash=:h AND ts=:t
    """), {"h": sig, "t": ts}).fetchone()
    if found:
        return {"response_type": "ephemeral", "text": "Duplicate (replayed) request ignored."}
    db.execute(sa_text("""
      INSERT INTO request_dedup(sig_hash, ts) VALUES(:h, :t)
    """), {"h": sig, "t": ts})
    db.commit()

    # team_id → tenant
    from .slack import parse_slash_payload
    payload = parse_slash_payload(body)
    team_id = payload["team_id"]

    try:
        tenant = resolve_tenant(db, team_id)
    except TenantNotFound:
        raise HTTPException(status_code=403, detail="unknown team_id")
    except WorkspaceInactive:
        raise HTTPException(status_code=403, detail="workspace is inactive")
    except BQSourceNotFound:
        raise HTTPException(status_code=500, detail="no default BigQuery source")

    # Pass the resolved tenant to the downstream handler
    return await handle_sql(body, bg, tenant=tenant)
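The request_dedup table implements a time-windowed idempotency check: purge old entries, reject anything already seen, record the rest. The same idea sketched in-memory (the DB-backed version above is what you want in practice, since it works across processes and restarts):

```python
import time

# Minimal in-memory sketch of the dedup window; the article uses a DB table instead.
WINDOW_SEC = 600
seen: dict[tuple[str, int], float] = {}

def is_duplicate(sig_hash: str, ts: int, now: float) -> bool:
    # Purge entries older than the window, then check/record this request.
    for key, t in list(seen.items()):
        if now - t > WINDOW_SEC:
            del seen[key]
    if (sig_hash, ts) in seen:
        return True
    seen[(sig_hash, ts)] = now
    return False

now = time.time()
print(is_duplicate("v0=abc", 1700000000, now))  # False (first delivery)
print(is_duplicate("v0=abc", 1700000000, now))  # True  (replay within the window)
```

Keying on (signature, timestamp) works because a replayed request carries the identical body and timestamp, hence the identical signature.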

Making BigQuery execution tenant-aware

1. Client side (allow project/location overrides)

bq/client.py
import os
from google.cloud import bigquery

DEFAULT_PROJECT = os.getenv("GOOGLE_CLOUD_PROJECT")
DEFAULT_LOCATION = os.getenv("BIGQUERY_LOCATION", "asia-northeast1")

def run_query(sql: str, project: str | None = None, location: str | None = None, max_rows: int = 50):
    project = project or DEFAULT_PROJECT
    location = location or DEFAULT_LOCATION
    client = bigquery.Client(project=project, location=location)
    job = client.query(sql, location=location)
    it = job.result(max_results=max_rows)             # RowIterator
    headers = [f.name for f in it.schema]             # column names come from the schema (works even with 0 rows)
    rows = list(it)
    if not rows:
        return headers, []
    values = [tuple(row[h] for h in headers) for row in rows]
    return headers, values

2. Guard side (fill in the tenant's default dataset)

app/guards.py
import re

_FORBIDDEN = re.compile(r"\b(INSERT|UPDATE|DELETE|MERGE|TRUNCATE|DROP|ALTER|CREATE|GRANT|REVOKE)\b", re.I)

def assert_safe_select(sql: str, default_dataset: str | None):
    if not re.match(r"(?s)^\s*SELECT\b", sql, re.I):
        raise ValueError("Only SELECT is allowed.")
    if _FORBIDDEN.search(sql):
        raise ValueError("Dangerous SQL detected.")

    if default_dataset:
        def fix(m):
            tbl = m.group(1)
            if "." in tbl:  # already qualified
                return m.group(0)
            return m.group(0).replace(tbl, f"{default_dataset}.{tbl}")

        sql = re.sub(r"\bFROM\s+([a-zA-Z_][\w\.]*)\b", fix, sql, flags=re.I)
        sql = re.sub(r"\bJOIN\s+([a-zA-Z_][\w\.]*)\b", fix, sql, flags=re.I)

    return sql

3. Handler (receives the tenant and executes)

app/handler.py
import os, requests
from .slack import parse_slash_payload
from ..langgraph.nl2sql import generate_sql
from ..bq.client import run_query
from .formatting import format_table
from .guards import assert_safe_select
from fastapi import BackgroundTasks

def post_to_slack(response_url: str, text: str, in_channel: bool = True):
    payload = {"response_type": "in_channel" if in_channel else "ephemeral", "text": text}
    requests.post(response_url, json=payload, timeout=10)

async def handle_sql(body: bytes, bg: BackgroundTasks, tenant: dict):
    p = parse_slash_payload(body)
    natural = p["text"]
    response_url = p["response_url"]

    sql_raw = generate_sql(natural)
    try:
        sql = assert_safe_select(sql_raw, default_dataset=tenant["dataset"])
    except Exception as e:
        return {"response_type": "ephemeral", "text": f"Error: {e}"}

    inline = os.getenv("DEV_INLINE_REPLY", "false").lower() == "true" or not response_url

    def _execute():
        try:
            headers, rows = run_query(
                sql,
                project=tenant["project_id"],
                location=tenant["location"],
            )
            return f"*Company:* `{tenant['company_id']}`  *Team:* `{tenant['team_id']}`\n" \
                   f"*SQL*\n```\n{sql}\n```\n*RESULT*\n{format_table(headers, rows)}"
        except Exception as e:
            return f":warning: execution error: {e}"

    if inline:
        return {"response_type": "ephemeral", "text": _execute()}

    bg.add_task(lambda: post_to_slack(response_url, _execute(), in_channel=True))
    return {"response_type": "ephemeral", "text": "Accepted (multi-tenant). Results will follow shortly."}
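For reference, the delayed reply posted to response_url has only two shapes, in_channel (visible to everyone) and ephemeral (visible only to the invoking user). A minimal sketch with a hypothetical build_payload helper mirroring the dict post_to_slack sends:

```python
def build_payload(text: str, in_channel: bool = True) -> dict:
    # Mirrors the dict post_to_slack() above sends to Slack's response_url.
    return {"response_type": "in_channel" if in_channel else "ephemeral", "text": text}

print(build_payload("query finished")["response_type"])                           # in_channel
print(build_payload("only you can see this", in_channel=False)["response_type"])  # ephemeral
```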

Quick check with curl

1. Fastest local check with signature verification skipped (development)

# .env
DEV_INLINE_REPLY=true
DEV_SKIP_SLACK_VERIFY=true
DATABASE_URL=postgresql+psycopg2://user:pass@localhost:5432/sqlbot
curl -X POST http://127.0.0.1:8000/slack/cmd \
  -H "Content-Type: application/x-www-form-urlencoded" \
  --data 'team_id=T09GABUDW2X&user_id=U123&response_url=&text=SQL: SELECT CURRENT_DATE() AS today, 1 AS ok'

A RESULT comes back (resolved via the slack_workspaces / bq_sources tables)

2. Production signature verification (real requests from Slack)

  • Configure a Slash command in your Slack app (URL: /slack/cmd)
  • Set SLACK_SIGNING_SECRET as an environment variable
  • ACK within 3 seconds, as implemented in Part 1 (delayed reply)

Pitfalls in this part, and countermeasures

  • Multiple is_default rows: no DB constraint enforces uniqueness, so either prefer the newest record in the app or add a partial unique index such as CREATE UNIQUE INDEX uq_bq_sources_one_default ON bq_sources(company_id) WHERE is_default; (refined in Part 4 and later)
  • Unregistered team_id: return 403, and document the registration procedure (internal ops) in the article / README
  • Region mismatch: set bq_sources.location correctly (asia-northeast1 etc.)
  • Duplicate retries: filter them out with request_dedup over a 5-10 minute window
  • Insufficient permissions: re-check Part 1's ADC/SA setup plus jobUser & dataViewer (project / dataset)

Summary

  • The lookup tables live in PostgreSQL, resolving team_id → company → bq_source deterministically
  • Signature verification uses ±5 minutes + HMAC; retries are handled safely via dedup
  • BigQuery execution applies each tenant's project/dataset/location

Appendix: running with Docker (PostgreSQL + API)

Cloud Run always runs containers, so from Part 2 onward it's worth making Docker the standard way to run the app.
Here is a two-service setup: PostgreSQL (tenant DB & dedup) plus the API.

backend/Dockerfile
FROM python:3.11-slim

ENV PYTHONDONTWRITEBYTECODE=1 \
    PYTHONUNBUFFERED=1 \
    PIP_NO_CACHE_DIR=1

WORKDIR /app

RUN apt-get update && apt-get install -y --no-install-recommends \
    ca-certificates curl && \
    rm -rf /var/lib/apt/lists/*

COPY backend/requirements.txt /app/requirements.txt
RUN pip install -r /app/requirements.txt

COPY backend /app
EXPOSE 8000
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]

docker-compose.yml (API + Postgres; ADC preferred, SA key as the alternative)

docker-compose.yml
version: "3.9"
services:
  postgres:
    image: postgres:15
    container_name: sql-bot-postgres
    environment:
      POSTGRES_USER: sqlbot
      POSTGRES_PASSWORD: sqlbot
      POSTGRES_DB: sqlbot
    ports:
      - "5432:5432"
    volumes:
      - pgdata:/var/lib/postgresql/data
      # To auto-run the migration & seed on first startup, enable the following:
      # - ./db/migrations:/docker-entrypoint-initdb.d/migrations:ro
      # - ./db/seed:/docker-entrypoint-initdb.d/seed:ro

  api:
    build:
      context: .
    container_name: sql-bot-api
    depends_on:
      - postgres
    ports:
      - "8000:8000"
    env_file:
      - backend/.env
    environment:
      DATABASE_URL: postgresql+psycopg2://sqlbot:sqlbot@postgres:5432/sqlbot
      # --- B) Only if you use an SA JSON key (not needed with A: ADC) ---
      # GOOGLE_APPLICATION_CREDENTIALS: /secrets/sa.json
    volumes:
      - ./backend:/app
      # --- A) Mount ADC credentials (recommended, simplest) ---
      - ~/.config/gcloud:/root/.config/gcloud:ro
      # --- B) To use an SA JSON key instead, remove the ADC line above and enable: ---
      # - ./secrets/sa.json:/secrets/sa.json:ro
    restart: unless-stopped

volumes:
  pgdata:

Enable only one of the two (ADC or SA JSON).
In production (Cloud Run), run keyless with a service account (covered in Part 4).

Run steps

1. Start the stack

docker compose up --build

2. Apply the migration & seed (requires the optional init mounts in docker-compose.yml)

docker compose exec -T postgres psql -U sqlbot -d sqlbot -f /docker-entrypoint-initdb.d/migrations/20250929_part2_multitenant.sql
docker compose exec -T postgres psql -U sqlbot -d sqlbot -f /docker-entrypoint-initdb.d/seed/tenants.sql

Or pipe the local files in directly:

docker compose exec -T postgres psql -U sqlbot -d sqlbot < db/migrations/20250929_part2_multitenant.sql
docker compose exec -T postgres psql -U sqlbot -d sqlbot < db/seed/tenants.sql

Landing page

Product overview, adoption benefits, and demo screenshots are collected on our landing page.
👉 Landing page


Previous: Part 1

Part 1: Architecture and an end-to-end MVP (run a single query with the /sql command)

Next: Part 3: Implementing NL→SQL with LangGraph (checkpoints and retries)
