32
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

GCP Cloud Composerを使いこなすまでの道のり

Last updated at Posted at 2020-05-07

Apahe Airflow tutorial編

Apache Airflowとは

有向非巡回グラフ(DAG)理論を応用したタスク管理システム
Apache Airflowでは一連の処理の名称をDAG名とし、処理単位がグラフのノードとなる。
各ノードにtask_idと呼ばれる一意となる名称をつける。

DAGの定義手順は次の2フェーズにわけることができる。
最初のフェーズでは一連の処理に名前をつけ、処理の内容を記述する
次のフェーズで処理群の関係(依存関係)を記述する

Apache Airflowのインストール

概要

とりあえずdocker-airflowのtutorialをやってみる

docker composeをインストール

docker composeが入ってなかったのでdocker composeをいれます

$ pip3 install docker-compose

docker-airflowリポジトリをclone

$ git clone https://github.com/puckel/docker-airflow.git

docker-airflowのdir構成を確認

$ cd docker-airflow
$ ls -la
drwxr-xr-x@ 14 minarai  staff    448  5  7 13:24 .
drwxr-xr-x@  3 minarai  staff     96  5  7 13:24 ..
-rw-r--r--   1 minarai  staff      5  5  7 13:04 .dockerignore
drwxr-xr-x@ 12 minarai  staff    384  5  7 13:04 .git
drwxr-xr-x@  3 minarai  staff     96  5  7 13:04 .github
-rw-r--r--   1 minarai  staff    475  5  7 13:04 .gitignore
-rw-r--r--   1 minarai  staff   2426  5  7 13:04 Dockerfile
-rw-r--r--   1 minarai  staff  11355  5  7 13:04 LICENSE
-rw-r--r--   1 minarai  staff   8975  5  7 13:04 README.md
drwxr-xr-x@  3 minarai  staff     96  5  7 13:04 config
drwxr-xr-x@  4 minarai  staff    128  5  7 13:24 dags
-rw-r--r--   1 minarai  staff   2851  5  7 13:21 docker-compose-CeleryExecutor.yml
-rw-r--r--   1 minarai  staff    956  5  7 13:04 docker-compose-LocalExecutor.yml
drwxr-xr-x@  3 minarai  staff     96  5  7 13:04 script
# dags dirの中を確認する
$ ls dags
__pycache__	ç

DAG定義

dags/tuto.pyの中身を確認
ここではbash_operatorを利用している
ドキュメント - airflow.operators
ドキュメント - schedule_interval
JinjaテンプレートをサポートしているのでJinjaテンプレートでの宣言が使える

dags/tuto.py
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta


default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 5, 5),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False, # Trueにするとtaskが失敗した際にメールが通知
    "email_on_retry": False, # Trueにするとtaskがリトライが発生した際にメールが通知
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
    # 'queue': 'bash_queue',
    # 'pool': 'backfill',
    # 'priority_weight': 10,
    # 'end_date': datetime(2016, 1, 1),
}

# DAGを定義
# dag_id : dag id
# default_args : 実行タスクに渡されるデフォルト引数
# schedule_interval : start_date以降に実行されるタスクスケジュール
dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(days=1))

# DAGのタスク
# task_id : task id
# bash_command : bash command
# dag : dag インスタンスを指定
t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)
# DAGのタスク
t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)

templated_command = """
    {% for i in range(5) %}
        echo "{{ ds }}"
        echo "{{ macros.ds_add(ds, 7)}}"
        echo "{{ params.my_param }}"
    {% endfor %}
"""
# DAGのタスク
t3 = BashOperator(
    task_id="templated",
    bash_command=templated_command,
    params={"my_param": "Parameter I passed in"},
    dag=dag,
)

# ワークフロー t2タスクの上流にt1をセット
t2.set_upstream(t1)
# ワークフロー t3タスクの上流にt1をセット
t3.set_upstream(t1)
# ワークフローは下記のような記述もできる
# t1.set_downstream([t1, t3])
# t1 >> [t2, t3]
# [t2, t3] << t1

apache airflowを起動

起動したら管理画面(http://localhost:8080)へアクセス

$ docker-compose -f docker-compose-LocalExecutor.yml up -d
スクリーンショット 2020-05-07 15.39.21.png

docker composeをみるとわかるが、./dags以下をボリュームマウントしていて、./dagsに定義されたDAG IDがリスト表示されている。
表の主な項目としてインフォメーションマークのON/OFF切替

項目 意味
インフォマーク DAGスケジュール ON/OFF
DAG  DAGのワークフローを確認
Schedule DAG スケジュール
Recent Tasks 直近で実行されたタスクが確認
Last Run 最後に実行した日時
DAG Runs 直近のDAGを確認
Links playマーク:DAGを実行

よく使いそうなairflow operators

operator 役割
python_operator 引数python_callableにpython methodを指定して実行 *1
bash_operator 引数bash_commandにコマンドを渡してコマンドライン プログラムを実行 *2
bigquery_operator 引数bqlにbqlを指定して実行
email_operator Emailを送信する

*1 : GCP Cloud Composerからはgoogle-cloud-bigquery, google-cloud-dataflow, google-cloud-storage, pandas, pandas-gbq, tensorflowが利用可
*2 : GCP Cloud Composerからはbqコマンド, gcloudコマンド, gsutilコマンド, kubectlコマンドも実行できる

apache airflowドキュメント - operaters
Cloud Composer

Cloud Composer編

概要

Cloud Composer は、オープンソースのオーケストレーション ツールとして広く使用されている Apache Airflow で構築されている。

Cloud Composer の認証

サービス アカウント

ローカルの開発でも本番環境のアプリケーションでも、ほとんどの場合、サービス アカウントの使用をおすすめします。
本番環境のアプリケーションで認証を設定する方法は「サーバー間での本番環境アプリケーションの認証の設定」が詳しい。

認証情報の自動検

Google Cloud クライアント ライブラリでは、アプリケーションのデフォルト認証情報(ADC)と呼ばれる方式を使用してアプリケーションの認証情報を検出している。クライアント ライブラリが使用される際は以下の順番で認証情報がチェックされます。

  1. 環境変数 GOOGLE_APPLICATION_CREDENTIALSを確認
  2. 環境変数が設定されていない場合、ADC ではサービスで実行されているアプリケーションに応じて、Compute EngineGoogle Kubernetes EngineCloud RunApp EngineCloud Functions によって提供されているデフォルトのサービス アカウントを使用する。

ユーザーアカウント

プリケーションがエンドユーザーの代わりにリソースにアクセスする必要がある場合、そのアプリケーションでユーザーを直接認証できるが、非推奨です。

アクセス制御

ロールを使用して、リソースに対する認証された ID のアクセス権を制限します。
本番環境アプリケーションを作成するときは、該当する Google Cloud APIs機能リソースを操作するために必要な権限のみを ID に付与します。

Cloud Composer のアクセス制御

Cloud Composer API は、アクセス制御に **Cloud Identity and Access Management(Cloud IAM)**を使用する。

基本ロール

プロジェクトの基本ロールはCloud Composerの基本機能の制御に関する権限を提供する。

ロール 役職 権限 説明
roles/owner プロジェクトオーナー composer.operations.list Cloud Composer リソースを完全に制御できるようにするための基本ロール。
composer.operations.get
composer.operations.delete
composer.environments.list
composer.environments.get
composer.environments.delete
composer.environments.update
composer.environments.create
roles/writer プロジェクト編集者 composer.operations.list Cloud Composer リソースを完全に制御できるようにするための基本ロール。
composer.operations.get
composer.operations.delete
composer.environments.list
composer.environments.get
composer.environments.delete
composer.environments.update
composer.environments.create
roles/reader プロジェクト閲覧者 composer.operations.list ユーザーが Cloud Composer リソースを一覧表示、取得できるようにするための基本ロール。
composer.operations.get
composer.environments.list
composer.environments.get
ロール

下記のロールはプロジェクトリソースに対して下記の権限を有する

ロール 役職 権限 説明
roles/composer.admin Composer 管理者 composer.* Cloud Composer リソースのすべてを管理
serviceusage.quotas.get
serviceusage.services.get
serviceusage.services.list
roles/composer.environmentAndStorageObjectAdmin 環境とストレージ オブジェクトの管理者 composer.* Cloud Composer のリソースとすべてのプロジェクト バケットのオブジェクトを管理できる権限を付与します。
resourcemanager.projects.get
resourcemanager.projects.list
serviceusage.quotas.get
serviceusage.services.get
serviceusage.services.list
storage.objects.*
roles/composer.environmentAndStorageObjectViewer 環境ユーザーとストレージ オブジェクトの閲覧者 composer.environments.get Cloud Composer 環境とオペレーションを一覧表示して取得するために必要な権限を付与します。 すべてのプロジェクト バケットのオブジェクトに対して読み取り専用アクセスを許可します。
composer.environments.list
composer.imageversions.*
composer.operations.get
composer.operations.list
resourcemanager.projects.get
resourcemanager.projects.list
serviceusage.quotas.get
serviceusage.services.get
serviceusage.services.list
storage.objects.get
storage.objects.list
roles/composer.user Composer ユーザー composer.environments.get Cloud Composer 環境とオペレーションの一覧表示して取得するために必要な権限を付与します。
composer.environments.list
composer.imageversions.*
composer.operations.get
composer.operations.list
serviceusage.quotas.get
serviceusage.services.get
serviceusage.services.list
roles/composer.worker Composer ワーカー 詳細は割愛 Cloud Composer 環境 VM の実行に必要な権限を付与します(サービス アカウント向け)。
タスクと権限

一般的なタスクと権限の関係についてまとめてあるので各自のユースケースによって必要な権限を有するロールをサービスアカウントに設定するとよい。

タスク 権限/ロール
IAP で保護された Airflow ウェブ インターフェースにアクセスする composer.environments.get
gcloud コマンドライン ツールを使って Airflow CLI を実行する composer.environments.get
container.clusters.getCredentials
roles/container.developer
Cloud Console の [環境] ページを表示する composer.environments.list
servicemanagement.projectSettings.get
Google Cloud のオペレーション スイートのログと指標を表示する roles/logging.viewer
roles/monitoring.viewer
環境の作成 composer.environments.create
環境の更新と削除(環境変数の設定や Python パッケージのインストール、更新を含む) environments.delete
environments.update
ファイルを DAG と Plugins フォルダにアップロードし、ログフォルダの Airflow ログにアクセスする バケットまたはプロジェクト レベルで割り当てられる storage.objectAdmin
DAG 宛先バケットを検索する composer.environments.get

サービスアカウント作成

まず初めに行いたいのがサービスアカウントの作成です。
サービスアカウントとはユーザーの代わりにシステムがGCP各サービスを利用するためのアカウントのこと。

アカウントの作成

今回はコンソール サービスアカウントから作成をします。

ユーザー名 ロール1
minarai-cloud-composer roles/composer.worker
minarai-composer-admin roles/composer.admin

minarai-composer-adminは環境の作成を行うためのサービスアカウントとして作成してます。

サービスアカウントの鍵を作成

サービスアカウント一覧にminarai-cloud-composer@****が表示されたら操作から**鍵を作成(json)**を選択すると鍵が生成されローカルPCにダウンロードされるので適切な場所に配置する

サービス実行アカウントを変更

作成したサービスアカウントをアクティブにする

  • 現在の設定一覧とアクティブなアカウントを確認する
% gcloud config configurations list
NAME     IS_ACTIVE  ACCOUNT                 PROJECT      COMPUTE_DEFAULT_ZONE  COMPUTE_DEFAULT_REGION
default  True       アカウント@gmail.com  プロジェクトID

# (省略化) ついでにCOMPUTE_DEFAULT_ZONEも追加した
gcloud config set compute/region asia-northeast1
gcloud config set compute/zone asia-northeast1-a

# (省略化) 改めて確認
% gcloud config configurations list               
NAME     IS_ACTIVE  ACCOUNT                 PROJECT      COMPUTE_DEFAULT_ZONE  COMPUTE_DEFAULT_REGION
default  True       アカウント@gmail.com  プロジェクトID  asia-northeast1-a     asia-northeast1

構文: gcloud config configurations create NAME FLAG

% gcloud config configurations create minarai-cloud-composer --activate

% gcloud config configurations list                                
NAME                IS_ACTIVE  ACCOUNT                 PROJECT      COMPUTE_DEFAULT_ZONE  COMPUTE_DEFAULT_REGION
default             False      アカウント@gmail.com  プロジェクトID  asia-northeast1-a     asia-northeast1
minarai-cloud-composer     True

構文: gcloud config set core/project プロジェクトID
構文: gcloud config set core/account サービスアカウント
構文: gcloud config set compute/region リージョン
構文: gcloud config set compute/zone ゾーン

% gcloud config set core/project プロジェクトID
% gcloud config set core/account サービスアカウント
% gcloud config set compute/region asia-northeast1
% gcloud config set compute/zone asia-northeast1-a

% gcloud config configurations list 
NAME                IS_ACTIVE  ACCOUNT                                                 PROJECT      COMPUTE_DEFAULT_ZONE  COMPUTE_DEFAULT_REGION
default             False      アカウント@gmail.com                                  プロジェクトID  asia-northeast1-a     asia-northeast1
minarai-cloud-composer  True   minarai-cloud-composer@アカウント.iam.gserviceaccount.com  プロジェクトID  asia-northeast1-a     asia-northeast1
  • サービスアカウントの認証
% gcloud auth activate-service-account --key-file ~/Documents/gcp/key/key.json

環境構築

Cloud Composer の有効化

Cloud Composerを利用するためにはCloud Composerサービスの有効かを行う必要がある。
Cloud Composer API

環境の作成

事前準備

Cloud Composer のドメインで制限された共有は、現在ベータ版です。ドメインで制限された共有ポリシーを有効にしている場合、Cloud Composer 環境を作成するときにベータ版 API を使用する必要があります。
ベータ版 API を使用して Cloud Composer 環境をデプロイする方法については、ベータ版機能のサポートを参照してください。

  • Cloud Composer 環境を作成するには、composer.environments.create 権限が必要
  • Cloud Composer 環境は デフォルトではCompute Engine のデフォルトのサービス アカウントとして実行される。Cloud Composer 環境でリソースにアクセスするには、少なくとも、composer.worker ロールによって提供される権限が必要。
  • 現在、VPC Service Controls では Cloud Composer はサポートされていません。Cloud Composer プロジェクトをサービス境界に含めてはなりません

環境の作成

1. コンソールから
環境作成のページ」から作成する

2. gcloudから
詳細については「新しい環境を作成する」を参照。

LOCATION: Compute Engine リージョン名

gcloud composer environments create ENVIRONMENT_NAME \
        --location LOCATION \
        OTHER_ARGUMENTS

環境変数の利用

環境を作成または更新するときに、Cloud Composer により Apache Airflow のスケジューラ、ワーカー、ウェブサーバー プロセスに提供される環境変数を追加できます。
たとえば、Cloud Composer では、メール通知に Apache Airflow SendGrid モジュールが使用されます。メール通知を構成するには、環境変数 SENDGRID_API_KEY と SENDGRID_MAIL_FROM を設定する必要があります。

要件

環境変数は、正規表現 [a-zA-Z_][a-zA-Z0-9_]* と一致する必要があります。
環境変数で Apache Airflow ソフトウェア構成のオーバーライドを指定しないでください。つまり、変数は $AIRFLOW__{SECTION}__{KEY} の形式や、正規表現 AIRFLOW__[A-Z0-9_]+__[A-Z0-9_]+) と一致するものを使用できません。

予約済みの環境変数

予約済の環境変数名は「予約済みの環境変数」の通りです。

  • 環境変数作成権限: composer.environments.create
  • 環境変数更新権限: composer.environments.update

1. コンソールから
環境作成のページ」から設定することができる

2. gcloudから
詳細については「環境変数の追加と更新」を参照のこと。

gcloud composer environments update ENVIRONMENT_NAME \
      --location LOCATION \
      --update-env-variables=KEY=VALUE,KEY=VALUE...

Airflow接続の管理

  1. Airflow接続を使用すると、Cloud Composer環境からGoogle Cloudプロジェクトのリソースにアクセスできる。Airflow接続は、ワークフローで使用されるシークレットと認証情報を保存することを推奨。
  2. Airflow接続を使用すると、Cloud Composer環境が他のAPI(Google Cloudプロジェクト、他のクラウドプロバイダー、サードパーティサービスなど)と通信するために必要な接続情報を保存できます。
  3. Airflow接続には、認証情報、ホスト名、追加のAPIパラメータなどの詳細を保存できる。Airflow接続を使用して、ワークフロータスクのシークレットと認証情報を保存することを推奨。

Airflow接続の管理」より抜粋。

Fernetキーと安全な接続

新しい環境を作成すると、Cloud Composerはその環境に固有の永続的なFernetキーを生成し、デフォルトで接続の追加を保護します。fernet_keyAirflow構成でを表示できます。

デフォルトの接続

デフォルトではGoogle Cloud Platformの次のAirflow接続を構成。

  • bigquery_default
  • google_cloud_default
  • google_cloud_datastore_default
  • google_cloud_storage_default

デフォルトの接続IDを使用して、DAGから接続

bigquery_conn_idを指定せずともbigquery_defaultのIDが利用して接続を試みる

task_default = bigquery_operator.BigQueryOperator(
    task_id='task_default_connection',
    bql='SELECT 1', use_legacy_sql=False)

# 接続IDを明示的に指定
task_explicit = bigquery_operator.BigQueryOperator(
    task_id='task_explicit_connection',
    bql='SELECT 1', use_legacy_sql=False,
    # Composer creates a 'google_cloud_default' connection by default.
    bigquery_conn_id='google_cloud_default')

別のプロジェクトのリソースにアクセスするケースは「別のプロジェクトのリソースにアクセスする」を参考にしたい。

新しいAirflow接続の作成

もし、デフォルトの接続が利用できない場合は新しいAirflow接続を作成することができる。

新しいAirflow接続を作成

gcloud composer environments run test-environment \
 --location us-central1 connections -- --add \
 --conn_id=my_gcp_connection --conn_type=google_cloud_platform \
 --conn_extra '{"extra\__google\_cloud\_platform\__project": "your-storage-project",
 "extra\__google\_cloud\_platform\__key_path": "/home/airflow/gcs/data/keyfile.json",
 "extra\__google\_cloud\_platform\__scope": "https://www.googleapis.com/auth/cloud-platform"}'

新しいAirflow接続を利用

task_custom = bigquery_operator.BigQueryOperator(
    task_id='task_custom_connection',
    bql='SELECT 1', use_legacy_sql=False,
    # Set a connection ID to use a connection that you have created.
    bigquery_conn_id='my_gcp_connection')

外部データベースへの接続の構成

Cloud Composerは、環境内にデフォルトのCloud SQLプロキシを提供し、アプリケーション、クライアント、または他のGoogle Cloudサービスから環境のCloud SQLデータベースへのアクセスをリモートで承認します。

Cloud ComposerをSQLデータベースやプライベートIP Cloud SQLインスタンスなどの外部データベースに接続 するには、新しい SQLプロキシポッド(yaml) を環境のGKEクラスタにデプロイする必要があります。

Cloud Composer でDAGの作成

DAGの定義はGCPでOperaterが実装されている以外は基本的にApache Airflowと同じ。
Cloud Composer - DAG(ワークフロー)の作成

sample.py
from __future__ import print_function

    import datetime

    from airflow import models
    from airflow.operators import bash_operator
    from airflow.operators import python_operator

    default_dag_args = {
        'start_date': datetime.datetime(2018, 1, 1),
    }

    with models.DAG(
            'composer_sample_simple_greeting',
            schedule_interval=datetime.timedelta(days=1),
            default_args=default_dag_args) as dag:
        def greeting():
            import logging
            logging.info('Hello World!')

        hello_python = python_operator.PythonOperator(
            task_id='hello',
            python_callable=greeting)

        # Likewise, the goodbye_bash task calls a Bash script.
        goodbye_bash = bash_operator.BashOperator(
            task_id='bye',
            bash_command='echo Goodbye.')

        hello_python >> goodbye_bash     

下記の例のようにBash Operatorではgcloudコマンドbqコマンドなどが利用できるし、Python Operatorではpip installによってgoogle-cloud-bigqueryなどが利用できる。

sample.py
from airflow.operators import bash_operator
        # Create BigQuery output dataset.
        make_bq_dataset = bash_operator.BashOperator(
            task_id='make_bq_dataset',
            # Executing 'bq' command requires Google Cloud SDK which comes
            # preinstalled in Cloud Composer.
            bash_command='bq ls {} || bq mk {}'.format(
                bq_dataset_name, bq_dataset_name))

Bigquery Operatorを使ってBigQueryを操作するタスクを作成することもできる

sample.py
from airflow.contrib.operators import bigquery_operator
        # Query recent StackOverflow questions.
        bq_recent_questions_query = bigquery_operator.BigQueryOperator(
            task_id='bq_recent_questions_query',
            bql="""
            SELECT owner_display_name, title, view_count
            FROM `bigquery-public-data.stackoverflow.posts_questions`
            WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
                AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
            ORDER BY view_count DESC
            LIMIT 100
            """.format(max_date=max_query_date, min_date=min_query_date),
            use_legacy_sql=False,
            destination_dataset_table=bq_recent_questions_table_id)

DAG(ワークフロー)の追加と更新

DAGのバケット名を確認

  • gcludコマンド

get(config.dagGcsPrefix): config.dagGcsPrefixの値(DAG バケット名)を取得

gcloud composer environments describe ENVIRONMENT_NAME \
      --location LOCATION \
      --format="get(config.dagGcsPrefix)"

DAG の追加または更新

  • gcludコマンド
gcloud composer environments storage dags import \
        --environment ENVIRONMENT_NAME \
        --location LOCATION \
        --source LOCAL_FILE_TO_UPLOAD

DAG の削除

  • gcludコマンド

DAG を削除しても、DAG メタデータは Airflow ウェブ インターフェースから削除されない。

gcloud composer environments storage dags delete 
        --environment ENVIRONMENT_NAME  \
        --location LOCATION  \
        DAG_NAME.py

KubernetesPodOperator

概要

本来Airflow には Executor という概念があり、DAG 内で定義された各タスクをどのような方式で実行するか、制御方法を指定することができる。(「Airflow Executor」について)
この Executor の 1 つとして Kubernetes Executor というものがあり、実行ノードの指定やボリュームのマウントなどの Kubernetes の機能を享受しながら、各タスクを別の Pod で独立に実行することができるがCloud ComposerCelery Executor 以外の Executor を2020年2月現在時点ではサポートしていないため、Kubernetes Executorを利用したタスクの実行ができない。※「GiXo-Kubernetes Executor」より借用。

そこでKubernetes Executorの代替としてCloud ComposerKubernetesPodOperator を使用することでCloud Composer 環境内Google Kubernetes Engineクラスタに適切なリソースを確保しGKE Pod上でタスクを実行することができる。
alt

定義

KubernetesPodOperatorの定義は後ほど紹介するようにおおよそはK8sのマニフェスト定義で宣言する内容をKubernetesPodOperatorに置き換えたものです。

KubernetesPodOperatorのユースケース

  • 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
  • Cloud Composer ワーカー イメージでは使用できないバイナリ依存関係。

要件

  • Cloud Composer 1.0.0以降
  • 環境に十分なリソースが存在すること

環境に適したリソースの確保

Kubernetes Pod を環境に起動すると、CPU やメモリなどのリソースに対して、プログラム同士が競合する可能性がある。
Airflow スケジューラと Airflow ワーカーは同じ GKE クラスタにあるため、競合によってリソース不足になった場合は、スケジューラとワーカーは正常に動作しない点に注意。

リソース不足の回避策

回避策 1. ノードプールを作成する

新しいノードプールを作成し、そのプールからのリソースのみを使用して実行するように Kubernetes Pod を構成する方法。
ノードプールの作成はコンソールまたはgcloudから行うことができる。

gcloud container node-pools create POOL_NAME \
        --cluster CLUSTER_NAME \
        --project PROJECT_ID \
        --zone ZONE \
        ADDITIONAL_FLAGS 
回避策 2. 環境内のノード数を増やす

ノード数を増やすと、ワーカーが使用できるコンピューティング能力が増加する。
指定したマシンタイプが提供するよりも多くの CPU または RAM を必要とするタスクには、この増加によって追加リソースは提供されない点に注意。

回避策 3. 適切なマシンタイプを指定する

Cloud Composer 環境の作成の際に、マシンタイプを指定する。
使用可能なリソースを確保するためにCloud Composer 環境で行うコンピューティングの種類に適したマシンタイプを指定します。

KubernetesPodOperator の構成

import datetime
    from airflow import models
    from airflow.contrib.kubernetes import secret
    from airflow.contrib.operators import kubernetes_pod_operator

    secret_env = secret.Secret(
        deploy_type='env',
        deploy_target='SQL_CONN',
        secret='airflow-secrets',
        key='sql_alchemy_conn')
   
    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

    # DAGの宣言
    with models.DAG(
            dag_id='composer_sample_kubernetes_pod',
            schedule_interval=datetime.timedelta(days=1),
            start_date=YESTERDAY) as dag:
        # K8sオペレーター
        kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
            task_id='pod-ex-minimum',
            name='pod-ex-minimum',
            cmds=['echo'],
            namespace='default',
            image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
        # K8sオペレーター
        kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
            task_id='ex-kube-templates',
            name='ex-kube-templates',
            namespace='default',
            image='bash',
            cmds=['echo'],
            arguments=['{{ ds }}'],
            env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
            config_file="{{ conf.get('core', 'kube_config') }}")
        # K8sオペレーター
        kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
            task_id='ex-kube-secrets',
            name='ex-kube-secrets',
            namespace='default',
            image='ubuntu',
            startup_timeout_seconds=300,
            secrets=[secret_env],
            env_vars={'EXAMPLE_VAR': '/example/value'})
        # K8sオペレーター
        kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
            task_id='ex-pod-affinity',
            name='ex-pod-affinity',
            namespace='default',
            image='perl',
            cmds=['perl'],
            arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
            affinity={
                'nodeAffinity': {
                    'requiredDuringSchedulingIgnoredDuringExecution': {
                        'nodeSelectorTerms': [{
                            'matchExpressions': [{
                                # When nodepools are created in Google Kubernetes
                                # Engine, the nodes inside of that nodepool are
                                # automatically assigned the label
                                # 'cloud.google.com/gke-nodepool' with the value of
                                # the nodepool's name.
                                'key': 'cloud.google.com/gke-nodepool',
                                'operator': 'In',
                                # The label key's value that pods can be scheduled
                                # on.
                                'values': [
                                    'pool-0',
                                    'pool-1',
                                ]
                            }]
                        }]
                    }
                }
            })
        # K8sオペレーター
        kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
            task_id='ex-all-configs',
            name='pi',
            namespace='default',
            image='perl',
            cmds=['perl'],
            arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
            secrets=[],
            labels={'pod-label': 'label-name'},
            startup_timeout_seconds=120,
            env_vars={'EXAMPLE_VAR': '/example/value'},
            get_logs=True,
            image_pull_policy='Always',
            annotations={'key1': 'value1'},
            resources={'limit_memory': 1, 'limit_cpu': 1},
            config_file='/home/airflow/composer_kube_config',
            xcom_push=False,
            volumes=[],
            volume_mounts=[],
            affinity={})

jinja テンプレートについて

宣言内で利用した値はgcloud コマンドや環境変数から与えられる

設定方法
env_vars={'MY_VALUE': '{{ var.value.my_value }}'} gcloudコマンド gcloud composer environments run ENVIRONMENT --location LOCATION variables -- --set my_value example_value
config_file="{{ conf.get('core', 'kube_config') }}" gcloudコマンド gcloud composer environments update ENVIRONMENT --location LOCATION --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config

Secret 変数について

Kubernetes Secret は、少量の機密データを含むオブジェクトのこと。SecretKubernetes Pod に渡すには、KubernetesPodOperator を使用してKubernetesSecret を定義する。Secret を定義しない場合、Pod の起動に失敗する。

  • secretの作成例
# 1. Cloud Composer 環境の詳細を表示
# ENVIRONMENT:環境名, LOCATION:リージョン名
gcloud composer environments describe ENVIRONMENT \
        --location LOCATION \
        --format="value(config.gkeCluster)"

# 2. GKE クラスタに接続
gcloud container clusters get-credentials CLUSTER_ID \
    --zone ZONE \
    --project PROJECT

# 3. secretを作成
kubectl create secret generic airflow-secrets \
    --from-literal sql_alchemy_conn=test_value
  • secretオブジェクト生成例
secret_env = secret.Secret(
        # secretの公開種別: envであれば環境変数として公開される
        deploy_type='env',
        # Kubernetes の環境変数 : secretの環境変数名
        deploy_target='SQL_CONN',
        # secret名
        secret='airflow-secrets',
        # secrete key
        key='sql_alchemy_conn')
  • secretオブジェクトの利用例
kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
        task_id='ex-kube-secrets',
        name='ex-kube-secrets',
        namespace='default',
        image='ubuntu',
        startup_timeout_seconds=300,
        secrets=[secret_env],
        env_vars={'EXAMPLE_VAR': '/example/value'})
32
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
32
15

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?