Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
1
Help us understand the problem. What is going on with this article?
@tomo_will

[Tips]Google Cloud Composer(Airflow)を使ってDataprocを利用したい時にだけ使う。

概要

  • Google Cloud Composer(Airflow)を使って、スポットインスタンス的にDataprocを使うdagを作ります。
  • メリット
    • Dataprocの起動が3分以内のため、利用する時に立ち上げて、終わったら消すという利用が可能です。

前提

  • Google Cloud Composer(Airflow)の検証バージョンは「composer-1.7.3-airflow-1.10.2」です。
  • Dataprocは消すこと前提なので、外部テーブルとしてGCSを利用します。
  • Dataprocは消すこと前提なので、Hive metastoreによりテーブル情報を保全します。
    • Hive metastoreとしてCloudSQLを利用します。
  • IAMの設定
    • Google Cloud Composer用のサービスアカウントに各種roleを与える必要があります。
      • 「Composer ワーカー」
      • 「Dataproc編集者」
      • 「サービス アカウント ユーザー」
    • Dataprocのサービスアカウントにも各種roleを与える必要があります。
      • 「Cloud SQL 編集者」(Hive metastore用)
      • 「Dataproc編集者」
      • 「サービス アカウント ユーザー」
      • 「ストレージ オブジェクト管理者」

設定内容

Dataprocを起動
  • 事前にHive metastore用のCloud SQLを作成して、情報を記録します。
  • 事前にDataproc用のサービスアカウントの作成が必要です。
  • Hive metastoreは、gs://{$CLUSTER_WAREHOUSE}/datasetsに置くものとします。
def create_dataproc_task():
    return DataprocClusterCreateOperator(
        task_id = 'create_dataproc',
        cluster_name = f'{$CLUSTER_NAME}',
        project_id = f'{$PROJECT_ID}',
        region = f'{$CLUSTER_REGION}',
        zone = f'{$CLUSTER_ZONE}',
        service_account = f'{$DATAPROC_SERVICE_ACCOUNT}',
        master_machine_type = f'{$MASTER_MACHINE_TYPE}',
        master_disk_size = f'{$MASTER_DISK_SIZE}',
        num_workers = f'{$NUM_WORKERS}',
        worker_machine_type = f'{$WORKER_MACHINE_TYPE}',
        worker_disk_size = f'{$WORKER_DISK_SIZE}',
        image_version = '1.4-debian9',
        service_account_scopes = ['https://www.googleapis.com/auth/cloud-platform'],
        init_actions_uris = ['gs://dataproc-initialization-actions/cloud-sql-proxy/cloud-sql-proxy.sh'],
        metadata = {
            'hive-metastore-instance': f'{$PROJECT_ID}:{$CLOUD_SQL_REGION}:{$CLOUD_SQL_NAME}'
        },
        properties = {
            'hive:hive.metastore.warehouse.dir': f'gs://{$CLUSTER_WAREHOUSE}/datasets'
        },
    )
Dataprocを削除
def delete_dataproc_task():
    return DataprocClusterDeleteOperator(
        task_id = 'delete_dataproc',
        cluster_name = f'{$CLUSTER_NAME}',
        project_id = f'{PROJECT_ID}',
        region = f'{$CLUSTER_REGION}',
    )
Dataprocの演算をするdag内の記述
  • start_taskとfinish_taskを置くことで、Dataprocで処理したい複数のtaskを記述できます。
prepare_task = DummyOperator(task_id='prepare')
start_task = DummyOperator(task_id='start')
finish_task = DummyOperator(task_id='finish')
cleanup_task = DummyOperator(task_id='cleanup')

prepare_task >> create_dataproc_task() >> start_task
for task in tasks:
   start_task >> [Dataprocで処理したいtask群] >> finish_task
finish_task >> delete_dataproc_task() >> cleanup_task

  • 実際のAirflow UI上の表示

Airflow_-_DAGs.png

1
Help us understand the problem. What is going on with this article?
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
tomo_will
DataEngineerのBI/DWH領域を中心に、データ分析基盤やテーブル/ダッシュボード作成をしています。また、Data Analystとして要件を整理した上で、データの可視化、統計・機械学習もやります。 Python/SQL/GCP(BigQuery/Cloud Composer)/Tableau/Rが基本スキル。 勉強会参加レポート系は、hatenablogに書きます。

Comments

No comments
Sign up for free and join this conversation.
Sign Up
If you already have a Qiita account Login
1
Help us understand the problem. What is going on with this article?