概要
- Google Cloud Composer(Airflow)を使って、スポットインスタンス的にDataprocを使うdagを作ります。
- メリット
- Dataprocの起動が3分以内のため、利用する時に立ち上げて、終わったら消すという利用が可能です。
前提
- Google Cloud Composer(Airflow)の検証バージョンは「composer-1.7.3-airflow-1.10.2」です。
- Dataprocは消すこと前提なので、外部テーブルとしてGCSを利用します。
- Dataprocは消すこと前提なので、Hive metastoreによりテーブル情報を保全します。
- Hive metastoreとしてCloudSQLを利用します。
- IAMの設定
-
Google Cloud Composer用のサービスアカウントに各種roleを与える必要があります。
- 「Composer ワーカー」
- 「Dataproc編集者」
- 「サービス アカウント ユーザー」
-
Dataprocのサービスアカウントにも各種roleを与える必要があります。
- 「Cloud SQL 編集者」(Hive metastore用)
- 「Dataproc編集者」
- 「サービス アカウント ユーザー」
- 「ストレージ オブジェクト管理者」
-
Google Cloud Composer用のサービスアカウントに各種roleを与える必要があります。
設定内容
Dataprocを起動
- 事前にHive metastore用のCloud SQLを作成して、情報を記録します。
- 事前にDataproc用のサービスアカウントの作成が必要です。
- Hive metastoreは、gs://{$CLUSTER_WAREHOUSE}/datasetsに置くものとします。
def create_dataproc_task():
return DataprocClusterCreateOperator(
task_id = 'create_dataproc',
cluster_name = f'{$CLUSTER_NAME}',
project_id = f'{$PROJECT_ID}',
region = f'{$CLUSTER_REGION}',
zone = f'{$CLUSTER_ZONE}',
service_account = f'{$DATAPROC_SERVICE_ACCOUNT}',
master_machine_type = f'{$MASTER_MACHINE_TYPE}',
master_disk_size = f'{$MASTER_DISK_SIZE}',
num_workers = f'{$NUM_WORKERS}',
worker_machine_type = f'{$WORKER_MACHINE_TYPE}',
worker_disk_size = f'{$WORKER_DISK_SIZE}',
image_version = '1.4-debian9',
service_account_scopes = ['https://www.googleapis.com/auth/cloud-platform'],
init_actions_uris = ['gs://dataproc-initialization-actions/cloud-sql-proxy/cloud-sql-proxy.sh'],
metadata = {
'hive-metastore-instance': f'{$PROJECT_ID}:{$CLOUD_SQL_REGION}:{$CLOUD_SQL_NAME}'
},
properties = {
'hive:hive.metastore.warehouse.dir': f'gs://{$CLUSTER_WAREHOUSE}/datasets'
},
)
Dataprocを削除
def delete_dataproc_task():
return DataprocClusterDeleteOperator(
task_id = 'delete_dataproc',
cluster_name = f'{$CLUSTER_NAME}',
project_id = f'{PROJECT_ID}',
region = f'{$CLUSTER_REGION}',
)
Dataprocの演算をするdag内の記述
- start_taskとfinish_taskを置くことで、Dataprocで処理したい複数のtaskを記述できます。
prepare_task = DummyOperator(task_id='prepare')
start_task = DummyOperator(task_id='start')
finish_task = DummyOperator(task_id='finish')
cleanup_task = DummyOperator(task_id='cleanup')
prepare_task >> create_dataproc_task() >> start_task
for task in tasks:
start_task >> [Dataprocで処理したいtask群] >> finish_task
finish_task >> delete_dataproc_task() >> cleanup_task
- 実際のAirflow UI上の表示