1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

AirflowからDataformにdata_interval_endなどのcontext変数を渡す方法

Posted at

先日GCPのDataformがGAリリースされました。
せっかくなので、まずAirflowにある既存ワークフローの一部をDataformで書き換えようと思いました。

AirflowからDataformをトリッガーする

ドキュメントを調べると、AirflowからDataformをトリッガーするoperatorはすでに存在しています。
https://cloud.google.com/dataform/docs/schedule-executions-composer#create_an_airflow_dag_that_schedules_workflow_invocations

簡単にまとめると

  1. DataformCreateCompilationResultOperator: sqlxをsqlにコンパイルする
  2. DataformCreateWorkflowInvocationOperator: sqlを実行する

しかし、どのようにAirflowからDataformへ変数を渡すかについてはドキュメントに記載されていません。

Dataformに変数を渡す

まず、Dataformの設定ファイルdataform.jsonに変数varsを追加しておきましょう。

{
  "defaultSchema": "dataform",
  "assertionSchema": "dataform_assertions",
  "warehouse": "bigquery",
  "defaultDatabase": "project-stg",
  "defaultLocation": "asia-northeast1",
  "vars": {
    "bq_suffix": "_stg",
    "execution_date": "2023-05-24"
  }
}

DataformCreateCompilationResultOperatorのソースを調べてみたところ、compilation_resultという引数があることを発見しました。

compilation_resultの中身を確認するため、APIの詳細を調べました。
https://cloud.google.com/dataform/reference/rest/v1beta1/CodeCompilationConfig

CodeCompilationConfig内にvarsという変数を指定できるようです。

{
  "defaultDatabase": string,
  "defaultSchema": string,
  "defaultLocation": string,
  "assertionSchema": string,
  "vars": {
    string: string,
    ...
  },
  "databaseSuffix": string,
  "schemaSuffix": string,
  "tablePrefix": string
}

BigQueryのsuffixをcode_compilation_configvarsへ渡してみたら問題なく実行できました。ちなみに、Dataform側からはdataform.projectConfig.vars.bq_suffixで変数を呼び出せます。

DataformCreateCompilationResultOperator(
    task_id="create_compilation_result",
    project_id=PROJECT_ID,
    region=REGION,
    repository_id=REPOSITORY_ID,
    compilation_result={
        "git_commitish": GIT_COMMITISH,
        "code_compilation_config": {
            "vars": {
                "bq_suffix": "_stg",
            }
        },
    },
)

Dataformにcontext変数を渡す

増分処理する際によくdata_interval_endなどのcontext変数を利用して当日の差分だけ取り入れます。
しかし、DataformCreateCompilationResultOperatorではtemplate_fieldsが実装されていないため、直接{{ data_interval_end }}のようなjinjaテンプレートを渡すことはできません。

TaskFlowDataformCreateCompilationResultOperatorをラッピングすれば前述の問題を解決できますdata_interval_endcontextから取得します。ポイントとしてはDataformCreateCompilationResultOperatorを返す際にexecute()を呼び出す必要があります。

from airflow.decorators import task

@task()
def create_compilation_result(**context):
    execute_date = (
        context["data_interval_end"].in_timezone("Asia/Tokyo").strftime("%Y-%m-%d")
    )
    return DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": {
                "vars": {
                    "execute_date": execute_date,
                    "bq_suffix": Variable.get("bq_suffix"),
                }
            },
        },
    ).execute(context=context)

最終的なDAGは以下のようになります。

from datetime import datetime

from airflow import models
from airflow.decorators import task
from airflow.models import Variable
from airflow.providers.google.cloud.operators.dataform import (
    DataformCreateCompilationResultOperator,
    DataformCreateWorkflowInvocationOperator,
)

DAG_ID = "dataform_demo"
PROJECT_ID = "project-stg"
REPOSITORY_ID = "dataform-demo"
REGION = "asia-northeast1"
GIT_COMMITISH = "main"

@task()
def create_compilation_result(**context):
    execute_date = (
        context["data_interval_end"].in_timezone("Asia/Tokyo").strftime("%Y-%m-%d")
    )
    return DataformCreateCompilationResultOperator(
        task_id="create_compilation_result",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        compilation_result={
            "git_commitish": GIT_COMMITISH,
            "code_compilation_config": {
                "vars": {
                    "execute_date": execute_date,
                    "bq_suffix": Variable.get("bq_suffix"),
                }
            },
        },
    ).execute(context=context)

with models.DAG(
    DAG_ID,
    schedule_interval="@once",
    start_date=datetime(2022, 1, 1),
    catchup=False,
    tags=["dataform"],
) as dag:

    create_workflow_invocation = DataformCreateWorkflowInvocationOperator(
        task_id="create_workflow_invocation",
        project_id=PROJECT_ID,
        region=REGION,
        repository_id=REPOSITORY_ID,
        workflow_invocation={
            "compilation_result": "{{ task_instance.xcom_pull('create_compilation_result')['name'] }}"
        },
    )

    create_compilation_result() >> create_workflow_invocation

以上、AirflowからDataformにdata_interval_endなどのcontext変数を渡す方法でした。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?