はじめに
GCP 上で ETL / ELT パイプラインを構築していると、「外部イベントをトリガーに Dataform を自動実行したい」 という要件があります。
今回は実務で使う組み合わせCloud Functions → Dataform APIを例に、以下 5 つのポイントで整理します。
1. Dataform APIとは(Workflow Invocations)
Dataform は 2023 以降、Dataform API により外部から workflow を起動できるようになりました。
主要エンドポイントは以下:
POST https://dataform.googleapis.com/v1beta1/projects/{project}/locations/{location}/repositories/{repo}/workflowInvocations
役割は以下の通り:
- Workflow Configuration を指定して Dataform のビルド・実行を開始
- 完了まで待たずに
RUNNING/QUEUEDのレスポンスを返す(非同期) - 実行状況は Dataform UI または API で確認可能
実務では
Cloud Functions から API を叩く → 下流で Dataform が DWH を更新
という構成が最も一般的です。
2. Cloud Functions での Dataform API 呼出例(Python)
📌 Python 版(Functions Framework)
import json
import os
import functions_framework
from google.auth.transport.requests import AuthorizedSession
from google.auth import default as google_auth_default
@functions_framework.http
def start_dataform(request):
creds, _ = google_auth_default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
authed_session = AuthorizedSession(creds)
project_id = os.getenv("DATAFORM_PROJECT_ID")
location = os.getenv("DATAFORM_LOCATION")
repo_id = os.getenv("DATAFORM_REPOSITORY_ID")
default_config_id = os.getenv("DATAFORM_WORKFLOW_CONFIG_ID")
body = request.get_json(silent=True) or {}
workflow_config_id = body.get("workflow_config_id", default_config_id)
url = (
f"https://dataform.googleapis.com/v1beta1/projects/{project_id}"
f"/locations/{location}/repositories/{repo_id}"
f"/workflowInvocations"
)
payload = {"workflowConfig": workflow_config_id}
response = authed_session.post(url, json=payload, timeout=30)
if not response.ok:
return {"error": response.text}, 500
return {
"status": "RUNNING",
"invocation": response.json()
}, 200
3. 認証設定(Service Account / ADC)
Cloud Functions では通常 Application Default Credentials (ADC) が利用されます。
必要なロール:
roles/dataform.admin
roles/dataform.editor
roles/cloudfunctions.invoker
roles/iam.serviceAccountUser
ポイント:
- Cloud Functions 実行時の Service Account にロールを付与する
- Dataform API は
cloud-platformスコープで十分 - Token 管理は不要(AuthorizedSession が自動更新)
4. 成功・失敗ログの扱い(Cloud Logging + Slack 通知)
Dataform は非同期実行のため、成功・失敗の監視が非常に重要です。
📌 Cloud Logging で記録すべき項目
- request payload(workflow_config_id)
- Dataform API のレスポンス(invocationId)
- status code
- 発生例外
- 実行時間(latency)
Log-based Metric を作成すれば、Alert に連動させることも可能です。
📌 Slack 通知(例:失敗時のみ)
Cloud Functions 内で直接送信するか、Error Reporting 経由でも可能。
Webhook の例:
import requests
def notify_slack(message):
webhook = os.getenv("SLACK_WEBHOOK_URL")
requests.post(webhook, json={"text": message})
失敗時:
notify_slack(f"[Dataform Trigger] Error: {response.text}")
5.(任意)Cloud Tasks との組み合わせ
頻繁に Dataform を起動する場合、以下を避けるために Cloud Tasks を併用することがあります:
- 多重実行
- 実行順序の乱れ
- リトライ処理の複雑化
構成例:
イベント → Cloud Tasks → Cloud Functions(Dataform API Call)
必要に応じて Task キュー制御を追加してください。
📌 まとめ
今回紹介した構成は、GCP で ETL/ELT を運用する際に最も再現性が高いパターンです。
本記事で紹介したポイント
- Dataform API は Workflow Invocation を作るだけ、非同期で返ってくる
- Cloud Functions からの呼び出しで簡単に自動化できる
- 認証は ADC + 適切なロール付与 で完結
- Cloud Logging と Slack 通知で運用監視を強化できる
- 必要に応じて Cloud Tasks で実行制御も可能