
Automating "opportunity → invoice" with a minimal Salesforce → Pub/Sub → BigQuery → Slack pipeline (up and running in 15 days)


TL;DR

Event-driven, minimal footprint: Salesforce/Stripe etc. → Pub/Sub → Cloud Functions → BigQuery → (optional) Slack notifications

Implement idempotency and audit logging first; treat notifications as a side effect

Resolve the latest state with MERGE; absorb backfills and rollbacks with BigQuery time travel

Limit sprint 1 to **visualizing "Won → Paid"** (decide what you will not do)

Table of contents

Background and assumptions

Architecture (Mermaid)

Event spec (minimal envelope)

Implementation steps (DDL / function / deploy)

Transformation and latest-state resolution (MERGE)

Verification (test publisher)

Monitoring, operations, rollback

Security: least privilege

Cost estimate and exit criteria

FAQ

Appendix (Terraform/Makefile templates)

Background and assumptions

The goal is a single thing: visualize opportunities (Closed Won) and invoices (Invoice Paid) end to end, and automatically aggregate current-month NRR and collection status.
There are only three assumptions.

Fix the source of truth (opportunities = Salesforce, invoices = Stripe)

Idempotency: event_id is mandatory; duplicate ingestion is acceptable and neutralized downstream

Audit: record side effects (notifications, writes) in an audit table

Architecture (Mermaid)
graph LR
  SF[(Salesforce CDC/PE)] --> T((Pub/Sub<br/>revops-events))
  ST[(Stripe Webhook)] --> T
  T --> CF[Cloud Functions v2<br/>handle_pubsub]
  CF --> RAW[(BigQuery<br/>revops.events_raw)]
  RAW --> STG1[revops.stg_opportunity]
  RAW --> STG2[revops.stg_invoice]
  STG1 --> FACT[revops.fact_opportunity]
  STG2 --> FACTB[revops.fact_billing]
  CF -- important only --> SLK[(Slack Webhook)]
  FACT & FACTB --> BI[(Looker Studio/BI)]

Event spec (minimal envelope)

Events are a **neutral envelope + raw payload**.

{
  "source": "salesforce",
  "type": "salesforce.opportunity.updated",
  "event_ts": "2025-10-08T09:00:00Z",
  "event_id": "salesforce:006xx00000ABCDEF:12345",  // natural key recommended
  "correlation_id": "req-20251008-0001",
  "payload": { /* raw source JSON as-is */ }
}

Prefer a natural key for event_id (ReplayId, object ID + revision number, etc.). If none is available, stabilize it as a SHA256 of the sorted JSON.

Implementation steps

1. BigQuery: create the schema (DDL)

-- dataset
CREATE SCHEMA IF NOT EXISTS revops;

-- Raw: a landing table that tolerates duplicates
CREATE TABLE IF NOT EXISTS revops.events_raw (
  event_id STRING,
  source STRING,
  type STRING,
  event_ts TIMESTAMP,
  payload JSON,
  correlation_id STRING,
  inserted_at TIMESTAMP
)
PARTITION BY DATE(event_ts)
CLUSTER BY source, type;

-- Audit events: record side effects and failures as well
CREATE TABLE IF NOT EXISTS revops.audit_events (
  at TIMESTAMP,
  actor STRING,
  resource STRING,
  action STRING,
  status STRING,
  request_id STRING,
  event_id STRING
)
PARTITION BY DATE(at)
CLUSTER BY resource, action;

2. Cloud Functions v2 (Python 3.11)

main.py

import base64, os, json, hashlib
from datetime import datetime, timezone
from typing import Any, Dict

import functions_framework
import requests
from google.cloud import bigquery

BQ_DATASET = os.environ.get("BQ_DATASET", "revops")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")
bq = bigquery.Client()


def _now() -> str:
    return datetime.now(timezone.utc).isoformat()


def _stable_id(source: str, msg: Dict[str, Any]) -> str:
    # Prefer a natural key; otherwise fall back to a SHA256 of the sorted JSON
    natural = msg.get("event_id") or msg.get("id") or msg.get("sf_replay_id")
    if natural:
        return f"{source}:{natural}"
    body = json.dumps(msg, sort_keys=True, ensure_ascii=False).encode()
    return f"{source}:sha256:{hashlib.sha256(body).hexdigest()}"


def _insert(table: str, row: Dict[str, Any], row_id: str | None = None):
    table_ref = f"{bq.project}.{BQ_DATASET}.{table}"
    errs = bq.insert_rows_json(table_ref, [row], row_ids=[row_id] if row_id else None)
    if errs:
        raise RuntimeError(errs)


def _notify(evt_type: str, p: Dict[str, Any]):
    if not SLACK_WEBHOOK_URL:
        return
    text = None
    if evt_type == "salesforce.opportunity.updated":
        if p.get("stage_name") in {"Closed Won", "Closed Lost"}:
            text = f":salesforce: {p.get('opportunity_name', 'Opportunity')} -> {p['stage_name']} ({p.get('amount')})"
    elif evt_type == "stripe.invoice.paid":
        text = f":money_with_wings: Invoice paid {p.get('customer_email') or p.get('customer_id')} ({p.get('amount_due')})"
    if text:
        try:
            requests.post(SLACK_WEBHOOK_URL, json={"text": text}, timeout=5).raise_for_status()
        except Exception as e:
            print({"slack_error": str(e)})


@functions_framework.cloud_event
def handle_pubsub(cloud_event):
    # Cloud Functions v2 delivers Pub/Sub messages as CloudEvents
    try:
        raw = base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")
        msg = json.loads(raw)
    except Exception as e:
        print({"decode_error": str(e)})
        return

    src, typ = msg.get("source", "unknown"), msg.get("type", "unknown")
    payload = msg.get("payload", {})
    eid = _stable_id(src, msg)
    row = {
        "event_id": eid,
        "source": src,
        "type": typ,
        "event_ts": msg.get("event_ts") or _now(),
        # JSON column: pass a JSON-encoded string for the streaming insert
        "payload": json.dumps(payload, ensure_ascii=False),
        "correlation_id": msg.get("correlation_id") or eid[:32],
        "inserted_at": _now(),
    }
    try:
        _insert("events_raw", row, row_id=eid)   # best-effort dedup via insertId
        _insert("audit_events", {
            "at": _now(), "actor": "system", "resource": typ,
            "action": typ, "status": "received",
            "request_id": row["correlation_id"], "event_id": eid,
        })
    except Exception as e:
        print({"bq_error": str(e)})

    _notify(typ, payload)
    print({"ok": True, "event_id": eid})

requirements.txt

functions-framework>=3.0.0
google-cloud-bigquery>=3.13.0
requests>=2.32.0

3. Deploy (gcloud)

Pub/Sub

gcloud pubsub topics create revops-events

BigQuery (run the DDL above)

bq --location=asia-northeast1 mk -d revops

Functions v2

gcloud functions deploy handle_pubsub \
  --gen2 --region=asia-northeast1 --runtime=python311 \
  --trigger-topic=revops-events \
  --set-env-vars=BQ_DATASET=revops,SLACK_WEBHOOK_URL=https://hooks.slack.com/services/XXX/YYY/ZZZ

Transformation and latest-state resolution (MERGE)

Staging (typed)

-- Salesforce Opportunity (typed)
CREATE OR REPLACE TABLE revops.stg_opportunity
PARTITION BY DATE(event_ts)
AS
SELECT
  JSON_VALUE(payload,'$.opportunity_id') AS opportunity_id,
  JSON_VALUE(payload,'$.opportunity_name') AS opportunity_name,
  JSON_VALUE(payload,'$.account_id') AS account_id,
  SAFE_CAST(JSON_VALUE(payload,'$.amount') AS NUMERIC) AS amount,
  JSON_VALUE(payload,'$.stage_name') AS stage_name,
  SAFE_CAST(JSON_VALUE(payload,'$.close_date') AS DATE) AS close_date,
  SAFE_CAST(JSON_VALUE(payload,'$.is_deleted') AS BOOL) AS is_deleted,
  event_ts
FROM revops.events_raw
WHERE type = 'salesforce.opportunity.updated';

-- Stripe Invoice (typed)
CREATE OR REPLACE TABLE revops.stg_invoice
PARTITION BY DATE(event_ts)
AS
SELECT
  JSON_VALUE(payload,'$.invoice_id') AS invoice_id,
  JSON_VALUE(payload,'$.customer_id') AS customer_id,
  JSON_VALUE(payload,'$.customer_email') AS customer_email,
  SAFE_CAST(JSON_VALUE(payload,'$.amount_due') AS NUMERIC) AS amount_due,
  JSON_VALUE(payload,'$.status') AS status,
  SAFE_CAST(JSON_VALUE(payload,'$.paid_at') AS TIMESTAMP) AS paid_at,
  event_ts
FROM revops.events_raw
WHERE type = 'stripe.invoice.paid';

Snapshot (latest state via MERGE)

-- Latest state of opportunities
CREATE TABLE IF NOT EXISTS revops.fact_opportunity(
  opportunity_id STRING, account_id STRING, opportunity_name STRING,
  amount NUMERIC, stage_name STRING, close_date DATE, is_deleted BOOL,
  updated_at TIMESTAMP
)
PARTITION BY DATE(updated_at);

MERGE revops.fact_opportunity T
USING (
  SELECT * EXCEPT(rn) FROM (
    SELECT
      opportunity_id, account_id, opportunity_name, amount, stage_name, close_date, is_deleted, event_ts,
      ROW_NUMBER() OVER(PARTITION BY opportunity_id ORDER BY event_ts DESC) rn
    FROM revops.stg_opportunity
  )
  WHERE rn = 1
) S
ON T.opportunity_id = S.opportunity_id
WHEN MATCHED THEN UPDATE SET
  account_id=S.account_id, opportunity_name=S.opportunity_name,
  amount=S.amount, stage_name=S.stage_name, close_date=S.close_date,
  is_deleted=S.is_deleted, updated_at=S.event_ts
WHEN NOT MATCHED THEN INSERT (opportunity_id, account_id, opportunity_name, amount, stage_name, close_date, is_deleted, updated_at)
VALUES (S.opportunity_id, S.account_id, S.opportunity_name, S.amount, S.stage_name, S.close_date, S.is_deleted, S.event_ts);
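
The billing side follows the same pattern. A minimal sketch for revops.fact_billing (column names follow stg_invoice above; invoice_id is assumed to be the natural key):

-- Latest state of invoices (analogous to fact_opportunity)
CREATE TABLE IF NOT EXISTS revops.fact_billing(
  invoice_id STRING, customer_id STRING, customer_email STRING,
  amount_due NUMERIC, status STRING, paid_at TIMESTAMP,
  updated_at TIMESTAMP
)
PARTITION BY DATE(updated_at);

MERGE revops.fact_billing T
USING (
  SELECT * EXCEPT(rn) FROM (
    SELECT
      invoice_id, customer_id, customer_email, amount_due, status, paid_at, event_ts,
      ROW_NUMBER() OVER(PARTITION BY invoice_id ORDER BY event_ts DESC) rn
    FROM revops.stg_invoice
  )
  WHERE rn = 1
) S
ON T.invoice_id = S.invoice_id
WHEN MATCHED THEN UPDATE SET
  customer_id=S.customer_id, customer_email=S.customer_email,
  amount_due=S.amount_due, status=S.status, paid_at=S.paid_at, updated_at=S.event_ts
WHEN NOT MATCHED THEN INSERT (invoice_id, customer_id, customer_email, amount_due, status, paid_at, updated_at)
VALUES (S.invoice_id, S.customer_id, S.customer_email, S.amount_due, S.status, S.paid_at, S.event_ts);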

Run this as a scheduled query / dbt / Dataform job at a 5–15 minute interval.

Verification (test publisher)

Publish test events from a local machine.

gen_events.py

import json, os
from google.cloud import pubsub_v1

project = os.environ["PROJECT_ID"]
topic = "revops-events"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic)

def publish(msg):
    print("published:", publisher.publish(topic_path, json.dumps(msg).encode()).result())

publish({
    "source": "salesforce", "type": "salesforce.opportunity.updated",
    "event_ts": "2025-10-08T09:00:00Z", "event_id": "006xx00000ABCDEF:42",
    "payload": {"opportunity_id": "006xx00000ABCDEF", "opportunity_name": "ACME_2025_Expansion",
                "account_id": "001xx00000ZZZZZ", "amount": 1200000, "stage_name": "Closed Won",
                "close_date": "2025-10-15", "is_deleted": False}
})
publish({
    "source": "stripe", "type": "stripe.invoice.paid",
    "event_ts": "2025-10-08T09:05:00Z", "event_id": "in_1ABCDxyz",
    "payload": {"invoice_id": "in_1ABCDxyz", "customer_id": "cus_123", "customer_email": "user@example.com",
                "amount_due": 1200000, "status": "paid", "paid_at": "2025-10-08T09:05:01Z"}
})

Verification queries

-- Raw landing counts
SELECT type, COUNT(*) FROM revops.events_raw
WHERE DATE(event_ts)=CURRENT_DATE() GROUP BY type;

-- Stage → fact: latest row count
SELECT COUNT(*) FROM revops.fact_opportunity;
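
To confirm that the MERGE keeps the fact table idempotent, a quick check that no opportunity_id appears more than once (should return zero rows):

-- fact_opportunity should hold exactly one row per opportunity_id
SELECT opportunity_id, COUNT(*) AS row_count
FROM revops.fact_opportunity
GROUP BY opportunity_id
HAVING COUNT(*) > 1;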

Monitoring, operations, and rollback

Latency monitoring

SELECT type, AVG(TIMESTAMP_DIFF(inserted_at, event_ts, SECOND)) AS sec_delay
FROM revops.events_raw
WHERE DATE(event_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY type;

Failure detection: audit_events rows with status != 'received' (the design also accommodates future failure records)
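
A check along these lines works once failure statuses are recorded (the current function only writes status='received'):

SELECT at, resource, action, status, event_id
FROM revops.audit_events
WHERE DATE(at) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
  AND status != 'received'
ORDER BY at DESC;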

Rollback: restore to a specific point in time with BigQuery time travel

-- Query the fact table as of 24 hours ago
SELECT * FROM revops.fact_opportunity FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR);

Backfill: re-transform events_raw for a given period (can also be re-run outside the schedule, as sketched below)
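
Because staging is rebuilt from events_raw and the MERGE is idempotent, a backfill is simply a re-run of both steps; to limit the scan to the affected window, a date filter can be added (the dates below are placeholders):

-- Rebuild staging for the affected window only, then re-run the MERGE above.
-- ROW_NUMBER + MERGE keep the result idempotent, so re-running is safe.
CREATE OR REPLACE TABLE revops.stg_opportunity
PARTITION BY DATE(event_ts)
AS
SELECT
  JSON_VALUE(payload,'$.opportunity_id') AS opportunity_id,
  JSON_VALUE(payload,'$.opportunity_name') AS opportunity_name,
  JSON_VALUE(payload,'$.account_id') AS account_id,
  SAFE_CAST(JSON_VALUE(payload,'$.amount') AS NUMERIC) AS amount,
  JSON_VALUE(payload,'$.stage_name') AS stage_name,
  SAFE_CAST(JSON_VALUE(payload,'$.close_date') AS DATE) AS close_date,
  SAFE_CAST(JSON_VALUE(payload,'$.is_deleted') AS BOOL) AS is_deleted,
  event_ts
FROM revops.events_raw
WHERE type = 'salesforce.opportunity.updated'
  AND DATE(event_ts) BETWEEN '2025-10-01' AND '2025-10-07';  -- backfill window (placeholder)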

Security: least privilege

Functions service account: roles/pubsub.subscriber, roles/bigquery.dataEditor, roles/bigquery.user

Manage the Slack webhook in Secret Manager; environment variables hold only a reference

Mask PII at the Raw layer, or project only selected columns at staging (see the sketch after this list)

Audit fields (actor/resource/action/request_id) are fixed as required columns
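
A sketch of selective projection at the staging layer (customer_email_hash is a hypothetical column name; adjust to your PII policy):

-- Keep only a hash of the email in staging instead of the raw value
SELECT
  JSON_VALUE(payload,'$.invoice_id') AS invoice_id,
  JSON_VALUE(payload,'$.customer_id') AS customer_id,
  TO_HEX(SHA256(JSON_VALUE(payload,'$.customer_email'))) AS customer_email_hash,
  SAFE_CAST(JSON_VALUE(payload,'$.amount_due') AS NUMERIC) AS amount_due,
  event_ts
FROM revops.events_raw
WHERE type = 'stripe.invoice.paid';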

Cost estimate and exit criteria

Scale: 10k–30k events/day, Raw 10–20 GB/month, Functions 1–2M invocations/month

A minimal GCP setup typically lands in the tens-of-thousands-of-yen range per month (varies by organization)

Exit criteria (decide 30 days after launch)

Average Won → Paid latency does not reach ≤ 15 minutes

Daily dashboard usage rate < 50%

Duplicate/missing event rate > 0.1% persists (see the check query below)
→ If any of these hold, redefine the requirements or change the stack (e.g. re-evaluate a Change Data Capture approach)
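
One way to track the duplicate-rate criterion, as a sketch over the raw landing table (missing events need a source-side count to compare against):

-- Share of raw events whose event_id appears more than once (last 30 days)
SELECT
  SAFE_DIVIDE(COUNT(*) - COUNT(DISTINCT event_id), COUNT(*)) AS duplicate_rate
FROM revops.events_raw
WHERE DATE(event_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 30 DAY);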

FAQ (short questions, short answers)

Q. Can duplicates be reduced to zero?
A. No. Pub/Sub is at-least-once delivery. Duplicates are neutralized with event_id and MERGE.

Q. What about two-way sync?
A. Fix one direction first. Two-way sync requires conflict-resolution rules and auditing, and is a separate design.

Q. What if a notification fails?
A. The business keeps running. Notifications are a side effect; they can be resent later, and the real fix is recomputing the data.

Q. Are dbt/Dataform required?
**A. No.** Scheduled queries are enough at first; adopt them as operations mature.

樋口洋士
https://signalstack.tokyo/
