Help us understand the problem. What is going on with this article?

Airflowを使ってDataprocを利用したい時にだけ使う。

概要

  • Airflowを使って、スポットインスタンス的にDataprocを使うdagを作ります。
  • メリット
    • Dataprocの起動が3分以内のため、利用する時に立ち上げて、終わったら消すという利用が可能です。

前提

  • GoogleCloudComposer(Airflow)の検証バージョンは「composer-1.7.3-airflow-1.10.2」です。
  • Dataprocは消すこと前提なので、外部テーブルとしてGCSを利用します。
  • Dataprocは消すこと前提なので、Hive metastoreによりテーブル情報を保全します。
    • Hive metastoreとしてCloudSQLを利用します。
  • IAMの設定
    • Google Cloud Composer用のサービスアカウントに各種roleを与える必要があります。
      • 「Composer ワーカー」
      • 「Dataproc編集者」
      • 「サービス アカウント ユーザー」
    • Dataprocのサービスアカウントにも各種roleを与える必要があります。
      • 「Cloud SQL 編集者」(Hive metastore用)
      • 「Dataproc編集者」
      • 「サービス アカウント ユーザー」
      • 「ストレージ オブジェクト管理者」

設定内容

Dataprocを起動
  • 事前にHive metastore用のCloud SQLを作成して、情報を記録します。
  • 事前にDataproc用のサービスアカウントの作成が必要です。
  • Hive metastoreは、gs://{$CLUSTER_WAREHOUSE}/datasetsに置くものとします。
def create_dataproc_task():
    return DataprocClusterCreateOperator(
        task_id = 'create_dataproc',
        cluster_name = f'{$CLUSTER_NAME}',
        project_id = f'{$PROJECT_ID}',
        region = f'{$CLUSTER_REGION}',
        zone = f'{$CLUSTER_ZONE}',
        service_account = f'{$DATAPROC_SERVICE_ACCOUNT}',
        master_machine_type = f'{$MASTER_MACHINE_TYPE}',
        master_disk_size = f'{$MASTER_DISK_SIZE}',
        num_workers = f'{$NUM_WORKERS}',
        worker_machine_type = f'{$WORKER_MACHINE_TYPE}',
        worker_disk_size = f'{$WORKER_DISK_SIZE}',
        image_version = '1.4-debian9',
        service_account_scopes = ['https://www.googleapis.com/auth/cloud-platform'],
        init_actions_uris = ['gs://dataproc-initialization-actions/cloud-sql-proxy/cloud-sql-proxy.sh'],
        metadata = {
            'hive-metastore-instance': f'{$PROJECT_ID}:{$CLOUD_SQL_REGION}:{$CLOUD_SQL_NAME}'
        },
        properties = {
            'hive:hive.metastore.warehouse.dir': f'gs://{$CLUSTER_WAREHOUSE}/datasets'
        },
    )
Dataprocを削除
def delete_dataproc_task():
    return DataprocClusterDeleteOperator(
        task_id = 'delete_dataproc',
        cluster_name = f'{$CLUSTER_NAME}',
        project_id = f'{PROJECT_ID}',
        region = f'{$CLUSTER_REGION}',
    )
Dataprocの演算をするdag内の記述
  • start_taskとfinish_taskを置くことで、Dataprocで処理したい複数のtaskを記述できます。
prepare_task = DummyOperator(task_id='prepare')
start_task = DummyOperator(task_id='start')
finish_task = DummyOperator(task_id='finish')
cleanup_task = DummyOperator(task_id='cleanup')

prepare_task >> create_dataproc_task() >> start_task
for task in tasks:
   start_task >> [Dataprocで処理したいtask] >> finish_task
finish_task >> delete_dataproc_task() >> cleanup_task

  • 実際のAirflow UI上の表示

Airflow_-_DAGs.png

tomo_will
DataEngineerとして、データ分析基盤や分析レポート自動化システム構築など行い、DM/ダッシュボード作成を行なっています。また、Data Analystとして要件を整理した上で、データの可視化、統計・機械学習もやります。 Python/SQL/GCP(BigQuery/Airflow)/Tableau/Rが基本スキル。 レポート系は、hatenablogに書くことにしました。
https://tomo-will.hatenablog.com/
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした