背景
Dataform使ってみる.
とりあえず、CloudComposerから呼び出すところまで作る
まずはDataformで簡単なワークフローを作成してみる
こんな感じになる
Cloud Composerから呼び出してみる
Cloud Composerのコード
from datetime import datetime
from google.cloud.dataform_v1beta1 import WorkflowInvocation
from airflow.operators.dummy import DummyOperator
from airflow import models
from airflow.models.baseoperator import chain
from airflow.providers.google.cloud.operators.dataform import (
DataformCancelWorkflowInvocationOperator,
DataformCreateCompilationResultOperator,
DataformCreateWorkflowInvocationOperator,
DataformGetCompilationResultOperator,
DataformGetWorkflowInvocationOperator,
)
DAG_ID = "dataform"
PROJECT_ID = "suzuki-001"
REPOSITORY_ID = "quickstart-repository"
REGION = "us-central1"
GIT_COMMITISH = "main"
with models.DAG(
DAG_ID,
schedule_interval='@once', # Override to match your needs
start_date=datetime(2023, 6, 12),
catchup=False, # Override to match your needs
tags=['dataform'],
) as dag:
dummy1 = DummyOperator(
task_id="dummy1"
)
dummy2 = DummyOperator(
task_id="dummy2"
)
dummy3 = DummyOperator(
task_id="dummy3"
)
create_compilation_result = DataformCreateCompilationResultOperator(
task_id="create_compilation_result",
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
compilation_result={
"git_commitish": GIT_COMMITISH,
},
)
create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
task_id='create_workflow_invocation',
project_id=PROJECT_ID,
region=REGION,
repository_id=REPOSITORY_ID,
workflow_invocation={
"compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
, "invocation_config": { "included_tags": ["country"], "transitive_dependencies_included": True }
},
)
[dummy1, dummy2] >> dummy3 >> create_compilation_result >> create_workflow_invocation
DAGはこんな感じ
簡単に呼び出せた。
タグの指定
参照
リポジトリ in Dataform
- Dataformには内蔵のリポジトリがついている
- コードをバージョン管理できる
- github や gitlab と連携できる
ワークスペース
Dataformが用意する専用ワークスペース
definitions/: Dataform コアまたは JavaScript のアセット定義用のディレクトリ。
includes/: リポジトリ全体で再利用できるスクリプトと変数の空のディレクトリ。
dataform.json: アセットを公開する Google Cloud プロジェクト ID と BigQuery スキーマを含むデフォルトの Dataform 構成ファイル。デフォルト設定をオーバーライドしてニーズに合わせてカスタマイズできますが、Dataform の使用を開始する必要はありません。
package.json: 最新バージョンの @dataform/core を含むデフォルトの Dataform 依存関係構成ファイル。このファイルを使用して、パッケージをインポートできます。
dataformコア
Dataform コアは、SQL テーブルとワークフローを作成するためのオープンソースのメタ言語です。Dataform コアは、依存関係管理システム、自動データ品質テスト、データ ドキュメントを提供することで SQL を拡張します。
Dataform コアは次の目的で使用できます。
テーブル、ビュー、マテリアライズド ビュー、増分テーブルを定義する。
データ変換ロジックを定義する。
ソースデータの宣言とテーブルの依存関係を管理する。
コード内でテーブルと列の説明を記録する。
異なるクエリ間で関数と変数を再利用する。
データ アサーションを作成して、データの整合性を確保します。
パーティションやクラスタの定義
増分テーブル
データソースの宣言
config {
type: "declaration",
database: "",
schema: "",
name: "",
}
- database に、データソースを含むプロジェクトのプロジェクト ID を入力します。
- schema に、データソースが存在する BigQuery データセットを入力します。
- name に、データソースとして使用するテーブルまたはビューの名前を入力します。その名前を使用して、Dataform でデータソースを参照できます。
タグ
タグ毎に実行できる
テスト assertation
JSでも書けちゃう
使い分けどうするの?(今度調べる)
- ユーザー定義関数
- 手続型言語
- JSの使い分け
with CLOUD COMPOSER