4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Cloud Functions から Dataform を呼び出す – 実務レベルで理解する自動実行フロー

4
Last updated at Posted at 2025-12-11

はじめに

GCP 上で ETL / ELT パイプラインを構築していると、「外部イベントをトリガーに Dataform を自動実行したい」 という要件があります。
今回は実務で使う組み合わせCloud Functions → Dataform APIを例に、以下 5 つのポイントで整理します。

1. Dataform APIとは(Workflow Invocations)

Dataform は 2023 以降、Dataform API により外部から workflow を起動できるようになりました。
主要エンドポイントは以下:

POST https://dataform.googleapis.com/v1beta1/projects/{project}/locations/{location}/repositories/{repo}/workflowInvocations

役割は以下の通り:

  • Workflow Configuration を指定して Dataform のビルド・実行を開始
  • 完了まで待たずに RUNNING / QUEUED のレスポンスを返す(非同期)
  • 実行状況は Dataform UI または API で確認可能

実務では
Cloud Functions から API を叩く → 下流で Dataform が DWH を更新
という構成が最も一般的です。

2. Cloud Functions での Dataform API 呼出例(Python)

📌 Python 版(Functions Framework)

import json
import os
import functions_framework
from google.auth.transport.requests import AuthorizedSession
from google.auth import default as google_auth_default

@functions_framework.http
def start_dataform(request):
    creds, _ = google_auth_default(scopes=["https://www.googleapis.com/auth/cloud-platform"])
    authed_session = AuthorizedSession(creds)

    project_id = os.getenv("DATAFORM_PROJECT_ID")
    location = os.getenv("DATAFORM_LOCATION")
    repo_id = os.getenv("DATAFORM_REPOSITORY_ID")
    default_config_id = os.getenv("DATAFORM_WORKFLOW_CONFIG_ID")

    body = request.get_json(silent=True) or {}
    workflow_config_id = body.get("workflow_config_id", default_config_id)

    url = (
        f"https://dataform.googleapis.com/v1beta1/projects/{project_id}"
        f"/locations/{location}/repositories/{repo_id}"
        f"/workflowInvocations"
    )

    payload = {"workflowConfig": workflow_config_id}

    response = authed_session.post(url, json=payload, timeout=30)

    if not response.ok:
        return {"error": response.text}, 500

    return {
        "status": "RUNNING",
        "invocation": response.json()
    }, 200

3. 認証設定(Service Account / ADC)

Cloud Functions では通常 Application Default Credentials (ADC) が利用されます。

必要なロール:

roles/dataform.admin
roles/dataform.editor
roles/cloudfunctions.invoker
roles/iam.serviceAccountUser

ポイント:

  • Cloud Functions 実行時の Service Account にロールを付与する
  • Dataform API は cloud-platform スコープで十分
  • Token 管理は不要(AuthorizedSession が自動更新)

4. 成功・失敗ログの扱い(Cloud Logging + Slack 通知)

Dataform は非同期実行のため、成功・失敗の監視が非常に重要です。

📌 Cloud Logging で記録すべき項目

  • request payload(workflow_config_id)
  • Dataform API のレスポンス(invocationId)
  • status code
  • 発生例外
  • 実行時間(latency)

Log-based Metric を作成すれば、Alert に連動させることも可能です。


📌 Slack 通知(例:失敗時のみ)

Cloud Functions 内で直接送信するか、Error Reporting 経由でも可能。

Webhook の例:

import requests

def notify_slack(message):
    webhook = os.getenv("SLACK_WEBHOOK_URL")
    requests.post(webhook, json={"text": message})

失敗時:

notify_slack(f"[Dataform Trigger] Error: {response.text}")

5.(任意)Cloud Tasks との組み合わせ

頻繁に Dataform を起動する場合、以下を避けるために Cloud Tasks を併用することがあります:

  • 多重実行
  • 実行順序の乱れ
  • リトライ処理の複雑化
    構成例:
イベント → Cloud Tasks → Cloud Functions(Dataform API Call)

必要に応じて Task キュー制御を追加してください。

📌 まとめ

今回紹介した構成は、GCP で ETL/ELT を運用する際に最も再現性が高いパターンです。

本記事で紹介したポイント

  • Dataform API は Workflow Invocation を作るだけ、非同期で返ってくる
  • Cloud Functions からの呼び出しで簡単に自動化できる
  • 認証は ADC + 適切なロール付与 で完結
  • Cloud Logging と Slack 通知で運用監視を強化できる
  • 必要に応じて Cloud Tasks で実行制御も可能
4
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?