Loading data from GCS into BigQuery is straightforward with gcs_to_bq.GoogleCloudStorageToBigQueryOperator.
import airflow
from airflow import DAG
from airflow.contrib.operators import gcs_to_bq
from datetime import timedelta
import pendulum
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": airflow.utils.dates.days_ago(1),
    "execution_timeout": timedelta(minutes=30),
    "retries": 3,
    "retry_delay": timedelta(minutes=5),
}
dag = DAG(
    "test_dag",
    default_args=default_args,
    catchup=True,
    schedule_interval="0 0 * * *",
)
# NOTE: this runs when the scheduler parses the DAG file, not when the
# task executes, so target_date is always "today" at parse time.
target_date = pendulum.now('Asia/Tokyo')
gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='test_task',
    bucket='your-bucket',
    # BigQuery expects <dataset>.<table>; the $YYYYMMDD suffix targets a
    # daily partition ('your_dataset' is a placeholder for your dataset)
    destination_project_dataset_table='your_dataset.table_name${}'.format(
        target_date.strftime('%Y%m%d')
    ),
    source_objects=['some_dir/{}/*.gz'.format(target_date.to_date_string())],
    source_format='NEWLINE_DELIMITED_JSON',
    compression='GZIP',
    write_disposition='WRITE_TRUNCATE',
    dag=dag
)
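One caveat with the task above: pendulum.now() is evaluated at DAG parse time, so with catchup=True every backfilled run would load the partition for "today" rather than for its own execution date. Both destination_project_dataset_table and source_objects are templated fields on this operator, so a backfill-safe variant can use Airflow's {{ ds }} / {{ ds_nodash }} macros instead. A minimal sketch ('your_dataset' is still a placeholder, and note that {{ ds }} is the run's execution date in UTC, not Asia/Tokyo):

gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
    task_id='test_task_templated',
    bucket='your-bucket',
    # {{ ds_nodash }} renders as YYYYMMDD for each run's execution date
    destination_project_dataset_table='your_dataset.table_name${{ ds_nodash }}',
    # {{ ds }} renders as YYYY-MM-DD
    source_objects=['some_dir/{{ ds }}/*.gz'],
    source_format='NEWLINE_DELIMITED_JSON',
    compression='GZIP',
    write_disposition='WRITE_TRUNCATE',
    dag=dag
)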