LoginSignup
0
0

Dataform最初の一歩: CloudComposerからDataformのワークフローを呼び出すまで

Posted at

背景

Dataform使ってみる.
とりあえず、CloudComposerから呼び出すところまで作る

まずはDataformで簡単なワークフローを作成してみる

こんな感じになる

スクリーンショット 2023-06-14 10.13.04.png

Cloud Composerから呼び出してみる

Cloud Composerのコード

from datetime import datetime

from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow.operators.dummy import DummyOperator

from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
    DataformCancelWorkflowInvocationOperator,
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
    DataformGetCompilationResultOperator,
    DataformGetWorkflowInvocationOperator,
)

DAG_ID = "dataform"
PROJECT_ID = "suzuki-001"
REPOSITORY_ID = "quickstart-repository"
REGION = "us-central1"
GIT_COMMITISH = "main"

with models.DAG(
    DAG_ID,
    schedule_interval='@once',  # Override to match your needs
    start_date=datetime(2023, 6, 12),
    catchup=False,  # Override to match your needs
    tags=['dataform'],
) as dag:

    dummy1 = DummyOperator(
        task_id="dummy1"
    )

    dummy2 = DummyOperator(
        task_id="dummy2"
    )

    dummy3 = DummyOperator(
        task_id="dummy3"
    )

    create_compilation_result = DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
        },
    )

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id='create_workflow_invocation',
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
         workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
           , "invocation_config": { "included_tags": ["country"], "transitive_dependencies_included": True }
        },
    )

[dummy1, dummy2] >> dummy3 >> create_compilation_result >> create_workflow_invocation

DAGはこんな感じ

スクリーンショット 2023-06-14 11.10.25.png

簡単に呼び出せた。

タグの指定

参照

リポジトリ in Dataform

  • Dataformには内蔵のリポジトリがついている
  • コードをバージョン管理できる
  • github や gitlab と連携できる

ワークスペース

Dataformが用意する専用ワークスペース

definitions/: Dataform コアまたは JavaScript のアセット定義用のディレクトリ。
includes/: リポジトリ全体で再利用できるスクリプトと変数の空のディレクトリ。
dataform.json: アセットを公開する Google Cloud プロジェクト ID と BigQuery スキーマを含むデフォルトの Dataform 構成ファイル。デフォルト設定をオーバーライドしてニーズに合わせてカスタマイズできますが、Dataform の使用を開始する必要はありません。
package.json: 最新バージョンの @dataform/core を含むデフォルトの Dataform 依存関係構成ファイル。このファイルを使用して、パッケージをインポートできます。

dataformコア

Dataform コアは、SQL テーブルとワークフローを作成するためのオープンソースのメタ言語です。Dataform コアは、依存関係管理システム、自動データ品質テスト、データ ドキュメントを提供することで SQL を拡張します。

Dataform コアは次の目的で使用できます。
テーブル、ビュー、マテリアライズド ビュー、増分テーブルを定義する。
データ変換ロジックを定義する。
ソースデータの宣言とテーブルの依存関係を管理する。
コード内でテーブルと列の説明を記録する。
異なるクエリ間で関数と変数を再利用する。
データ アサーションを作成して、データの整合性を確保します。

パーティションやクラスタの定義

増分テーブル

データソースの宣言

config {
  type: "declaration",
  database: "",
  schema: "",
  name: "",
}
  • database に、データソースを含むプロジェクトのプロジェクト ID を入力します。
  • schema に、データソースが存在する BigQuery データセットを入力します。
  • name に、データソースとして使用するテーブルまたはビューの名前を入力します。その名前を使用して、Dataform でデータソースを参照できます。

タグ

タグ毎に実行できる

テスト assertation

JSでも書けちゃう

使い分けどうするの?(今度調べる)

  • ユーザー定義関数
  • 手続型言語
  • JSの使い分け

with CLOUD COMPOSER

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0