恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。
14回目の今回はKubernetesエグゼキューター(Kubernetes Executor)。
バージョン2.3.3時点のものです。
Kubernetesエグゼキューター(Kubernetes Executor)
Kubernetesは各タスク・インスタンスをKubernetesクラスター上のPodの中で実行します。
KubernetesExecutorそのものはAirflowスケジューラーの中のプロセスとして動きます。スケジューラーそのものが動くのにKubernetesは不要ですが、タスク実行のためにKubernetesクラスターにアクセスできる必要があります。
KubernetesExecutorはバックエンドにSQLite以外の何かしらのデータベースを必要とします。
DAGに含まれるタスクが送信されると、KubernetesExecutorはKubernetes APIを経由してワーカーPodにタスク実行を要求します。ワーカーPodはタスクを実行し、結果を報告し、そして停止します。
Kubernetesクラスター内の独立した5つのノード上で稼働するAirflowの例を示します:
標準的なAirflowアーキテクチャと同じで、ワーカー群はDAGに含まれるタスクを実行するためにDAGファイルにアクセスし、メタデータ・リポジトリとやり取りをします。Kubenetesエグゼキューター固有の構成(コンフィギュレーション)情報、例えばワーカー名前空間やコンテナ・イメージの情報などは、Airflowの構成ファイル内で指定されている必要があります。
加えて、KubenetesエグゼキューターはExecutorの設定によりタスクごとに追加の機能を有効化したり無効化したりできます。
構成(コンフィギュレーション)
pod_template_file
Kubenetesエグゼキューターのワーカー・プロセス群のために使用するPodをカスタマイズするには、まずPodテンプレート・ファイルを用意します。そしてairflow.cfg
のkubenetes
セクション内でpod_template_file
オプションに当該テンプレート・ファイルへのパスを記述します。
Podテンプレート・ファイルについては2つの厳格な要件があります。1つ目はベース・イメージ、2つ目はPod名称です。
ベース・イメージ
pod_template_file
が指し示すPodテンプレート・ファイルではspec.containers[0]
の位置にbase
という名前のコンテナが記述されている必要があります。そしてそのimage
が指定されている必要があります。
この必須のコンテナさえ作成し終えたら、それ以外のサイドカー・コンテナ(補助的なコンテナ)は自由に作成できます。Airflowはコンテナの配列の先頭にワーカー・コンテナが存在していること、そしてそのコンテナの名称がbase
であることを前提とするのです。
Note
Airflowはpod_override
などによりベース・コンテナのimage
を上書きします。しかしそうであるとしても、テンプレート・ファイルではこの項目を空欄にしてはいけません。
Pod名称
テンプレート・ファイル内のPodの metadata.name
は必ず設定します。Podが起動する時点で必ず自動的に設定され、Podの一意性を保証します。ですが、あらためて言います。この項目はテンプレートで指定する必要があります。空白のままにしてはいけません。
Podテンプレートの例
これらの制約を念頭に置きつつ、基本的な pod_template_file
YAMLファイルの例をいくつか見ていきましょう。
Note
以下の例はAirflowのデフォルトの設定を使用して動作します。しかし多くのカスタム設定値はこのテンプレートを通じてPodに明示的に渡してやる必要があります。これらの設定にはSQL構成、必須のAirflow接続情報、DAGフォルダーのパスそしてログ出力の設定などが含まれますが、それだけではありません。詳細は構成リファレンスをご覧ください。
DAGをコンテナ・イメージ内で保管する:
---
apiVersion: v1
kind: Pod
metadata:
name: dummy-name
spec:
containers:
- env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
# Hard Coded Airflow Envs
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: RELEASE-NAME-fernet-key
key: fernet-key
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
- name: AIRFLOW_CONN_AIRFLOW_DB
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
image: dummy_image
imagePullPolicy: IfNotPresent
name: base
volumeMounts:
- mountPath: "/opt/airflow/logs"
name: airflow-logs
- mountPath: /opt/airflow/airflow.cfg
name: airflow-config
readOnly: true
subPath: airflow.cfg
restartPolicy: Never
securityContext:
runAsUser: 50000
fsGroup: 50000
serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
volumes:
- emptyDir: {}
name: airflow-logs
- configMap:
name: RELEASE-NAME-airflow-config
name: airflow-config
DAGをpersistentVolume
内で保管する:
---
apiVersion: v1
kind: Pod
metadata:
name: dummy-name
spec:
containers:
- env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
# Hard Coded Airflow Envs
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: RELEASE-NAME-fernet-key
key: fernet-key
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
- name: AIRFLOW_CONN_AIRFLOW_DB
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
image: dummy_image
imagePullPolicy: IfNotPresent
name: base
volumeMounts:
- mountPath: "/opt/airflow/logs"
name: airflow-logs
- mountPath: /opt/airflow/dags
name: airflow-dags
readOnly: true
- mountPath: /opt/airflow/airflow.cfg
name: airflow-config
readOnly: true
subPath: airflow.cfg
restartPolicy: Never
securityContext:
runAsUser: 50000
fsGroup: 50000
serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
volumes:
- name: airflow-dags
persistentVolumeClaim:
claimName: RELEASE-NAME-dags
- emptyDir: {}
name: airflow-logs
- configMap:
name: RELEASE-NAME-airflow-config
name: airflow-config
DAGをGitリポジトリから取得する:
---
apiVersion: v1
kind: Pod
metadata:
name: dummy-name
spec:
initContainers:
- name: git-sync
image: "k8s.gcr.io/git-sync/git-sync:v3.4.0"
env:
- name: GIT_SYNC_BRANCH
value: "v2-2-stable"
- name: GIT_SYNC_REPO
value: "https://github.com/apache/airflow.git"
- name: GIT_SYNC_DEPTH
value: "1"
- name: GIT_SYNC_ROOT
value: "/git"
- name: GIT_SYNC_DEST
value: "repo"
- name: GIT_SYNC_ADD_USER
value: "true"
- name: GIT_SYNC_ONE_TIME
value: "true"
volumeMounts:
- name: airflow-dags
mountPath: /git
containers:
- env:
- name: AIRFLOW__CORE__EXECUTOR
value: LocalExecutor
# Hard Coded Airflow Envs
- name: AIRFLOW__CORE__FERNET_KEY
valueFrom:
secretKeyRef:
name: RELEASE-NAME-fernet-key
key: fernet-key
- name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
- name: AIRFLOW_CONN_AIRFLOW_DB
valueFrom:
secretKeyRef:
name: RELEASE-NAME-airflow-metadata
key: connection
image: dummy_image
imagePullPolicy: IfNotPresent
name: base
volumeMounts:
- mountPath: "/opt/airflow/logs"
name: airflow-logs
- mountPath: /opt/airflow/dags
name: airflow-dags
subPath: repo/airflow/example_dags
readOnly: false
- mountPath: /opt/airflow/airflow.cfg
name: airflow-config
readOnly: true
subPath: airflow.cfg
restartPolicy: Never
securityContext:
runAsUser: 50000
fsGroup: 50000
serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
volumes:
- name: airflow-dags
emptyDir: {}
- name: airflow-logs
emptyDir: {}
- configMap:
name: RELEASE-NAME-airflow-config
name: airflow-config
pod_override
KubenetesExecutorを使用するとき、Airflowはシステム・デフォルトの設定をタスク単位の設定で上書きする機能を提供します。この機能を使用するには、KubenetesのV1Podオブジェクトを生成し、設定値をあなたの望む値で上書きします。スケジューラーがこのPodを起動する前に当該オブジェクトのmetadata.name
を上書きする点には注意してください。
KubenetesExecutorにより起動されるPodのベース・コンテナを上書きするには、V1Podを単一のコンテナとともに生成し、次のようにしてフィールドを上書きします:
executor_config_volume_mount = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[
k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
],
)
],
volumes=[
k8s.V1Volume(
name="example-kubernetes-test-volume",
host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
)
],
)
),
}
@task(executor_config=executor_config_volume_mount)
def test_volume_mount():
"""
Tests whether the volume has been mounted.
"""
with open('/foo/volume_mount_test.txt', 'w') as foo:
foo.write('Hello')
return_code = os.system("cat /foo/volume_mount_test.txt")
if return_code != 0:
raise ValueError(f"Error when checking volume mount. Return code {return_code}")
volume_task = test_volume_mount()
次のフィールド群は、上書きされるのでなく拡張されるのであるという点に注意してください。specではvolumesとinit_containers。containerではvolume mounts、environment variables、ports、そしてdevicesが該当します。
サイドカー・コンテナ(補助的なコンテナ)を必要とする場合、V1Podを生成するとき、base
という名称の空のコンテナに続く第2のコンテナとしてサイドカー・コンテナを追加します。
executor_config_sidecar = {
"pod_override": k8s.V1Pod(
spec=k8s.V1PodSpec(
containers=[
k8s.V1Container(
name="base",
volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
),
k8s.V1Container(
name="sidecar",
image="ubuntu",
args=["echo \"retrieved from mount\" > /shared/test.txt"],
command=["bash", "-cx"],
volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
),
],
volumes=[
k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
],
)
),
}
@task(executor_config=executor_config_sidecar)
def test_sharedvolume_mount():
"""
Tests whether the volume has been mounted.
"""
for i in range(5):
try:
return_code = os.system("cat /shared/test.txt")
if return_code != 0:
raise ValueError(f"Error when checking volume mount. Return code {return_code}")
except ValueError as e:
if i > 4:
raise e
sidecar_task = test_sharedvolume_mount()
タスクごとにpod_template_file
のカスタマイズ版を作成して使用することで、複数のタスクのあいだで同じ基本の値を使い回すことができます。airflow.cfgの中のpod_template_file
で指定されたデフォルト値を置き換え、pod_override
を使ってテンプレートを上書きします。
これら2つの機能を利用するタスクの例を示します:
import os
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.settings import AIRFLOW_HOME
from kubernetes.client import models as k8s
with DAG(
dag_id="example_pod_template_file",
schedule_interval=None,
start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
catchup=False,
tags=["example3"],
) as dag:
executor_config_template = {
"pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
"pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
}
@task(executor_config=executor_config_template)
def task_with_template():
print_stuff()
DAGとログの管理
永続ボリュームの利用は任意であり構成(コンフィギュレーション)に依存します。
-
DAG:
DAGをワーカー内に配置するには、次の方法があります:- コンテナ・イメージにDAGを含めてしまう
-
git-sync
を使用して、ワーカー・コンテナが起動する前にリポジトリからDAGをgit pull
させる - 永続ボリュームにDAGを保管し、すべてのワーカーにそれをマウントさせる
-
ログ :
ログをワーカー外に出力するには、次の方法があります:- 永続ボリュームを使用し、Webサーバーとワーカー群双方からマウントさせる
- リモート・ロギングの機能を有効にする
Note
ログの出力のための設定をしないと、ワーカーPodがシャットダウンした時点でログが失われます。
CeleryExecutorとの比較
CeleryExecutorと対照的に、KubenetesExecutorはRedisのような追加のコンポーネントは必要としませんが、Kubenetesクラスターへのアクセスを必要とします。
Pod群の監視も組み込みのKubenetes監視機能で行います。
KubenetesExecutorでは各タスクはそれ専用のPodの中で実行されます。Podはタスクがキューに追加されたときに生成され、タスクが完了したときに終了します。過去においては、タスクの増減が著しいシナリオにおいて、実行するタスクの有無に関わらず一定数のワーカーを運用するCeleryExecutorよりも、リソース使用率観点で優位性がありました。
しかし、公式のApache Airflow Helm Chartでは、キューに入っているタスクの数に応じて、Celeryワーカー数0からNまで自動的に調節します。この公式Chartを使用する場合、KubenetesExecutorの優位性はなくなります。
Celeryワーカーでは、タスクがキューに入ったときにはすでにワーカーが稼動しているため、タスクの実行待ちが少なくなる傾向がありまし。一方で、複数のタスクが同じノードで実行されることから、タスク設計においてリソース使用の多寡をより強く意識する必要が生じます。とくにメモリー消費についてはそうです。
KubenetesExecutorが役に立つシナリオの1つは、長い時間のかかるタスクがあるときです。もしタスクが実行されている最中にデプロイをすると、当該のタスクが完了(あるいはタイムアウトするなど)するまで実行され続けるからです。対してCeleryExecutorの場合、猶予期間が設定され、その期間中だけタスクが実行されて、期間が終わるとタスクも終了させられます。KubenetesExecutorがうまく機能するもう1つのシナリオは、ワーカーのリソース要件やコンテナ・イメージが不均一である場合です。
最後に申し上げておきますが、KubenetesかCeleryか二者択一ではないという点に注意してください。CeleryKubenetesExecutorなら、CeleryExecutorとKubenetesExecutorを同じクラスター上で同時に使用することができます。CeleryKubenetesExecutorはタスクのqueue
を確認してCeleryとKubenetesいずれで実行するか決定します。デフォルトでは、タスクはCeleryワーカーに送信されます。しかしあなたが望むならタスクにそう指定することでKubenetesワーカーPodに送信させることもできます。そしてKubernetesPodOperatorを使用することで同じ効果が得られます。
耐障害性
Tip
KubernetesExecutorにまつわるトラブルシューティングには、airflow kubernetes generate-dag-yaml
コマンドを利用できます。このコマンドはKubernetesクラスター内にPod群を生成し、その内容を分析用のYAML形式のファイルにダンプします。
ワーカーPodのクラッシュに対処する
分散システムを扱う上では、システムを構成するいずれかのコンポーネントが一時的にクラッシュすることはありうるという前提でものごとを考える必要があります。クラッシュの理由はOOM(メモリー不足)からノードのOSないしソフトウェア・アップグレードまでさまざまなものがあります。
あるワーカーが自身のステータスをバックエンドのDBに報告する前に死んだ場合、エグゼキューターはKubernetesの監視スレッドを使用して問題のPodを発見します。
Kubernetes監視スレッドはKubernetesのデータベース内で生じたあらゆる変更を受信することができます。Podが起動し、タスクを処理し、終了し、そしてエラーになる、その都度通知を受けます。このステータス情報の流れを監視することで、KubernetesExecutorはワーカーがクラッシュしたことを検知してタスクの失敗を報告することができるのです。
スケジューラーPodがクラッシュしたらどうなる?
スケジューラーがクラッシュした場合、当該スケジューラーはKubernetesクラスターのウォッチャー(watcher)のresourceVersion
を使用して自身の状態を復元します。
Kubernetesクラスターのウォッチャー・スレッドが監視するイベント情報はみなresourceVersion
というシーケンシャルに割り振られる数値を持っています。エグゼキューターは常にこのresourceVersion
を読み取っており、その最新の値をバックエンドのデータベースに保存しています。この数値が保存されているおかげで、スケジューラーは再起動の後イベント情報のストリームの未処理の部分から処理を再開することができます。タスク群はエグゼキューター(すなわちスケジューラー)のPodとは独立した場所で実行されており、処理結果も直接データベースに登録しているため、スケジューラーにおける問題発生はタスク群に影響しません。
この記事を読むまできちんと整理して理解できていなかったのが、Airflowのシステム構成にDockerやKubernetesを使用する方法にはいくつかあるということです。
代表的なものはこれくらい?:
-
Docker Composeを使用する方法。Airflowを構成するコンポーネントの構築と運用にDocker Composeを使用し、タスク実行には
CeleryExecutor
を利用するもの。「これはあくまでも動作確認用途だ」と繰り返し念押しされています。 -
KubernetesクラスターとKubernetesエグゼキューターを使用する方法。Airflowを構成するコンポーネントの構築と運用、そしてワーカーノードの管理にもKubernetesを使用して、なおかつタスク実行には
KubernetesExecutor
を利用するもの。今回抄訳したもので、後述のもう1つの方法との比較も示されていました。 -
KubernetesクラスターとCeleryエグゼキューターを使用する方法。Airflowを構成するコンポーネントの構築と運用を使用して、タスク実行には
CeleryExecutor
を利用するもの。タスクキューの状況をみてワーカーがスケールするよう構成された公式Helm Chartが提供されています。
Celeryを利用するには複数のバックエンドを準備する必要がありますが、Docker/Kubernetesの提供する仕組みのおかげでそれらを人手で構築する必要はなくなりました。
あとはKubernetesクラスターをどう構築・運用するかですが、これを自前でやるのか、ある程度マネージドな環境を提供するサービスを利用するのか、はたまたAWSが提供するようなすべてひっくるめて完全マネージドなサービスとして利用するのか。
これはケース・バイ・ケースということになりそうですが、Kubernetesクラスターの運用のしんどさを考えると、多くのケースでは多かれ少なかれマネージドなオプションを利用することになりそうですね。