
Transferring from S3 to BigQuery with BigQuery Data Transfer Service + Cloud Composer (Airflow 2.0)

Posted at 2021-06-29

Background

  • I have JSON files sitting in S3 that I want to load into BigQuery in a clean way
  • BigQuery Data Transfer Service is a handy service that loads external data into BigQuery in a managed fashion, so I want to use it
  • I want to manage the job with Cloud Composer (ver 2.0)

Approach

Have a Cloud Composer DAG create the S3 transfer config with BigQueryCreateDataTransferOperator and then trigger a run with BigQueryDataTransferServiceStartTransferRunsOperator.

Implementation

import time

from airflow.models import DAG
from airflow.utils.dates import days_ago
from airflow.providers.google.cloud.operators.bigquery_dts import (
    BigQueryCreateDataTransferOperator,
    BigQueryDataTransferServiceStartTransferRunsOperator,
)

# Create the config as on-demand: runs are triggered manually, not on a schedule.
schedule_options = {"disable_auto_scheduling": True}
GCP_DTS_BQ_DATASET = 'dataset_name'
GCP_DTS_BQ_TABLE = 'table_name'
BUCKET_URI = 's3://bucket/path/06/25/*'
PROJECT = 'YOUR_GCP_PROJECT'
KEY = "YOUR_AWS_KEY"
SECRET = "YOUR_AWS_SECRET"

default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'project_id': PROJECT,
    'retries': 2
}

# Amazon S3 data source parameters for the transfer config.
PARAMS = {
    "max_bad_records": "0",
    "data_path": BUCKET_URI,
    "destination_table_name_template": GCP_DTS_BQ_TABLE,
    "access_key_id": KEY,
    "secret_access_key": SECRET,
    "file_format": "JSON",
    "ignore_unkown_values": True,
}

# Transfer config passed to BigQuery Data Transfer Service.
TRANSFER_CONFIG = {
    "destination_dataset_id": GCP_DTS_BQ_DATASET,
    "display_name": "s3_to_bq_test",
    "data_source_id": "amazon_s3",
    "schedule_options": schedule_options,
    "params": PARAMS,
}

with DAG(
    dag_id='s3_to_bq_dag',
    default_args=default_args,
    schedule_interval=None,
    catchup=False
) as dag:
    
    # Create the S3 -> BigQuery transfer config; its ID is pushed to XCom.
    gcp_bigquery_create_transfer = BigQueryCreateDataTransferOperator(
        transfer_config=TRANSFER_CONFIG,
        project_id=PROJECT,
        task_id="create_s3_to_bq",
    )

    # Pull the config ID that the create task pushed to XCom.
    transfer_config_id = (
        "{{ task_instance.xcom_pull('create_s3_to_bq', key='transfer_config_id') }}"
    )

    # Kick off a manual run of the transfer, scheduled roughly one minute from now.
    gcp_bigquery_start_transfer = BigQueryDataTransferServiceStartTransferRunsOperator(
        task_id="execute_s3_to_bq",
        transfer_config_id=transfer_config_id,
        requested_run_time={"seconds": int(time.time() + 60)},
    )

    gcp_bigquery_create_transfer >> gcp_bigquery_start_transfer

Caveats

  • Permissions trip you up here and there, so be careful

  • BigQueryCreateDataTransferOperator marks its task as complete as soon as the transfer config has been created, so if there is downstream processing you need to detect when the transfer run itself finishes. Apparently this can be done with Pub/Sub (Pub/Sub notifications), so I will give that a try next time; a sensor-based sketch is also shown after this list

  • BigQuery Data Transfer Service only supports write_append, so if you want a write_truncate-style job, give up and go with an s3 -> gcs -> bq flow instead (a rough sketch is included below)
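
On the completion-detection point: besides Pub/Sub notifications, the google provider also ships BigQueryDataTransferServiceTransferRunSensor. A minimal sketch, not part of the original setup, assuming the start operator pushes the new run ID to XCom under key='run_id' (check your provider version) and reusing the task IDs from the DAG above:

from airflow.providers.google.cloud.sensors.bigquery_dts import (
    BigQueryDataTransferServiceTransferRunSensor,
)

# Inside the `with DAG(...)` block above: wait until the run started by
# "execute_s3_to_bq" reaches SUCCEEDED before any downstream tasks fire.
gcp_bigquery_wait_transfer = BigQueryDataTransferServiceTransferRunSensor(
    task_id="wait_s3_to_bq",
    transfer_config_id=transfer_config_id,
    run_id="{{ task_instance.xcom_pull('execute_s3_to_bq', key='run_id') }}",
    expected_statuses={"SUCCEEDED"},
)

gcp_bigquery_start_transfer >> gcp_bigquery_wait_transfer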
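
For the write_truncate case, here is a rough sketch of the s3 -> gcs -> bq flow using the stock transfer operators. The staging bucket name gcs-staging-bucket and the object paths are placeholders, not from the original setup:

from airflow.providers.amazon.aws.transfers.s3_to_gcs import S3ToGCSOperator
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator

# Copy the JSON files from S3 into a staging GCS bucket.
s3_to_gcs = S3ToGCSOperator(
    task_id="s3_to_gcs",
    bucket="bucket",
    prefix="path/06/25/",
    dest_gcs="gs://gcs-staging-bucket/path/06/25/",
    replace=True,
)

# Load the staged files into BigQuery, overwriting the table.
gcs_to_bq = GCSToBigQueryOperator(
    task_id="gcs_to_bq",
    bucket="gcs-staging-bucket",
    source_objects=["path/06/25/*"],
    destination_project_dataset_table=f"{GCP_DTS_BQ_DATASET}.{GCP_DTS_BQ_TABLE}",
    source_format="NEWLINE_DELIMITED_JSON",
    write_disposition="WRITE_TRUNCATE",
)

s3_to_gcs >> gcs_to_bq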
