背景
BigQueryにはDECLARE + SETを使って変数を定義できる機能がある。
標準 SQL のスクリプト
これを使ったSQLの結果でテーブルを作成するBigqueryOperatorのタスクを作成したい。
例えば以下のようなもの
failing_dag.py
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import datetime
default_args = {"owner": "airflow", "start_date": datetime(2020, 12, 7, 0, 0, 0)}
dag = DAG("z_declare_test", default_args=default_args, schedule_interval=None)
SQL = '''
DECLARE max_date DATE
SET max_date = (SELECT MAX(exam_date) FROM `some_table1`)
SELECT * FROM `warehouse.gmap__bf_old_results` WHERE test_date > max_date
'''
BigQueryOperator(task_id='bq_op',
use_legacy_sql=False,
sql=SQL,
destination_dataset_table='some_table2',
dag=dag
)
このタスクを実行するとエラーとなる。
Syntax error: Expected end of input but got keyword SET
そもそも、DECLARE + SETを使うようなSQLの実行結果からテーブルに作成するような機能はBigQueryではまだサポートされていない模様。
Cannot set destination table with BigQuery Python API
確かに、手元のBigQueryコンソールでもDECLARE + SETを使ったSQLの実行結果からテーブル作成はできない。
UDFを使って回避
同様のSQLはUDFを使えば作成できるのでAirflowを使う場合は、UDFに書き換えれば問題なく動いた。
imp_by_udf.py
CREATE TEMP FUNCTION maxdate() as ((SELECT max(exam_date) from `some_table1`));
SELECT * FROM `some_table2` WHERE test_date > maxdate()