GCP Cloud Composerを使いこなすまでの道のりからの続き
前回はGCPのドキュメンを参考にAirflow 及び Cloud Composer の概要をまとめましたが今回は実際にCloud Composer サービスを使ったサンプルアプリケーションを実装してみます。
システム構成
DAGフローチャート
今回はstartとfrom_exsample-composer-gcs_import_bigqueryとendの3つのタスクを作ります。
タスクリスト
| タスク ID | 説明 | 
|---|---|
| start | タスク開始を示すダミータスク | 
| from_exsample-composer-gcs_import_bigquery | GCSからBigQueryへデータを流すタスク | 
| end | タスク終了を示すダミータスク | 
システム構成図
サンプルアプリケーション
サンプルアプリケーションはGithubリポジトリに上がっています。
準備
サービスアカウントにIAMロール追加
| ユーザー名 | ロール | 
|---|---|
| minarai-cloud-composer | roles/composer.worker | 
| minarai-cloud-composer | ストレージ オブジェクト閲覧者 | 
| minarai-composer-admin | roles/composer.admin | 
| minarai-composer-admin | BigQuery 管理者 | 
BigQueryテーブルを作成
- 
minarai-composer-adminアカウントをアクティベートして下記を実行します
bq --location asia-northeast1 mk \
     --dataset \
     --description "exsample composer GCS to BQ" \
     ${PROJECT_ID}:exsample_composer_dataset
bq --location asia-northeast1  mk -t \
      --schema ./dags/schemas/exsample_composer_table.json \
      --time_partitioning_type DAY \
      --description "example composer GCS to BQ" \
      ${PROJECT_ID}:exsample_composer_dataset.exsample_composer_table
Cloud Storageバケットを作成する
- BigQuery Schemaを置くバケットとBiqQueryへ流し込むデータを置くバケットを作成します
Cloud Composer環境構築または環境更新
- Cloud Composer環境構築がまだの場合は下記コマンドのupdateをcreateに読み替えてください
gcloud composer environments update ${CLOUD_ENV} \
      --location asia-northeast1 \
      --update-env-variables=PROJECT_ID=${PROJECT_ID},BQ_LOCATION=${LOCATION}
DAGを登録(アップロード)
Cloud Composer環境が作成されるとGCSにCloud Composer dagsバケットが作成されます。
GCSバケットへDAGをアップロードすることでAirflow dagsディレクトリへマウントされます。
gcloudコマンドからアップロードするには下記コマンドを実行します。
gcloud composer environments storage dags import \
        --environment ${CLOUD_ENV} \
        --location ${LOCATION} \
        --source ./dags/gcs_to_bq.py