11
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

【Airflow on Kubernetes】KubernetesPodOperatorの使い方

Last updated at Posted at 2019-08-15

概要

KubernetesPodOperatorを使って、DAGでPodを起動させる。

目次

バージョン

DAGを作成

airflow.template.yamlのschedulerと同じ構成のPodを起動するようにしてみる。

sample_k8s.py
# -*- coding: utf-8 -*-
import airflow
from datetime import timedelta
from airflow import DAG
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.secret import Secret


default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': airflow.utils.dates.days_ago(2),
    'email': ['airflow@example.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

dag = DAG(
    dag_id='sample_k8s',
    default_args=default_args,
    description='A sample DAG for k8s',
    schedule_interval=timedelta(days=1),
    dagrun_timeout=timedelta(minutes=60),
)

# 環境変数
secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env  = Secret('env', 'SQL_ALCHEMY_CONN', 'airflow-secrets', 'sql_alchemy_conn')
env_vars = {'AIRFLOW_KUBE_NAMESPACE': 'airflow'}

# Volume
volume_mounts = [VolumeMount('airflow-configmap',
                             mount_path='/root/airflow/airflow.cfg',
                             sub_path='airflow.cfg',
                             read_only=True),
                 VolumeMount('airflow-dags',
                             mount_path='/root/airflow/dags',
                             sub_path=None,
                             read_only=True),
                 VolumeMount('airflow-logs',
                             mount_path='/root/airflow/logs',
                             sub_path=None,
                             read_only=True)]

volumes = [Volume(name='airflow-dags', configs={'persistentVolumeClaim': {'claimName': 'airflow-dags'}}),
           Volume(name='airflow-dags-fake', configs={'emptyDir': {}}),
           Volume(name='airflow-dags-git', configs={'emptyDir': {}}),
           Volume(name='test-volume', configs={'persistentVolumeClaim': {'claimName': 'test-volume'}}),
           Volume(name='airflow-logs', configs={'persistentVolumeClaim': {'claimName': 'airflow-logs'}}),
           Volume(name='airflow-configmap', configs={'configMap': {'name': 'airflow-configmap'}})]

k1 = KubernetesPodOperator(namespace='airflow', # コンテナを起動するk8sのnamespace
                           task_id='test-task001',
                           name='test-container001', # コンテナ名に使用される
                           image="airflow:latest", # 使用するDockerイメージ
                           image_pull_policy='IfNotPresent',
                           env_vars=env_vars,
                           secrets=[secret_file, secret_env],
                           volumes=volumes,
                           volume_mounts=volume_mounts,
                           cmds=["bash", "-cx"],
                           arguments=['echo "[SQL_ALCHEMY_CONN] $SQL_ALCHEMY_CONN" && echo "[AIRFLOW_KUBE_NAMESPACE] $AIRFLOW_KUBE_NAMESPACE" && sleep 300'],
                           in_cluster=True, # 同一clusterにコンテナを起動するか
                           is_delete_operator_pod=True, # DAG完了後にPodを削除するか
                           dag=dag
                          )

k1

2019/08/21現在、env_varsでvalueFromは利用できない。
以下で対応中のようだが、まだマージされていない。

DAGを実行

以下の画像のように、sample_k8sをOnにし、Trigger Dagの再生ボタンをクリックして、DAGを実行する。

Screen Shot 2019-08-14 at 16.30.02.png

Podを確認する

samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6というPodと、
test-container001-8ac39563という、DAGの中で定義したPodの2つが起動している。

前者のPodはDAGを実行するためのもの、後者はDAGによって起動されたものとなる。
(Airflow on Kubernetesでは1つのDAG実行につき1つのPodが起動する)

$ sudo kubectl get pod -w
NAME                                READY   STATUS    RESTARTS   AGE
airflow-75b8fff687-xrsbr            2/2     Running   0          48s
postgres-airflow-84dfd85977-6tpdh   1/1     Running   0          23m
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6   0/1     Pending   0          0s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6   0/1     Pending   0          0s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6   0/1     ContainerCreating   0          0s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6   1/1     Running             0          2s
test-container001-8ac39563                              0/1     Pending             0          0s
test-container001-8ac39563                              0/1     Pending             0          0s
test-container001-8ac39563                              0/1     ContainerCreating   0          0s
test-container001-8ac39563                              1/1     Running             0          2s
test-container001-8ac39563                              0/1     Completed           0          61s
test-container001-8ac39563                              0/1     Terminating         0          63s
test-container001-8ac39563                              0/1     Terminating         0          63s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6   0/1     Completed           0          68s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6   0/1     Terminating         0          70s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6   0/1     Terminating         0          70s

DAGの完了を確認

Stateがsuccessになっていれば完了。
Screen Shot 2019-08-14 at 16.42.14.png

Log

ログの確認方法は以下を参照。
DAGの実行結果ログを確認する

以下の様にログに環境変数の内容が出力されていればOK。

[2019-08-21 01:46:34,651] {logging_mixin.py:95} INFO - [[34m2019-08-21 01:46:34,651[0m] {[34mpod_launcher.py:[0m105} INFO[0m - [SQL_ALCHEMY_CONN] postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
[2019-08-21 01:46:34,652] {logging_mixin.py:95} INFO - [[34m2019-08-21 01:46:34,652[0m] {[34mpod_launcher.py:[0m105} INFO[0m - [AIRFLOW_KUBE_NAMESPACE] airflow

cannot import name KubernetesOperatorというエラーが出る

以下のサンプルスクリプトで、from airflow.contrib.operators import KubernetesOperatorという一行がある。
https://airflow.apache.org/kubernetes.html#kubernetes-operator

しかし、これを実行すると、以下のエラーが出力される。

Broken DAG: [/root/airflow/dags/sample_k8s.py] cannot import name KubernetesOperator

以下のソースを調べてみると、KubernetesOperatorは実装されていないように見える。
また、ドキュメントのサンプルスクリプトでも、importされているだけで、呼び出されている箇所はない。

AirflowにIssueを出しました

参考

11
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?