1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Airflow資料抄訳(14):Kubernetesエグゼキューター(Kubernetes Executor)

Posted at

恥ずかしながら最近になって知ったワークフローエンジン Apache Airflow。日本語の紹介記事もちらほら出てきていますが、公式ドキュメントをちょっとずつ抄訳しながら読んでいこうと思います。

14回目の今回はKubernetesエグゼキューター(Kubernetes Executor)。
バージョン2.3.3時点のものです。


Kubernetesエグゼキューター(Kubernetes Executor)

Kubernetesは各タスク・インスタンスをKubernetesクラスター上のPodの中で実行します。

KubernetesExecutorそのものはAirflowスケジューラーの中のプロセスとして動きます。スケジューラーそのものが動くのにKubernetesは不要ですが、タスク実行のためにKubernetesクラスターにアクセスできる必要があります。

KubernetesExecutorはバックエンドにSQLite以外の何かしらのデータベースを必要とします。

DAGに含まれるタスクが送信されると、KubernetesExecutorはKubernetes APIを経由してワーカーPodにタスク実行を要求します。ワーカーPodはタスクを実行し、結果を報告し、そして停止します。

image.png

Kubernetesクラスター内の独立した5つのノード上で稼働するAirflowの例を示します:

image.png

標準的なAirflowアーキテクチャと同じで、ワーカー群はDAGに含まれるタスクを実行するためにDAGファイルにアクセスし、メタデータ・リポジトリとやり取りをします。Kubenetesエグゼキューター固有の構成(コンフィギュレーション)情報、例えばワーカー名前空間やコンテナ・イメージの情報などは、Airflowの構成ファイル内で指定されている必要があります。

加えて、KubenetesエグゼキューターはExecutorの設定によりタスクごとに追加の機能を有効化したり無効化したりできます。

image.png

構成(コンフィギュレーション)

pod_template_file

Kubenetesエグゼキューターのワーカー・プロセス群のために使用するPodをカスタマイズするには、まずPodテンプレート・ファイルを用意します。そしてairflow.cfgkubenetesセクション内でpod_template_fileオプションに当該テンプレート・ファイルへのパスを記述します。

Podテンプレート・ファイルについては2つの厳格な要件があります。1つ目はベース・イメージ、2つ目はPod名称です。

ベース・イメージ

pod_template_file が指し示すPodテンプレート・ファイルではspec.containers[0]の位置にbaseという名前のコンテナが記述されている必要があります。そしてそのimageが指定されている必要があります。

この必須のコンテナさえ作成し終えたら、それ以外のサイドカー・コンテナ(補助的なコンテナ)は自由に作成できます。Airflowはコンテナの配列の先頭にワーカー・コンテナが存在していること、そしてそのコンテナの名称がbaseであることを前提とするのです。

Note
Airflowはpod_overrideなどによりベース・コンテナのimageを上書きします。しかしそうであるとしても、テンプレート・ファイルではこの項目を空欄にしてはいけません。

Pod名称

テンプレート・ファイル内のPodの metadata.name は必ず設定します。Podが起動する時点で必ず自動的に設定され、Podの一意性を保証します。ですが、あらためて言います。この項目はテンプレートで指定する必要があります。空白のままにしてはいけません。

Podテンプレートの例

これらの制約を念頭に置きつつ、基本的な pod_template_file YAMLファイルの例をいくつか見ていきましょう。

Note
以下の例はAirflowのデフォルトの設定を使用して動作します。しかし多くのカスタム設定値はこのテンプレートを通じてPodに明示的に渡してやる必要があります。これらの設定にはSQL構成、必須のAirflow接続情報、DAGフォルダーのパスそしてログ出力の設定などが含まれますが、それだけではありません。詳細は構成リファレンスをご覧ください。

DAGをコンテナ・イメージ内で保管する:

---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

DAGをpersistentVolume内で保管する:

---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          readOnly: true
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - name: airflow-dags
      persistentVolumeClaim:
        claimName: RELEASE-NAME-dags
    - emptyDir: {}
      name: airflow-logs
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

DAGをGitリポジトリから取得する:

---
apiVersion: v1
kind: Pod
metadata:
  name: dummy-name
spec:
  initContainers:
    - name: git-sync
      image: "k8s.gcr.io/git-sync/git-sync:v3.4.0"
      env:
        - name: GIT_SYNC_BRANCH
          value: "v2-2-stable"
        - name: GIT_SYNC_REPO
          value: "https://github.com/apache/airflow.git"
        - name: GIT_SYNC_DEPTH
          value: "1"
        - name: GIT_SYNC_ROOT
          value: "/git"
        - name: GIT_SYNC_DEST
          value: "repo"
        - name: GIT_SYNC_ADD_USER
          value: "true"
        - name: GIT_SYNC_ONE_TIME
          value: "true"
      volumeMounts:
        - name: airflow-dags
          mountPath: /git
  containers:
    - env:
        - name: AIRFLOW__CORE__EXECUTOR
          value: LocalExecutor
        # Hard Coded Airflow Envs
        - name: AIRFLOW__CORE__FERNET_KEY
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-fernet-key
              key: fernet-key
        - name: AIRFLOW__DATABASE__SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
        - name: AIRFLOW_CONN_AIRFLOW_DB
          valueFrom:
            secretKeyRef:
              name: RELEASE-NAME-airflow-metadata
              key: connection
      image: dummy_image
      imagePullPolicy: IfNotPresent
      name: base
      volumeMounts:
        - mountPath: "/opt/airflow/logs"
          name: airflow-logs
        - mountPath: /opt/airflow/dags
          name: airflow-dags
          subPath: repo/airflow/example_dags
          readOnly: false
        - mountPath: /opt/airflow/airflow.cfg
          name: airflow-config
          readOnly: true
          subPath: airflow.cfg
  restartPolicy: Never
  securityContext:
    runAsUser: 50000
    fsGroup: 50000
  serviceAccountName: "RELEASE-NAME-worker-serviceaccount"
  volumes:
    - name: airflow-dags
      emptyDir: {}
    - name: airflow-logs
      emptyDir: {}
    - configMap:
        name: RELEASE-NAME-airflow-config
      name: airflow-config

pod_override

KubenetesExecutorを使用するとき、Airflowはシステム・デフォルトの設定をタスク単位の設定で上書きする機能を提供します。この機能を使用するには、KubenetesのV1Podオブジェクトを生成し、設定値をあなたの望む値で上書きします。スケジューラーがこのPodを起動する前に当該オブジェクトのmetadata.nameを上書きする点には注意してください。

KubenetesExecutorにより起動されるPodのベース・コンテナを上書きするには、V1Podを単一のコンテナとともに生成し、次のようにしてフィールドを上書きします:

airflow/example_dags/example_kubernetes_executor.py
        executor_config_volume_mount = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[
                                k8s.V1VolumeMount(mount_path="/foo/", name="example-kubernetes-test-volume")
                            ],
                        )
                    ],
                    volumes=[
                        k8s.V1Volume(
                            name="example-kubernetes-test-volume",
                            host_path=k8s.V1HostPathVolumeSource(path="/tmp/"),
                        )
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_volume_mount)
        def test_volume_mount():
            """
            Tests whether the volume has been mounted.
            """

            with open('/foo/volume_mount_test.txt', 'w') as foo:
                foo.write('Hello')

            return_code = os.system("cat /foo/volume_mount_test.txt")
            if return_code != 0:
                raise ValueError(f"Error when checking volume mount. Return code {return_code}")

        volume_task = test_volume_mount()

次のフィールド群は、上書きされるのでなく拡張されるのであるという点に注意してください。specではvolumesとinit_containers。containerではvolume mounts、environment variables、ports、そしてdevicesが該当します。

サイドカー・コンテナ(補助的なコンテナ)を必要とする場合、V1Podを生成するとき、baseという名称の空のコンテナに続く第2のコンテナとしてサイドカー・コンテナを追加します。

airflow/example_dags/example_kubernetes_executor.py
        executor_config_sidecar = {
            "pod_override": k8s.V1Pod(
                spec=k8s.V1PodSpec(
                    containers=[
                        k8s.V1Container(
                            name="base",
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                        k8s.V1Container(
                            name="sidecar",
                            image="ubuntu",
                            args=["echo \"retrieved from mount\" > /shared/test.txt"],
                            command=["bash", "-cx"],
                            volume_mounts=[k8s.V1VolumeMount(mount_path="/shared/", name="shared-empty-dir")],
                        ),
                    ],
                    volumes=[
                        k8s.V1Volume(name="shared-empty-dir", empty_dir=k8s.V1EmptyDirVolumeSource()),
                    ],
                )
            ),
        }

        @task(executor_config=executor_config_sidecar)
        def test_sharedvolume_mount():
            """
            Tests whether the volume has been mounted.
            """
            for i in range(5):
                try:
                    return_code = os.system("cat /shared/test.txt")
                    if return_code != 0:
                        raise ValueError(f"Error when checking volume mount. Return code {return_code}")
                except ValueError as e:
                    if i > 4:
                        raise e

        sidecar_task = test_sharedvolume_mount()

タスクごとにpod_template_fileのカスタマイズ版を作成して使用することで、複数のタスクのあいだで同じ基本の値を使い回すことができます。airflow.cfgの中のpod_template_fileで指定されたデフォルト値を置き換え、pod_overrideを使ってテンプレートを上書きします。

これら2つの機能を利用するタスクの例を示します:

import os

import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.example_dags.libs.helper import print_stuff
from airflow.settings import AIRFLOW_HOME

from kubernetes.client import models as k8s

with DAG(
    dag_id="example_pod_template_file",
    schedule_interval=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["example3"],
) as dag:
    executor_config_template = {
        "pod_template_file": os.path.join(AIRFLOW_HOME, "pod_templates/basic_template.yaml"),
        "pod_override": k8s.V1Pod(metadata=k8s.V1ObjectMeta(labels={"release": "stable"})),
    }

    @task(executor_config=executor_config_template)
    def task_with_template():
        print_stuff()

DAGとログの管理

永続ボリュームの利用は任意であり構成(コンフィギュレーション)に依存します。

  • DAG
    DAGをワーカー内に配置するには、次の方法があります:
    • コンテナ・イメージにDAGを含めてしまう
    • git-syncを使用して、ワーカー・コンテナが起動する前にリポジトリからDAGをgit pullさせる
    • 永続ボリュームにDAGを保管し、すべてのワーカーにそれをマウントさせる
  • ログ
    ログをワーカー外に出力するには、次の方法があります:
    • 永続ボリュームを使用し、Webサーバーとワーカー群双方からマウントさせる
    • リモート・ロギングの機能を有効にする

Note
ログの出力のための設定をしないと、ワーカーPodがシャットダウンした時点でログが失われます。

CeleryExecutorとの比較

CeleryExecutorと対照的に、KubenetesExecutorはRedisのような追加のコンポーネントは必要としませんが、Kubenetesクラスターへのアクセスを必要とします。

Pod群の監視も組み込みのKubenetes監視機能で行います。

KubenetesExecutorでは各タスクはそれ専用のPodの中で実行されます。Podはタスクがキューに追加されたときに生成され、タスクが完了したときに終了します。過去においては、タスクの増減が著しいシナリオにおいて、実行するタスクの有無に関わらず一定数のワーカーを運用するCeleryExecutorよりも、リソース使用率観点で優位性がありました。

しかし、公式のApache Airflow Helm Chartでは、キューに入っているタスクの数に応じて、Celeryワーカー数0からNまで自動的に調節します。この公式Chartを使用する場合、KubenetesExecutorの優位性はなくなります。

Celeryワーカーでは、タスクがキューに入ったときにはすでにワーカーが稼動しているため、タスクの実行待ちが少なくなる傾向がありまし。一方で、複数のタスクが同じノードで実行されることから、タスク設計においてリソース使用の多寡をより強く意識する必要が生じます。とくにメモリー消費についてはそうです。

KubenetesExecutorが役に立つシナリオの1つは、長い時間のかかるタスクがあるときです。もしタスクが実行されている最中にデプロイをすると、当該のタスクが完了(あるいはタイムアウトするなど)するまで実行され続けるからです。対してCeleryExecutorの場合、猶予期間が設定され、その期間中だけタスクが実行されて、期間が終わるとタスクも終了させられます。KubenetesExecutorがうまく機能するもう1つのシナリオは、ワーカーのリソース要件やコンテナ・イメージが不均一である場合です。

最後に申し上げておきますが、KubenetesかCeleryか二者択一ではないという点に注意してください。CeleryKubenetesExecutorなら、CeleryExecutorとKubenetesExecutorを同じクラスター上で同時に使用することができます。CeleryKubenetesExecutorはタスクのqueueを確認してCeleryとKubenetesいずれで実行するか決定します。デフォルトでは、タスクはCeleryワーカーに送信されます。しかしあなたが望むならタスクにそう指定することでKubenetesワーカーPodに送信させることもできます。そしてKubernetesPodOperatorを使用することで同じ効果が得られます。

耐障害性

Tip
KubernetesExecutorにまつわるトラブルシューティングには、airflow kubernetes generate-dag-yamlコマンドを利用できます。このコマンドはKubernetesクラスター内にPod群を生成し、その内容を分析用のYAML形式のファイルにダンプします。

ワーカーPodのクラッシュに対処する

分散システムを扱う上では、システムを構成するいずれかのコンポーネントが一時的にクラッシュすることはありうるという前提でものごとを考える必要があります。クラッシュの理由はOOM(メモリー不足)からノードのOSないしソフトウェア・アップグレードまでさまざまなものがあります。

あるワーカーが自身のステータスをバックエンドのDBに報告する前に死んだ場合、エグゼキューターはKubernetesの監視スレッドを使用して問題のPodを発見します。

image.png

Kubernetes監視スレッドはKubernetesのデータベース内で生じたあらゆる変更を受信することができます。Podが起動し、タスクを処理し、終了し、そしてエラーになる、その都度通知を受けます。このステータス情報の流れを監視することで、KubernetesExecutorはワーカーがクラッシュしたことを検知してタスクの失敗を報告することができるのです。

スケジューラーPodがクラッシュしたらどうなる?

スケジューラーがクラッシュした場合、当該スケジューラーはKubernetesクラスターのウォッチャー(watcher)のresourceVersionを使用して自身の状態を復元します。

Kubernetesクラスターのウォッチャー・スレッドが監視するイベント情報はみなresourceVersion というシーケンシャルに割り振られる数値を持っています。エグゼキューターは常にこのresourceVersionを読み取っており、その最新の値をバックエンドのデータベースに保存しています。この数値が保存されているおかげで、スケジューラーは再起動の後イベント情報のストリームの未処理の部分から処理を再開することができます。タスク群はエグゼキューター(すなわちスケジューラー)のPodとは独立した場所で実行されており、処理結果も直接データベースに登録しているため、スケジューラーにおける問題発生はタスク群に影響しません。


この記事を読むまできちんと整理して理解できていなかったのが、Airflowのシステム構成にDockerやKubernetesを使用する方法にはいくつかあるということです。

代表的なものはこれくらい?:

  • Docker Composeを使用する方法。Airflowを構成するコンポーネントの構築と運用にDocker Composeを使用し、タスク実行にはCeleryExecutorを利用するもの。「これはあくまでも動作確認用途だ」と繰り返し念押しされています。
  • KubernetesクラスターとKubernetesエグゼキューターを使用する方法。Airflowを構成するコンポーネントの構築と運用、そしてワーカーノードの管理にもKubernetesを使用して、なおかつタスク実行にはKubernetesExecutorを利用するもの。今回抄訳したもので、後述のもう1つの方法との比較も示されていました。
  • KubernetesクラスターとCeleryエグゼキューターを使用する方法。Airflowを構成するコンポーネントの構築と運用を使用して、タスク実行にはCeleryExecutorを利用するもの。タスクキューの状況をみてワーカーがスケールするよう構成された公式Helm Chartが提供されています。

Celeryを利用するには複数のバックエンドを準備する必要がありますが、Docker/Kubernetesの提供する仕組みのおかげでそれらを人手で構築する必要はなくなりました。

あとはKubernetesクラスターをどう構築・運用するかですが、これを自前でやるのか、ある程度マネージドな環境を提供するサービスを利用するのか、はたまたAWSが提供するようなすべてひっくるめて完全マネージドなサービスとして利用するのか。

これはケース・バイ・ケースということになりそうですが、Kubernetesクラスターの運用のしんどさを考えると、多くのケースでは多かれ少なかれマネージドなオプションを利用することになりそうですね。

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?