TL;DR
Event-driven, minimal setup: Salesforce/Stripe etc. → Pub/Sub → Cloud Functions → BigQuery → (optional) Slack notifications
Implement idempotency and audit logging first; treat notifications as side effects
Settle the latest state with MERGE; absorb backfills/rollbacks with BigQuery time travel
Limit sprint one to **"visualizing Won→Paid"** (decide what you will not do)
Table of Contents
Background and Assumptions
Architecture (Mermaid)
Event Spec (Minimal Envelope)
Implementation Steps (DDL / Function / Deploy)
Transformation and Latest-State (MERGE)
Smoke Test (Test Publisher)
Monitoring, Operations, Rollback
Minimal-Privilege Security
Cost Ballpark and Exit Criteria
FAQ
Appendix (Terraform/Makefile skeletons)
Background and Assumptions
The job is exactly one thing: visualize opportunities (Closed Won) and invoices (Invoice Paid) end to end, and automatically aggregate current-month NRR and collection status.
Only three assumptions.
Fix the source of truth (opportunities = Salesforce, billing = Stripe)
Idempotency: require event_id; duplicate ingestion is fine and is neutralized downstream
Audit: record side effects (notifications, writes) in an audit table
Architecture (Mermaid)
graph LR
  SF[(Salesforce CDC/PE)] --> T((Pub/Sub<br/>revops-events))
  ST[(Stripe Webhook)] --> T
  T --> CF[Cloud Functions v2<br/>handle_pubsub]
  CF --> RAW[(BigQuery<br/>revops.events_raw)]
  RAW --> STG1[revops.stg_opportunity]
  RAW --> STG2[revops.stg_invoice]
  STG1 --> FACT[revops.fact_opportunity]
  STG2 --> FACTB[revops.fact_billing]
  CF -- important only --> SLK[(Slack Webhook)]
  FACT & FACTB --> BI[(Looker Studio/BI)]
Event Spec (Minimal Envelope)
Every event is a **neutral envelope + raw payload**.
{
"source": "salesforce",
"type": "salesforce.opportunity.updated",
"event_ts": "2025-10-08T09:00:00Z",
"event_id": "salesforce:006xx00000ABCDEF:12345", // 自然キーを推奨
"correlation_id": "req-20251008-0001",
"payload": { /* ソース生JSONをそのまま */ }
}
Prefer a natural key for event_id (ReplayId, object ID + update number, etc.). If none exists, stabilize it with a SHA256 of the sorted-key JSON.
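A quick standalone sketch of that fallback (mirroring the `_stable_id` idea used later; names here are illustrative): the point is that `sort_keys=True` makes the serialization order-independent, so the same payload always hashes to the same id.

```python
import hashlib
import json

def stable_event_id(source: str, msg: dict) -> str:
    """Prefer a natural key; fall back to a SHA256 of sorted-key JSON."""
    natural = msg.get("event_id")
    if natural:
        return f"{source}:{natural}"
    # Sorting the keys makes the hash independent of dict insertion order.
    body = json.dumps(msg, sort_keys=True, ensure_ascii=False).encode()
    return f"{source}:sha256:{hashlib.sha256(body).hexdigest()}"

# Key order must not affect the id
a = stable_event_id("stripe", {"amount": 100, "customer": "cus_1"})
b = stable_event_id("stripe", {"customer": "cus_1", "amount": 100})
assert a == b
```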
Implementation Steps
- BigQuery: create the schema (DDL)
-- Dataset
CREATE SCHEMA IF NOT EXISTS revops;
-- Raw: the "landing" table that tolerates duplicates
CREATE TABLE IF NOT EXISTS revops.events_raw (
event_id STRING,
source STRING,
type STRING,
event_ts TIMESTAMP,
payload JSON,
correlation_id STRING,
inserted_at TIMESTAMP
)
PARTITION BY DATE(event_ts)
CLUSTER BY source, type;
-- Audit events: record side effects and failures too
CREATE TABLE IF NOT EXISTS revops.audit_events (
at TIMESTAMP,
actor STRING,
resource STRING,
action STRING,
status STRING,
request_id STRING,
event_id STRING
)
PARTITION BY DATE(at)
CLUSTER BY resource, action;
- Cloud Functions v2 (Python 3.11)
main.py
import base64
import hashlib
import json
import os
from datetime import datetime, timezone
from typing import Any, Dict

import functions_framework
import requests
from google.cloud import bigquery

BQ_DATASET = os.environ.get("BQ_DATASET", "revops")
SLACK_WEBHOOK_URL = os.environ.get("SLACK_WEBHOOK_URL", "")

bq = bigquery.Client()

def _now() -> str:
    return datetime.now(timezone.utc).isoformat()

def _stable_id(source: str, msg: Dict[str, Any]) -> str:
    natural = msg.get("event_id") or msg.get("id") or msg.get("sf_replay_id")
    if natural:
        return f"{source}:{natural}"
    body = json.dumps(msg, sort_keys=True, ensure_ascii=False).encode()
    return f"{source}:sha256:{hashlib.sha256(body).hexdigest()}"

def _insert(table: str, row: Dict[str, Any], row_id: str | None = None):
    table_ref = f"{bq.project}.{BQ_DATASET}.{table}"
    errs = bq.insert_rows_json(table_ref, [row], row_ids=[row_id] if row_id else None)
    if errs:
        raise RuntimeError(errs)

def _notify(evt_type: str, p: Dict[str, Any]):
    if not SLACK_WEBHOOK_URL:
        return
    text = None
    if evt_type == "salesforce.opportunity.updated":
        if p.get("stage_name") in {"Closed Won", "Closed Lost"}:
            text = f":salesforce: {p.get('opportunity_name', 'Opportunity')} -> {p['stage_name']} ({p.get('amount')})"
    elif evt_type == "stripe.invoice.paid":
        text = f"Invoice paid {p.get('customer_email') or p.get('customer_id')} ({p.get('amount_due')})"
    if text:
        try:
            requests.post(SLACK_WEBHOOK_URL, json={"text": text}, timeout=5).raise_for_status()
        except Exception as e:
            print({"slack_error": str(e)})

# Gen2 Pub/Sub triggers deliver a CloudEvent, so use the functions-framework
# decorator (the legacy (event, context) signature is for gen1 background functions).
@functions_framework.cloud_event
def handle_pubsub(cloud_event):
    try:
        raw = base64.b64decode(cloud_event.data["message"]["data"]).decode("utf-8")
        msg = json.loads(raw)
    except Exception as e:
        print({"decode_error": str(e)})
        return
    src, typ = msg.get("source", "unknown"), msg.get("type", "unknown")
    payload = msg.get("payload", {})
    eid = _stable_id(src, msg)
    row = {
        "event_id": eid,
        "source": src,
        "type": typ,
        "event_ts": msg.get("event_ts") or _now(),
        # JSON columns take a JSON-formatted string over the streaming API
        "payload": json.dumps(payload, ensure_ascii=False),
        "correlation_id": msg.get("correlation_id") or eid[:32],
        "inserted_at": _now(),
    }
    try:
        _insert("events_raw", row, row_id=eid)  # best-effort dedup via insertId
        _insert("audit_events", {
            "at": _now(), "actor": "system", "resource": typ,
            "action": typ, "status": "received",
            "request_id": row["correlation_id"], "event_id": eid,
        })
    except Exception as e:
        print({"bq_error": str(e)})
    _notify(typ, payload)
    print({"ok": True, "event_id": eid})
requirements.txt
functions-framework>=3.0.0
google-cloud-bigquery>=3.13.0
requests>=2.32.0
- Deploy (gcloud)
Pub/Sub
gcloud pubsub topics create revops-events
BigQuery (run the DDL above)
bq --location=asia-northeast1 mk -d revops
Functions v2
gcloud functions deploy handle_pubsub \
  --gen2 --region=asia-northeast1 --runtime=python311 \
  --trigger-topic=revops-events \
  --set-env-vars=BQ_DATASET=revops,SLACK_WEBHOOK_URL=https://hooks.slack.com/services/XXX/YYY/ZZZ
Transformation and Latest-State (MERGE)
Staging (typed)
-- Salesforce Opportunity (typed)
CREATE OR REPLACE TABLE revops.stg_opportunity
PARTITION BY DATE(event_ts)
AS
SELECT
JSON_VALUE(payload,'$.opportunity_id') AS opportunity_id,
JSON_VALUE(payload,'$.opportunity_name') AS opportunity_name,
JSON_VALUE(payload,'$.account_id') AS account_id,
SAFE_CAST(JSON_VALUE(payload,'$.amount') AS NUMERIC) AS amount,
JSON_VALUE(payload,'$.stage_name') AS stage_name,
SAFE_CAST(JSON_VALUE(payload,'$.close_date') AS DATE) AS close_date,
SAFE_CAST(JSON_VALUE(payload,'$.is_deleted') AS BOOL) AS is_deleted,
event_ts
FROM revops.events_raw
WHERE type = 'salesforce.opportunity.updated';
-- Stripe Invoice (typed)
CREATE OR REPLACE TABLE revops.stg_invoice
PARTITION BY DATE(event_ts)
AS
SELECT
JSON_VALUE(payload,'$.invoice_id') AS invoice_id,
JSON_VALUE(payload,'$.customer_id') AS customer_id,
JSON_VALUE(payload,'$.customer_email') AS customer_email,
SAFE_CAST(JSON_VALUE(payload,'$.amount_due') AS NUMERIC) AS amount_due,
JSON_VALUE(payload,'$.status') AS status,
SAFE_CAST(JSON_VALUE(payload,'$.paid_at') AS TIMESTAMP) AS paid_at,
event_ts
FROM revops.events_raw
WHERE type = 'stripe.invoice.paid';
Snapshot (latest state via MERGE)
-- Latest state of opportunities
CREATE TABLE IF NOT EXISTS revops.fact_opportunity(
opportunity_id STRING, account_id STRING, opportunity_name STRING,
amount NUMERIC, stage_name STRING, close_date DATE, is_deleted BOOL,
updated_at TIMESTAMP
)
PARTITION BY DATE(updated_at);
MERGE revops.fact_opportunity T
USING (
SELECT * EXCEPT(rn) FROM (
SELECT
opportunity_id, account_id, opportunity_name, amount, stage_name, close_date, is_deleted, event_ts,
ROW_NUMBER() OVER(PARTITION BY opportunity_id ORDER BY event_ts DESC) rn
FROM revops.stg_opportunity
)
WHERE rn = 1
) S
ON T.opportunity_id = S.opportunity_id
WHEN MATCHED THEN UPDATE SET
account_id=S.account_id, opportunity_name=S.opportunity_name,
amount=S.amount, stage_name=S.stage_name, close_date=S.close_date,
is_deleted=S.is_deleted, updated_at=S.event_ts
WHEN NOT MATCHED THEN INSERT (opportunity_id, account_id, opportunity_name, amount, stage_name, close_date, is_deleted, updated_at)
VALUES (S.opportunity_id, S.account_id, S.opportunity_name, S.amount, S.stage_name, S.close_date, S.is_deleted, S.event_ts);
Schedule this at a 5–15 minute interval with scheduled queries, dbt, or Dataform.
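The ROW_NUMBER()/MERGE pair reduces to "the newest event per opportunity wins". A minimal in-memory sketch of that semantics (illustrative only; field names follow the staging schema, and same-length ISO-8601 UTC timestamps compare correctly as strings):

```python
from typing import Dict, List

def latest_per_key(events: List[dict], key: str = "opportunity_id",
                   ts: str = "event_ts") -> Dict[str, dict]:
    """Keep only the newest event per key -- the same reduction the
    ROW_NUMBER()/MERGE pair performs in BigQuery."""
    state: Dict[str, dict] = {}
    for e in events:
        k = e[key]
        if k not in state or e[ts] > state[k][ts]:
            state[k] = e
    return state

events = [
    {"opportunity_id": "006A", "stage_name": "Negotiation", "event_ts": "2025-10-08T09:00:00Z"},
    {"opportunity_id": "006A", "stage_name": "Closed Won",  "event_ts": "2025-10-08T10:00:00Z"},
    {"opportunity_id": "006B", "stage_name": "Prospecting", "event_ts": "2025-10-08T09:30:00Z"},
]
assert latest_per_key(events)["006A"]["stage_name"] == "Closed Won"
```

Because the reduction only looks at the newest timestamp, re-running it over the same (or duplicated) input always converges to the same state, which is what makes the scheduled cadence safe.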
Smoke Test (Test Publisher)
Publish test events from your local machine.
gen_events.py
import json, os
from google.cloud import pubsub_v1
project = os.environ["PROJECT_ID"]; topic = "revops-events"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project, topic)
def publish(msg): print("published:", publisher.publish(topic_path, json.dumps(msg).encode()).result())
publish({
"source":"salesforce","type":"salesforce.opportunity.updated",
"event_ts":"2025-10-08T09:00:00Z","event_id":"006xx00000ABCDEF:42",
"payload":{"opportunity_id":"006xx00000ABCDEF","opportunity_name":"ACME_2025_拡張",
"account_id":"001xx00000ZZZZZ","amount":1200000,"stage_name":"Closed Won",
"close_date":"2025-10-15","is_deleted":false}
})
publish({
"source":"stripe","type":"stripe.invoice.paid",
"event_ts":"2025-10-08T09:05:00Z","event_id":"in_1ABCDxyz",
"payload":{"invoice_id":"in_1ABCDxyz","customer_id":"cus_123","customer_email":"user@example.com",
"amount_due":1200000,"status":"paid","paid_at":"2025-10-08T09:05:01Z"}
})
Verification queries
-- Raw landing counts
SELECT type, COUNT(*) FROM revops.events_raw
WHERE DATE(event_ts)=CURRENT_DATE() GROUP BY type;
-- Staging → fact: latest row count
SELECT COUNT(*) FROM revops.fact_opportunity;
Monitoring, Operations, Rollback
Latency monitoring
SELECT type, AVG(TIMESTAMP_DIFF(inserted_at, event_ts, SECOND)) AS sec_delay
FROM revops.events_raw
WHERE DATE(event_ts) >= DATE_SUB(CURRENT_DATE(), INTERVAL 1 DAY)
GROUP BY type;
Failure detection: audit_events rows with status != 'received' (the design also leaves room for recording future failures)
Rollback: restore to a point in time with BigQuery time travel
-- Read the fact table as of 24 hours ago
SELECT * FROM revops.fact_opportunity FOR SYSTEM_TIME AS OF TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 24 HOUR);
Backfill: re-transform events_raw for a chosen period (re-runnable outside the schedule)
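Backfill is safe precisely because the pipeline is idempotent: replaying a period's raw events, duplicates included, converges to the same state. A minimal sketch of that replay logic (standalone, with illustrative field names; the real backfill would run the staging/MERGE SQL over a date-bounded slice of events_raw):

```python
def replay(events, period_start, period_end):
    """Re-apply raw events for a time window: dedupe on event_id first,
    then latest-wins per opportunity. Replaying the window any number of
    times yields the same state."""
    seen = {}
    for e in events:
        if period_start <= e["event_ts"] < period_end:
            seen[e["event_id"]] = e  # duplicates collapse onto one id
    state = {}
    for e in seen.values():
        k = e["opportunity_id"]
        if k not in state or e["event_ts"] > state[k]["event_ts"]:
            state[k] = e
    return state

raw = [
    {"event_id": "sf:1", "opportunity_id": "006A", "stage_name": "Closed Won",
     "event_ts": "2025-10-08T10:00:00Z"},
    {"event_id": "sf:1", "opportunity_id": "006A", "stage_name": "Closed Won",
     "event_ts": "2025-10-08T10:00:00Z"},  # duplicate delivery
]
once = replay(raw, "2025-10-08T00:00:00Z", "2025-10-09T00:00:00Z")
twice = replay(raw + raw, "2025-10-08T00:00:00Z", "2025-10-09T00:00:00Z")
assert once == twice and len(once) == 1
```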
Minimal-Privilege Security
Functions service account: roles/pubsub.subscriber, roles/bigquery.dataEditor, roles/bigquery.user
Keep the Slack webhook in Secret Manager; the function receives only a reference via an environment variable
Mask PII in Raw, or project only the columns you need at staging
Fix the audit columns (actor/resource/action/request_id) as required fields
Cost Ballpark and Exit Criteria
Scale: 10k–30k events/day, Raw 10–20 GB/month, Functions 1–2M requests/month
A minimal GCP setup usually lands in the tens-of-thousands-of-yen-per-month range (varies by organization)
Exit criteria (decide 30 days after launch)
Average Won→Paid latency never reaches ≤15 minutes
Daily dashboard usage rate < 50%
Duplicate/missing-event rate > 0.1%, sustained
→ If any of these hold, redefine the requirements or change the stack (e.g. re-evaluate a Change Data Capture approach)
FAQ (short answers)
Q. Can duplicates be reduced to zero?
A. No. Pub/Sub is at-least-once delivery. Neutralize duplicates with event_id and MERGE.
Q. What about two-way sync?
A. Fix it one-way first. Two-way sync demands conflict-resolution rules and auditing, and is a separate design.
Q. What if a notification fails?
A. The business keeps running; notifications are side effects. They can be resent later, and the real remedy is recomputing the data.
Q. Are dbt/Dataform required?
**A. No.** Scheduled queries are enough at first; adopt them as your operations mature.
