GCP Cloud Composerを使いこなすまでの道のりからの続き
前回はGCPのドキュメンを参考にAirflow
及び Cloud Composer
の概要をまとめましたが今回は実際にCloud Composer
サービスを使ったサンプルアプリケーションを実装してみます。
システム構成
DAGフローチャート
今回はstartとfrom_exsample-composer-gcs_import_bigqueryとendの3つのタスクを作ります。
タスクリスト
タスク ID | 説明 |
---|---|
start | タスク開始を示すダミータスク |
from_exsample-composer-gcs_import_bigquery | GCSからBigQueryへデータを流すタスク |
end | タスク終了を示すダミータスク |
システム構成図
サンプルアプリケーション
サンプルアプリケーションはGithubリポジトリに上がっています。
準備
サービスアカウントにIAMロール追加
ユーザー名 | ロール |
---|---|
minarai-cloud-composer | roles/composer.worker |
minarai-cloud-composer | ストレージ オブジェクト閲覧者 |
minarai-composer-admin | roles/composer.admin |
minarai-composer-admin | BigQuery 管理者 |
BigQueryテーブルを作成
-
minarai-composer-admin
アカウントをアクティベートして下記を実行します
bq --location asia-northeast1 mk \
--dataset \
--description "exsample composer GCS to BQ" \
${PROJECT_ID}:exsample_composer_dataset
bq --location asia-northeast1 mk -t \
--schema ./dags/schemas/exsample_composer_table.json \
--time_partitioning_type DAY \
--description "example composer GCS to BQ" \
${PROJECT_ID}:exsample_composer_dataset.exsample_composer_table
Cloud Storageバケットを作成する
- BigQuery Schemaを置くバケットとBiqQueryへ流し込むデータを置くバケットを作成します
Cloud Composer環境構築または環境更新
- Cloud Composer環境構築がまだの場合は下記コマンドの
update
をcreate
に読み替えてください
gcloud composer environments update ${CLOUD_ENV} \
--location asia-northeast1 \
--update-env-variables=PROJECT_ID=${PROJECT_ID},BQ_LOCATION=${LOCATION}
DAGを登録(アップロード)
Cloud Composer環境が作成されるとGCSにCloud Composer dagsバケットが作成されます。
GCSバケットへDAGをアップロードすることでAirflow dagsディレクトリへマウントされます。
gcloudコマンドからアップロードするには下記コマンドを実行します。
gcloud composer environments storage dags import \
--environment ${CLOUD_ENV} \
--location ${LOCATION} \
--source ./dags/gcs_to_bq.py