サンプル
tableA のクエリ結果を tableB に入れるサンプル
sample.py
import airflow
from datetime import datetime, timedelta, timezone
from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
jst = timezone(timedelta(hours=+9))
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": airflow.utils.dates.days_ago(1),
"email": ["hoge@example.com"],
"email_on_failure": True,
"email_on_retry": False,
"retries": 3,
"retry_delay": timedelta(minutes=5)
}
dag = DAG("bq_sample_dag", default_args=default_args, catchup=False, schedule_interval=timedelta(1))
BigQueryOperator(
task_id="bq_sample_task",
sql="SELECT column1, column2 FROM pj_name.dataset_name.original_table_name",
use_legacy_sql=False,
destination_dataset_table="pj_name.dataset_name.new_table_name",
write_disposition="WRITE_TRUNCATE",
allow_large_results=True,
dag=dag
)
Cloud ComposerでBigQuery使う場合は、airflow.contrib.operators.bigquery_operator
のオペレーターを使います。
クエリ実行などのシンプルな処理の場合は、 BigQueryOperator
クラスを使います。
parameterなどはAirflow1.10.2であれば このdoc を参照。
[※注意] 現在のCloud ComposerのイメージのAirflow versionは1.10.3までなので、 airflow.gcp.operators
などはデフォルトでは使えません。