概要
KubernetesPodOperatorを使って、DAGでPodを起動させる。
目次
バージョン
DAGを作成
airflow.template.yamlのschedulerと同じ構成のPodを起動するようにしてみる。
# -*- coding: utf-8 -*-
import airflow
from datetime import timedelta
from airflow import DAG
from airflow.contrib.kubernetes.volume import Volume
from airflow.contrib.kubernetes.volume_mount import VolumeMount
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator
from airflow.contrib.kubernetes.secret import Secret
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': airflow.utils.dates.days_ago(2),
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}
dag = DAG(
dag_id='sample_k8s',
default_args=default_args,
description='A sample DAG for k8s',
schedule_interval=timedelta(days=1),
dagrun_timeout=timedelta(minutes=60),
)
# 環境変数
secret_file = Secret('volume', '/etc/sql_conn', 'airflow-secrets', 'sql_alchemy_conn')
secret_env = Secret('env', 'SQL_ALCHEMY_CONN', 'airflow-secrets', 'sql_alchemy_conn')
env_vars = {'AIRFLOW_KUBE_NAMESPACE': 'airflow'}
# Volume
volume_mounts = [VolumeMount('airflow-configmap',
mount_path='/root/airflow/airflow.cfg',
sub_path='airflow.cfg',
read_only=True),
VolumeMount('airflow-dags',
mount_path='/root/airflow/dags',
sub_path=None,
read_only=True),
VolumeMount('airflow-logs',
mount_path='/root/airflow/logs',
sub_path=None,
read_only=True)]
volumes = [Volume(name='airflow-dags', configs={'persistentVolumeClaim': {'claimName': 'airflow-dags'}}),
Volume(name='airflow-dags-fake', configs={'emptyDir': {}}),
Volume(name='airflow-dags-git', configs={'emptyDir': {}}),
Volume(name='test-volume', configs={'persistentVolumeClaim': {'claimName': 'test-volume'}}),
Volume(name='airflow-logs', configs={'persistentVolumeClaim': {'claimName': 'airflow-logs'}}),
Volume(name='airflow-configmap', configs={'configMap': {'name': 'airflow-configmap'}})]
k1 = KubernetesPodOperator(namespace='airflow', # コンテナを起動するk8sのnamespace
task_id='test-task001',
name='test-container001', # コンテナ名に使用される
image="airflow:latest", # 使用するDockerイメージ
image_pull_policy='IfNotPresent',
env_vars=env_vars,
secrets=[secret_file, secret_env],
volumes=volumes,
volume_mounts=volume_mounts,
cmds=["bash", "-cx"],
arguments=['echo "[SQL_ALCHEMY_CONN] $SQL_ALCHEMY_CONN" && echo "[AIRFLOW_KUBE_NAMESPACE] $AIRFLOW_KUBE_NAMESPACE" && sleep 300'],
in_cluster=True, # 同一clusterにコンテナを起動するか
is_delete_operator_pod=True, # DAG完了後にPodを削除するか
dag=dag
)
k1
2019/08/21現在、env_varsでvalueFrom
は利用できない。
以下で対応中のようだが、まだマージされていない。
- Issue
- Pull Request
DAGを実行
以下の画像のように、sample_k8sをOnにし、Trigger Dagの再生ボタンをクリックして、DAGを実行する。
Podを確認する
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6
というPodと、
test-container001-8ac39563
という、DAGの中で定義したPodの2つが起動している。
前者のPodはDAGを実行するためのもの、後者はDAGによって起動されたものとなる。
(Airflow on Kubernetesでは1つのDAG実行につき1つのPodが起動する)
$ sudo kubectl get pod -w
NAME READY STATUS RESTARTS AGE
airflow-75b8fff687-xrsbr 2/2 Running 0 48s
postgres-airflow-84dfd85977-6tpdh 1/1 Running 0 23m
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6 0/1 Pending 0 0s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6 0/1 Pending 0 0s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6 0/1 ContainerCreating 0 0s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6 1/1 Running 0 2s
test-container001-8ac39563 0/1 Pending 0 0s
test-container001-8ac39563 0/1 Pending 0 0s
test-container001-8ac39563 0/1 ContainerCreating 0 0s
test-container001-8ac39563 1/1 Running 0 2s
test-container001-8ac39563 0/1 Completed 0 61s
test-container001-8ac39563 0/1 Terminating 0 63s
test-container001-8ac39563 0/1 Terminating 0 63s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6 0/1 Completed 0 68s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6 0/1 Terminating 0 70s
samplek8stesttask001-a3149539979d4f10aee024c4a09e09f6 0/1 Terminating 0 70s
DAGの完了を確認
Log
ログの確認方法は以下を参照。
DAGの実行結果ログを確認する
以下の様にログに環境変数の内容が出力されていればOK。
[2019-08-21 01:46:34,651] {logging_mixin.py:95} INFO - [[34m2019-08-21 01:46:34,651[0m] {[34mpod_launcher.py:[0m105} INFO[0m - [SQL_ALCHEMY_CONN] postgresql+psycopg2://root:root@postgres-airflow:5432/airflow
[2019-08-21 01:46:34,652] {logging_mixin.py:95} INFO - [[34m2019-08-21 01:46:34,652[0m] {[34mpod_launcher.py:[0m105} INFO[0m - [AIRFLOW_KUBE_NAMESPACE] airflow
cannot import name KubernetesOperatorというエラーが出る
以下のサンプルスクリプトで、from airflow.contrib.operators import KubernetesOperator
という一行がある。
https://airflow.apache.org/kubernetes.html#kubernetes-operator
しかし、これを実行すると、以下のエラーが出力される。
Broken DAG: [/root/airflow/dags/sample_k8s.py] cannot import name KubernetesOperator
以下のソースを調べてみると、KubernetesOperator
は実装されていないように見える。
また、ドキュメントのサンプルスクリプトでも、importされているだけで、呼び出されている箇所はない。
- https://github.com/apache/airflow/tree/1.10.4/airflow/contrib/operators
- https://github.com/apache/airflow/tree/1.10.4/airflow/operators
AirflowにIssueを出しました