背景
- Airflowで.sqlファイルにパラメータを使ってSQLを動的に変更させたい
- BigqueryOperatorのquery_params引数を使った方法や、DAGのparams引数を使ったやり方がある
- BigqueryOperatorのquery_params引数を使ったやり方(参照)がやや冗長に感じたので、今回はDAGのparams引数を使ったやり方を紹介する
実装
参考
dag.py
from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
default_args = {
'owner': 'airflow',
'start_date': days_ago(1),
'project_id': 'some_project',
}
with DAG(
dag_id='dag',
default_args=default_args,
schedule_interval=None,
catchup=False,
params={'project_id': 'some_project'} # これ
) as dag:
BigQueryOperator(
task_id='task',
sql='query.sql',
destination_dataset_table="hoge.fuga.hige",
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
use_legacy_sql=False,
)
query.sql
SELECT * FROM `{{ params.project_id }}.fuga.hige`
BigqueryOperatorのquery_paramsの方法参照