Apahe Airflow tutorial編
Apache Airflowとは
有向非巡回グラフ(DAG)理論を応用したタスク管理システム
Apache Airflowでは一連の処理の名称をDAG名とし、処理単位がグラフのノードとなる。
各ノードにtask_idと呼ばれる一意となる名称をつける。
DAGの定義手順は次の2フェーズにわけることができる。
最初のフェーズでは一連の処理に名前をつけ、処理の内容を記述する
次のフェーズで処理群の関係(依存関係)を記述する
Apache Airflowのインストール
概要
とりあえずdocker-airflowのtutorialをやってみる
docker composeをインストール
docker composeが入ってなかったのでdocker composeをいれます
$ pip3 install docker-compose
docker-airflowリポジトリをclone
$ git clone https://github.com/puckel/docker-airflow.git
docker-airflowのdir構成を確認
$ cd docker-airflow
$ ls -la
drwxr-xr-x@ 14 minarai staff 448 5 7 13:24 .
drwxr-xr-x@ 3 minarai staff 96 5 7 13:24 ..
-rw-r--r-- 1 minarai staff 5 5 7 13:04 .dockerignore
drwxr-xr-x@ 12 minarai staff 384 5 7 13:04 .git
drwxr-xr-x@ 3 minarai staff 96 5 7 13:04 .github
-rw-r--r-- 1 minarai staff 475 5 7 13:04 .gitignore
-rw-r--r-- 1 minarai staff 2426 5 7 13:04 Dockerfile
-rw-r--r-- 1 minarai staff 11355 5 7 13:04 LICENSE
-rw-r--r-- 1 minarai staff 8975 5 7 13:04 README.md
drwxr-xr-x@ 3 minarai staff 96 5 7 13:04 config
drwxr-xr-x@ 4 minarai staff 128 5 7 13:24 dags
-rw-r--r-- 1 minarai staff 2851 5 7 13:21 docker-compose-CeleryExecutor.yml
-rw-r--r-- 1 minarai staff 956 5 7 13:04 docker-compose-LocalExecutor.yml
drwxr-xr-x@ 3 minarai staff 96 5 7 13:04 script
# dags dirの中を確認する
$ ls dags
__pycache__ ç
DAG定義
dags/tuto.py
の中身を確認
ここではbash_operator
を利用している
ドキュメント - airflow.operators
ドキュメント - schedule_interval
JinjaテンプレートをサポートしているのでJinjaテンプレート
での宣言が使える
"""
Code that goes along with the Airflow located at:
http://airflow.readthedocs.org/en/latest/tutorial.html
"""
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta
default_args = {
"owner": "airflow",
"depends_on_past": False,
"start_date": datetime(2020, 5, 5),
"email": ["airflow@airflow.com"],
"email_on_failure": False, # Trueにするとtaskが失敗した際にメールが通知
"email_on_retry": False, # Trueにするとtaskがリトライが発生した際にメールが通知
"retries": 1,
"retry_delay": timedelta(minutes=5),
# 'queue': 'bash_queue',
# 'pool': 'backfill',
# 'priority_weight': 10,
# 'end_date': datetime(2016, 1, 1),
}
# DAGを定義
# dag_id : dag id
# default_args : 実行タスクに渡されるデフォルト引数
# schedule_interval : start_date以降に実行されるタスクスケジュール
dag = DAG("tutorial", default_args=default_args, schedule_interval=timedelta(days=1))
# DAGのタスク
# task_id : task id
# bash_command : bash command
# dag : dag インスタンスを指定
t1 = BashOperator(task_id="print_date", bash_command="date", dag=dag)
# DAGのタスク
t2 = BashOperator(task_id="sleep", bash_command="sleep 5", retries=3, dag=dag)
templated_command = """
{% for i in range(5) %}
echo "{{ ds }}"
echo "{{ macros.ds_add(ds, 7)}}"
echo "{{ params.my_param }}"
{% endfor %}
"""
# DAGのタスク
t3 = BashOperator(
task_id="templated",
bash_command=templated_command,
params={"my_param": "Parameter I passed in"},
dag=dag,
)
# ワークフロー t2タスクの上流にt1をセット
t2.set_upstream(t1)
# ワークフロー t3タスクの上流にt1をセット
t3.set_upstream(t1)
# ワークフローは下記のような記述もできる
# t1.set_downstream([t1, t3])
# t1 >> [t2, t3]
# [t2, t3] << t1
apache airflowを起動
起動したら管理画面(http://localhost:8080)へアクセス
$ docker-compose -f docker-compose-LocalExecutor.yml up -d
docker composeをみるとわかるが、./dags
以下をボリュームマウントしていて、./dags
に定義されたDAG IDがリスト表示されている。
表の主な項目としてインフォメーションマークのON/OFF切替
項目 | 意味 |
---|---|
インフォマーク | DAGスケジュール ON/OFF |
DAG | DAGのワークフローを確認 |
Schedule | DAG スケジュール |
Recent Tasks | 直近で実行されたタスクが確認 |
Last Run | 最後に実行した日時 |
DAG Runs | 直近のDAGを確認 |
Links | playマーク:DAGを実行 |
よく使いそうなairflow operators
operator | 役割 |
---|---|
python_operator | 引数python_callableにpython methodを指定して実行 *1 |
bash_operator | 引数bash_commandにコマンドを渡してコマンドライン プログラムを実行 *2 |
bigquery_operator | 引数bqlにbqlを指定して実行 |
email_operator | Emailを送信する |
*1 : GCP Cloud Composer
からはgoogle-cloud-bigquery
, google-cloud-dataflow
, google-cloud-storage
, pandas
, pandas-gbq
, tensorflow
が利用可
*2 : GCP Cloud Composer
からはbqコマンド
, gcloudコマンド
, gsutilコマンド
, kubectlコマンド
も実行できる
apache airflowドキュメント - operaters
Cloud Composer
Cloud Composer編
概要
Cloud Composer は、オープンソースのオーケストレーション ツールとして広く使用されている Apache Airflow で構築されている。
Cloud Composer の認証
サービス アカウント
ローカルの開発でも本番環境のアプリケーションでも、ほとんどの場合、サービス アカウントの使用をおすすめします。
本番環境のアプリケーションで認証を設定する方法は「サーバー間での本番環境アプリケーションの認証の設定」が詳しい。
認証情報の自動検
Google Cloud クライアント ライブラリでは、アプリケーションのデフォルト認証情報(ADC)と呼ばれる方式を使用してアプリケーションの認証情報を検出している。クライアント ライブラリが使用される際は以下の順番で認証情報がチェックされます。
- 環境変数
GOOGLE_APPLICATION_CREDENTIALS
を確認 - 環境変数が設定されていない場合、ADC ではサービスで実行されているアプリケーションに応じて、
Compute Engine
、Google Kubernetes Engine
、Cloud Run
、App Engine
、Cloud Functions
によって提供されているデフォルトのサービス アカウントを使用する。
ユーザーアカウント
プリケーションがエンドユーザーの代わりにリソースにアクセスする必要がある場合、そのアプリケーションでユーザーを直接認証できるが、非推奨です。
アクセス制御
ロールを使用して、リソースに対する認証された ID のアクセス権を制限します。
本番環境アプリケーションを作成するときは、該当する Google Cloud APIs
、機能
、 リソースを操作するために必要な権限
のみを ID に付与します。
Cloud Composer のアクセス制御
Cloud Composer API は、アクセス制御に **Cloud Identity and Access Management(Cloud IAM)**を使用する。
基本ロール
プロジェクトの基本ロールはCloud Composerの基本機能の制御に関する権限を提供する。
ロール | 役職 | 権限 | 説明 |
---|---|---|---|
roles/owner | プロジェクトオーナー | composer.operations.list | Cloud Composer リソースを完全に制御できるようにするための基本ロール。 |
composer.operations.get | |||
composer.operations.delete | |||
composer.environments.list | |||
composer.environments.get | |||
composer.environments.delete | |||
composer.environments.update | |||
composer.environments.create | |||
roles/writer | プロジェクト編集者 | composer.operations.list | Cloud Composer リソースを完全に制御できるようにするための基本ロール。 |
composer.operations.get | |||
composer.operations.delete | |||
composer.environments.list | |||
composer.environments.get | |||
composer.environments.delete | |||
composer.environments.update | |||
composer.environments.create | |||
roles/reader | プロジェクト閲覧者 | composer.operations.list | ユーザーが Cloud Composer リソースを一覧表示、取得できるようにするための基本ロール。 |
composer.operations.get | |||
composer.environments.list | |||
composer.environments.get |
ロール
下記のロールはプロジェクト
リソースに対して下記の権限を有する
ロール | 役職 | 権限 | 説明 |
---|---|---|---|
roles/composer.admin | Composer 管理者 | composer.* | Cloud Composer リソースのすべてを管理 |
serviceusage.quotas.get | |||
serviceusage.services.get | |||
serviceusage.services.list | |||
roles/composer.environmentAndStorageObjectAdmin | 環境とストレージ オブジェクトの管理者 | composer.* | Cloud Composer のリソースとすべてのプロジェクト バケットのオブジェクトを管理できる権限を付与します。 |
resourcemanager.projects.get | |||
resourcemanager.projects.list | |||
serviceusage.quotas.get | |||
serviceusage.services.get | |||
serviceusage.services.list | |||
storage.objects.* | |||
roles/composer.environmentAndStorageObjectViewer | 環境ユーザーとストレージ オブジェクトの閲覧者 | composer.environments.get | Cloud Composer 環境とオペレーションを一覧表示して取得するために必要な権限を付与します。 すべてのプロジェクト バケットのオブジェクトに対して読み取り専用アクセスを許可します。 |
composer.environments.list | |||
composer.imageversions.* | |||
composer.operations.get | |||
composer.operations.list | |||
resourcemanager.projects.get | |||
resourcemanager.projects.list | |||
serviceusage.quotas.get | |||
serviceusage.services.get | |||
serviceusage.services.list | |||
storage.objects.get | |||
storage.objects.list | |||
roles/composer.user | Composer ユーザー | composer.environments.get | Cloud Composer 環境とオペレーションの一覧表示して取得するために必要な権限を付与します。 |
composer.environments.list | |||
composer.imageversions.* | |||
composer.operations.get | |||
composer.operations.list | |||
serviceusage.quotas.get | |||
serviceusage.services.get | |||
serviceusage.services.list | |||
roles/composer.worker | Composer ワーカー | 詳細は割愛 | Cloud Composer 環境 VM の実行に必要な権限を付与します(サービス アカウント向け)。 |
タスクと権限
一般的なタスクと権限の関係についてまとめてあるので各自のユースケースによって必要な権限を有するロールをサービスアカウントに設定するとよい。
タスク | 権限/ロール |
---|---|
IAP で保護された Airflow ウェブ インターフェースにアクセスする | composer.environments.get |
gcloud コマンドライン ツールを使って Airflow CLI を実行する | composer.environments.get |
container.clusters.getCredentials | |
roles/container.developer | |
Cloud Console の [環境] ページを表示する | composer.environments.list |
servicemanagement.projectSettings.get | |
Google Cloud のオペレーション スイートのログと指標を表示する | roles/logging.viewer |
roles/monitoring.viewer | |
環境の作成 | composer.environments.create |
環境の更新と削除(環境変数の設定や Python パッケージのインストール、更新を含む) | environments.delete |
environments.update | |
ファイルを DAG と Plugins フォルダにアップロードし、ログフォルダの Airflow ログにアクセスする | バケットまたはプロジェクト レベルで割り当てられる storage.objectAdmin |
DAG 宛先バケットを検索する composer.environments.get |
サービスアカウント作成
まず初めに行いたいのがサービスアカウントの作成です。
サービスアカウントとはユーザーの代わりにシステムがGCP各サービスを利用するためのアカウントのこと。
アカウントの作成
今回はコンソール サービスアカウントから作成をします。
ユーザー名 | ロール1 |
---|---|
minarai-cloud-composer | roles/composer.worker |
minarai-composer-admin | roles/composer.admin |
minarai-composer-adminは環境の作成を行うためのサービスアカウントとして作成してます。
サービスアカウントの鍵を作成
サービスアカウント一覧にminarai-cloud-composer@****
が表示されたら操作
から**鍵を作成(json)**を選択すると鍵が生成されローカルPCにダウンロードされるので適切な場所に配置する
サービス実行アカウントを変更
作成したサービスアカウントをアクティブにする
- 現在の設定一覧とアクティブなアカウントを確認する
% gcloud config configurations list
NAME IS_ACTIVE ACCOUNT PROJECT COMPUTE_DEFAULT_ZONE COMPUTE_DEFAULT_REGION
default True アカウント@gmail.com プロジェクトID
# (省略化) ついでにCOMPUTE_DEFAULT_ZONEも追加した
gcloud config set compute/region asia-northeast1
gcloud config set compute/zone asia-northeast1-a
# (省略化) 改めて確認
% gcloud config configurations list
NAME IS_ACTIVE ACCOUNT PROJECT COMPUTE_DEFAULT_ZONE COMPUTE_DEFAULT_REGION
default True アカウント@gmail.com プロジェクトID asia-northeast1-a asia-northeast1
構文: gcloud config configurations create NAME FLAG
% gcloud config configurations create minarai-cloud-composer --activate
% gcloud config configurations list
NAME IS_ACTIVE ACCOUNT PROJECT COMPUTE_DEFAULT_ZONE COMPUTE_DEFAULT_REGION
default False アカウント@gmail.com プロジェクトID asia-northeast1-a asia-northeast1
minarai-cloud-composer True
構文: gcloud config set core/project プロジェクトID
構文: gcloud config set core/account サービスアカウント
構文: gcloud config set compute/region リージョン
構文: gcloud config set compute/zone ゾーン
% gcloud config set core/project プロジェクトID
% gcloud config set core/account サービスアカウント
% gcloud config set compute/region asia-northeast1
% gcloud config set compute/zone asia-northeast1-a
% gcloud config configurations list
NAME IS_ACTIVE ACCOUNT PROJECT COMPUTE_DEFAULT_ZONE COMPUTE_DEFAULT_REGION
default False アカウント@gmail.com プロジェクトID asia-northeast1-a asia-northeast1
minarai-cloud-composer True minarai-cloud-composer@アカウント.iam.gserviceaccount.com プロジェクトID asia-northeast1-a asia-northeast1
- サービスアカウントの認証
% gcloud auth activate-service-account --key-file ~/Documents/gcp/key/key.json
環境構築
Cloud Composer の有効化
Cloud Composerを利用するためにはCloud Composerサービスの有効かを行う必要がある。
Cloud Composer API
環境の作成
事前準備
Cloud Composer のドメインで制限された共有は、現在ベータ版です。ドメインで制限された共有ポリシーを有効にしている場合、Cloud Composer 環境を作成するときにベータ版 API を使用する必要があります。
ベータ版 API を使用して Cloud Composer 環境をデプロイする方法については、ベータ版機能のサポートを参照してください。
- Cloud Composer 環境を作成するには、
composer.environments.create
権限が必要 - Cloud Composer 環境は デフォルトでは
Compute Engine
のデフォルトのサービス アカウントとして実行される。Cloud Composer 環境でリソースにアクセスするには、少なくとも、composer.worker
ロールによって提供される権限が必要。 - 現在、VPC Service Controls では Cloud Composer はサポートされていません。Cloud Composer プロジェクトをサービス境界に含めてはなりません
環境の作成
1. コンソールから
「環境作成のページ」から作成する
2. gcloudから
詳細については「新しい環境を作成する」を参照。
LOCATION: Compute Engine リージョン名
gcloud composer environments create ENVIRONMENT_NAME \
--location LOCATION \
OTHER_ARGUMENTS
環境変数の利用
環境を作成または更新するときに、Cloud Composer により Apache Airflow のスケジューラ、ワーカー、ウェブサーバー プロセスに提供される環境変数を追加できます。
たとえば、Cloud Composer では、メール通知に Apache Airflow SendGrid モジュールが使用されます。メール通知を構成するには、環境変数 SENDGRID_API_KEY と SENDGRID_MAIL_FROM を設定する必要があります。
要件
環境変数は、正規表現 [a-zA-Z_][a-zA-Z0-9_]*
と一致する必要があります。
環境変数で Apache Airflow
ソフトウェア構成のオーバーライドを指定しないでください。つまり、変数は $AIRFLOW__{SECTION}__{KEY}
の形式や、正規表現 AIRFLOW__[A-Z0-9_]+__[A-Z0-9_]+)
と一致するものを使用できません。
予約済みの環境変数
予約済の環境変数名は「予約済みの環境変数」の通りです。
- 環境変数作成権限:
composer.environments.create
- 環境変数更新権限:
composer.environments.update
1. コンソールから
「環境作成のページ」から設定することができる
2. gcloudから
詳細については「環境変数の追加と更新」を参照のこと。
gcloud composer environments update ENVIRONMENT_NAME \
--location LOCATION \
--update-env-variables=KEY=VALUE,KEY=VALUE...
Airflow接続の管理
- Airflow接続を使用すると、Cloud Composer環境からGoogle Cloudプロジェクトのリソースにアクセスできる。Airflow接続は、ワークフローで使用されるシークレットと認証情報を保存することを推奨。
- Airflow接続を使用すると、Cloud Composer環境が他のAPI(Google Cloudプロジェクト、他のクラウドプロバイダー、サードパーティサービスなど)と通信するために必要な接続情報を保存できます。
- Airflow接続には、認証情報、ホスト名、追加のAPIパラメータなどの詳細を保存できる。Airflow接続を使用して、ワークフロータスクのシークレットと認証情報を保存することを推奨。
「Airflow接続の管理」より抜粋。
Fernetキーと安全な接続
新しい環境を作成すると、Cloud Composerはその環境に固有の永続的なFernetキーを生成し、デフォルトで接続の追加を保護します。fernet_keyAirflow構成でを表示できます。
デフォルトの接続
デフォルトではGoogle Cloud Platform
の次のAirflow接続
を構成。
- bigquery_default
- google_cloud_default
- google_cloud_datastore_default
- google_cloud_storage_default
デフォルトの接続IDを使用して、DAGから接続
bigquery_conn_id
を指定せずともbigquery_default
のIDが利用して接続を試みる
task_default = bigquery_operator.BigQueryOperator(
task_id='task_default_connection',
bql='SELECT 1', use_legacy_sql=False)
# 接続IDを明示的に指定
task_explicit = bigquery_operator.BigQueryOperator(
task_id='task_explicit_connection',
bql='SELECT 1', use_legacy_sql=False,
# Composer creates a 'google_cloud_default' connection by default.
bigquery_conn_id='google_cloud_default')
別のプロジェクトのリソースにアクセスするケースは「別のプロジェクトのリソースにアクセスする」を参考にしたい。
新しいAirflow接続の作成
もし、デフォルトの接続が利用できない場合は新しいAirflow接続を作成することができる。
新しいAirflow接続を作成
gcloud composer environments run test-environment \
--location us-central1 connections -- --add \
--conn_id=my_gcp_connection --conn_type=google_cloud_platform \
--conn_extra '{"extra\__google\_cloud\_platform\__project": "your-storage-project",
"extra\__google\_cloud\_platform\__key_path": "/home/airflow/gcs/data/keyfile.json",
"extra\__google\_cloud\_platform\__scope": "https://www.googleapis.com/auth/cloud-platform"}'
新しいAirflow接続を利用
task_custom = bigquery_operator.BigQueryOperator(
task_id='task_custom_connection',
bql='SELECT 1', use_legacy_sql=False,
# Set a connection ID to use a connection that you have created.
bigquery_conn_id='my_gcp_connection')
外部データベースへの接続の構成
Cloud Composerは、環境内にデフォルトのCloud SQLプロキシを提供し、アプリケーション、クライアント、または他のGoogle Cloudサービスから環境のCloud SQLデータベースへのアクセスをリモートで承認します。
Cloud ComposerをSQLデータベースやプライベートIP Cloud SQLインスタンスなどの外部データベースに接続 するには、新しい SQLプロキシポッド(yaml) を環境のGKEクラスタにデプロイする必要があります。
Cloud Composer でDAGの作成
DAGの定義はGCPでOperaterが実装されている以外は基本的にApache Airflowと同じ。
Cloud Composer - DAG(ワークフロー)の作成
from __future__ import print_function
import datetime
from airflow import models
from airflow.operators import bash_operator
from airflow.operators import python_operator
default_dag_args = {
'start_date': datetime.datetime(2018, 1, 1),
}
with models.DAG(
'composer_sample_simple_greeting',
schedule_interval=datetime.timedelta(days=1),
default_args=default_dag_args) as dag:
def greeting():
import logging
logging.info('Hello World!')
hello_python = python_operator.PythonOperator(
task_id='hello',
python_callable=greeting)
# Likewise, the goodbye_bash task calls a Bash script.
goodbye_bash = bash_operator.BashOperator(
task_id='bye',
bash_command='echo Goodbye.')
hello_python >> goodbye_bash
下記の例のようにBash Operatorではgcloudコマンド
やbqコマンド
などが利用できるし、Python Operatorではpip installによってgoogle-cloud-bigquery
などが利用できる。
from airflow.operators import bash_operator
# Create BigQuery output dataset.
make_bq_dataset = bash_operator.BashOperator(
task_id='make_bq_dataset',
# Executing 'bq' command requires Google Cloud SDK which comes
# preinstalled in Cloud Composer.
bash_command='bq ls {} || bq mk {}'.format(
bq_dataset_name, bq_dataset_name))
Bigquery Operatorを使ってBigQueryを操作するタスクを作成することもできる
from airflow.contrib.operators import bigquery_operator
# Query recent StackOverflow questions.
bq_recent_questions_query = bigquery_operator.BigQueryOperator(
task_id='bq_recent_questions_query',
bql="""
SELECT owner_display_name, title, view_count
FROM `bigquery-public-data.stackoverflow.posts_questions`
WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
ORDER BY view_count DESC
LIMIT 100
""".format(max_date=max_query_date, min_date=min_query_date),
use_legacy_sql=False,
destination_dataset_table=bq_recent_questions_table_id)
DAG(ワークフロー)の追加と更新
DAGのバケット名を確認
- gcludコマンド
get(config.dagGcsPrefix)
: config.dagGcsPrefixの値(DAG バケット名)を取得
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
DAG の追加または更新
- gcludコマンド
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source LOCAL_FILE_TO_UPLOAD
DAG の削除
- gcludコマンド
DAG を削除しても、DAG メタデータは Airflow ウェブ インターフェースから削除されない。
gcloud composer environments storage dags delete
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_NAME.py
KubernetesPodOperator
概要
本来Airflow
には Executor
という概念があり、DAG 内で定義された各タスクをどのような方式で実行するか、制御方法を指定することができる。(「Airflow Executor」について)
この Executor の 1 つとして Kubernetes Executor というものがあり、実行ノードの指定やボリュームのマウントなどの Kubernetes の機能を享受しながら、各タスクを別の Pod で独立に実行することができるがCloud Composer
は Celery Executor
以外の Executor
を2020年2月現在時点ではサポートしていないため、Kubernetes Executor
を利用したタスクの実行ができない。※「GiXo-Kubernetes Executor」より借用。
そこでKubernetes Executor
の代替としてCloud Composer
のKubernetesPodOperator
を使用することでCloud Composer 環境内の Google Kubernetes Engineクラスタに適切なリソースを確保しGKE Pod
上でタスクを実行することができる。
定義
KubernetesPodOperator
の定義は後ほど紹介するようにおおよそはK8sのマニフェスト定義で宣言する内容をKubernetesPodOperator
に置き換えたものです。
KubernetesPodOperatorのユースケース
- 公開 PyPI リポジトリでは使用できないカスタム Python 依存関係。
- Cloud Composer ワーカー イメージでは使用できないバイナリ依存関係。
要件
- Cloud Composer 1.0.0以降
- 環境に十分なリソースが存在すること
環境に適したリソースの確保
Kubernetes Pod を環境に起動すると、CPU やメモリなどのリソースに対して、プログラム同士が競合する可能性がある。
Airflow スケジューラと Airflow ワーカーは同じ GKE クラスタにあるため、競合によってリソース不足になった場合は、スケジューラとワーカーは正常に動作しない点に注意。
リソース不足の回避策
回避策 1. ノードプールを作成する
新しいノードプールを作成し、そのプールからのリソースのみを使用して実行するように Kubernetes Pod を構成する方法。
ノードプールの作成はコンソールまたはgcloudから行うことができる。
gcloud container node-pools create POOL_NAME \
--cluster CLUSTER_NAME \
--project PROJECT_ID \
--zone ZONE \
ADDITIONAL_FLAGS
回避策 2. 環境内のノード数を増やす
ノード数を増やすと、ワーカーが使用できるコンピューティング能力が増加する。
指定したマシンタイプが提供するよりも多くの CPU または RAM を必要とするタスクには、この増加によって追加リソースは提供されない点に注意。
回避策 3. 適切なマシンタイプを指定する
Cloud Composer 環境の作成の際に、マシンタイプを指定する。
使用可能なリソースを確保するためにCloud Composer 環境で行うコンピューティングの種類に適したマシンタイプを指定します。
KubernetesPodOperator の構成
import datetime
from airflow import models
from airflow.contrib.kubernetes import secret
from airflow.contrib.operators import kubernetes_pod_operator
secret_env = secret.Secret(
deploy_type='env',
deploy_target='SQL_CONN',
secret='airflow-secrets',
key='sql_alchemy_conn')
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
# DAGの宣言
with models.DAG(
dag_id='composer_sample_kubernetes_pod',
schedule_interval=datetime.timedelta(days=1),
start_date=YESTERDAY) as dag:
# K8sオペレーター
kubernetes_min_pod = kubernetes_pod_operator.KubernetesPodOperator(
task_id='pod-ex-minimum',
name='pod-ex-minimum',
cmds=['echo'],
namespace='default',
image='gcr.io/gcp-runtimes/ubuntu_18_0_4')
# K8sオペレーター
kubenetes_template_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id='ex-kube-templates',
name='ex-kube-templates',
namespace='default',
image='bash',
cmds=['echo'],
arguments=['{{ ds }}'],
env_vars={'MY_VALUE': '{{ var.value.my_value }}'},
config_file="{{ conf.get('core', 'kube_config') }}")
# K8sオペレーター
kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id='ex-kube-secrets',
name='ex-kube-secrets',
namespace='default',
image='ubuntu',
startup_timeout_seconds=300,
secrets=[secret_env],
env_vars={'EXAMPLE_VAR': '/example/value'})
# K8sオペレーター
kubernetes_affinity_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id='ex-pod-affinity',
name='ex-pod-affinity',
namespace='default',
image='perl',
cmds=['perl'],
arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
affinity={
'nodeAffinity': {
'requiredDuringSchedulingIgnoredDuringExecution': {
'nodeSelectorTerms': [{
'matchExpressions': [{
# When nodepools are created in Google Kubernetes
# Engine, the nodes inside of that nodepool are
# automatically assigned the label
# 'cloud.google.com/gke-nodepool' with the value of
# the nodepool's name.
'key': 'cloud.google.com/gke-nodepool',
'operator': 'In',
# The label key's value that pods can be scheduled
# on.
'values': [
'pool-0',
'pool-1',
]
}]
}]
}
}
})
# K8sオペレーター
kubernetes_full_pod = kubernetes_pod_operator.KubernetesPodOperator(
task_id='ex-all-configs',
name='pi',
namespace='default',
image='perl',
cmds=['perl'],
arguments=['-Mbignum=bpi', '-wle', 'print bpi(2000)'],
secrets=[],
labels={'pod-label': 'label-name'},
startup_timeout_seconds=120,
env_vars={'EXAMPLE_VAR': '/example/value'},
get_logs=True,
image_pull_policy='Always',
annotations={'key1': 'value1'},
resources={'limit_memory': 1, 'limit_cpu': 1},
config_file='/home/airflow/composer_kube_config',
xcom_push=False,
volumes=[],
volume_mounts=[],
affinity={})
jinja テンプレートについて
宣言内で利用した値はgcloud コマンド
や環境変数から与えられる
値 | 設定方法 | 例 |
---|---|---|
env_vars={'MY_VALUE': '{{ var.value.my_value }}'} | gcloudコマンド | gcloud composer environments run ENVIRONMENT --location LOCATION variables -- --set my_value example_value |
config_file="{{ conf.get('core', 'kube_config') }}" | gcloudコマンド | gcloud composer environments update ENVIRONMENT --location LOCATION --update-airflow-configs=core-kube_config=/home/airflow/composer_kube_config |
Secret 変数について
Kubernetes Secret は、少量の機密データを含むオブジェクトのこと。Secret
を Kubernetes Pod
に渡すには、KubernetesPodOperator
を使用してKubernetes
で Secret
を定義する。Secret
を定義しない場合、Pod の起動に失敗する。
- secretの作成例
# 1. Cloud Composer 環境の詳細を表示
# ENVIRONMENT:環境名, LOCATION:リージョン名
gcloud composer environments describe ENVIRONMENT \
--location LOCATION \
--format="value(config.gkeCluster)"
# 2. GKE クラスタに接続
gcloud container clusters get-credentials CLUSTER_ID \
--zone ZONE \
--project PROJECT
# 3. secretを作成
kubectl create secret generic airflow-secrets \
--from-literal sql_alchemy_conn=test_value
- secretオブジェクト生成例
secret_env = secret.Secret(
# secretの公開種別: envであれば環境変数として公開される
deploy_type='env',
# Kubernetes の環境変数 : secretの環境変数名
deploy_target='SQL_CONN',
# secret名
secret='airflow-secrets',
# secrete key
key='sql_alchemy_conn')
- secretオブジェクトの利用例
kubernetes_secret_vars_ex = kubernetes_pod_operator.KubernetesPodOperator(
task_id='ex-kube-secrets',
name='ex-kube-secrets',
namespace='default',
image='ubuntu',
startup_timeout_seconds=300,
secrets=[secret_env],
env_vars={'EXAMPLE_VAR': '/example/value'})