0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

[Tips]Google Cloud Composer(Airflow)を使ってDataprocを利用したい時にだけ使う。

Last updated at Posted at 2020-02-24

概要

  • Google Cloud Composer(Airflow)を使って、スポットインスタンス的にDataprocを使うdagを作ります。
  • メリット
    • Dataprocの起動が3分以内のため、利用する時に立ち上げて、終わったら消すという利用が可能です。

前提

  • Google Cloud Composer(Airflow)の検証バージョンは「composer-1.7.3-airflow-1.10.2」です。
  • Dataprocは消すこと前提なので、外部テーブルとしてGCSを利用します。
  • Dataprocは消すこと前提なので、Hive metastoreによりテーブル情報を保全します。
    • Hive metastoreとしてCloudSQLを利用します。
  • IAMの設定
    • Google Cloud Composer用のサービスアカウントに各種roleを与える必要があります。
      • 「Composer ワーカー」
      • 「Dataproc編集者」
      • 「サービス アカウント ユーザー」
    • Dataprocのサービスアカウントにも各種roleを与える必要があります。
      • 「Cloud SQL 編集者」(Hive metastore用)
      • 「Dataproc編集者」
      • 「サービス アカウント ユーザー」
      • 「ストレージ オブジェクト管理者」

設定内容

Dataprocを起動
  • 事前にHive metastore用のCloud SQLを作成して、情報を記録します。
  • 事前にDataproc用のサービスアカウントの作成が必要です。
  • Hive metastoreは、gs://{$CLUSTER_WAREHOUSE}/datasetsに置くものとします。
def create_dataproc_task():
    return DataprocClusterCreateOperator(
        task_id = 'create_dataproc',
        cluster_name = f'{$CLUSTER_NAME}',
        project_id = f'{$PROJECT_ID}',
        region = f'{$CLUSTER_REGION}',
        zone = f'{$CLUSTER_ZONE}',
        service_account = f'{$DATAPROC_SERVICE_ACCOUNT}',
        master_machine_type = f'{$MASTER_MACHINE_TYPE}',
        master_disk_size = f'{$MASTER_DISK_SIZE}',
        num_workers = f'{$NUM_WORKERS}',
        worker_machine_type = f'{$WORKER_MACHINE_TYPE}',
        worker_disk_size = f'{$WORKER_DISK_SIZE}',
        image_version = '1.4-debian9',
        service_account_scopes = ['https://www.googleapis.com/auth/cloud-platform'],
        init_actions_uris = ['gs://dataproc-initialization-actions/cloud-sql-proxy/cloud-sql-proxy.sh'],
        metadata = {
            'hive-metastore-instance': f'{$PROJECT_ID}:{$CLOUD_SQL_REGION}:{$CLOUD_SQL_NAME}'
        },
        properties = {
            'hive:hive.metastore.warehouse.dir': f'gs://{$CLUSTER_WAREHOUSE}/datasets'
        },
    )
Dataprocを削除
def delete_dataproc_task():
    return DataprocClusterDeleteOperator(
        task_id = 'delete_dataproc',
        cluster_name = f'{$CLUSTER_NAME}',
        project_id = f'{PROJECT_ID}',
        region = f'{$CLUSTER_REGION}',
    )
Dataprocの演算をするdag内の記述
  • start_taskとfinish_taskを置くことで、Dataprocで処理したい複数のtaskを記述できます。
prepare_task = DummyOperator(task_id='prepare')
start_task = DummyOperator(task_id='start')
finish_task = DummyOperator(task_id='finish')
cleanup_task = DummyOperator(task_id='cleanup')

prepare_task >> create_dataproc_task() >> start_task
for task in tasks:
   start_task >> [Dataprocで処理したいtask群] >> finish_task
finish_task >> delete_dataproc_task() >> cleanup_task

  • 実際のAirflow UI上の表示

Airflow_-_DAGs.png

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?