はじめに
昨年末にAirflowをさわってみてなかなか便利だと思いました。
【Airflow】最近よく聞くAirflowに入門!EC2で動かしてみた【CI/CD】
そこで次のステップとしてKubernetesとの連携に挑戦してみました。検索してみると「Airflow on Kubernetes」と呼ばれているようです。調べたかぎり、日本語の記事があまりなかったのでこの記事が役にたてばいいなと思います。
=====2020.1.7 追記=====
関連記事をリンクします。
【Airflow on Kubernetes】DockerイメージのbuildとPodのdeployの仕組みについて
【Airflow on Kubernetes】KubernetesPodOperatorの使い方
Airflow on Kubernetesとは ?
タスクスケジューリングやGUIでの実行管理が可能なAirflowはクラスタを組んで構築したDAGの処理を行うことができるのですが、Airflow on KubernetsではKubernetes上にAirflowのポッドをデプロイすることでAirflowで管理している処理をKubernetesクラスタを使って実現できます。
Airflowのためにクラスタをわざわざ組まなくても既存のKubernetes環境を利用してETL基盤を構築することが可能になります。
環境構築
ざっくり2stepで環境が作れるのですが、色々落とし穴があったのでそこらへんもまとめていきます。
step1. Kubernetesのクラスタ構築
step2. KubernetesにAirflowをデプロイ
step1. Kubernetesのクラスタ構築
これに関しては色々な方法があると思うので好みの方法で行ってもえればと思います。
自分は以下の記事でkubeadmで構築しました。
AWSでCentOS 7にKubernetes 1.11をkubeadmでインストール
注意点
最初にt2.microで構築しようとしたらメモリーエラーでpodがEvictedになってうまく動かないことがあったので、最低でもt2.smallでmaster/workerを構築するといいと思います。
step2. KubernetesにAirflowをデプロイ
まず、masterにgitをインストールします。
sudo yum install git
gitコマンドをつかって公式のairflowのリポジトリをクローンしてきます。
git clone https://github.com/apache/incubator-airflow.git
続いて、ディレクトリを移動して、以下のスクリプトを実行します。
cd incubator-airflow/scripts/ci/kubernetes
./docker/build.sh
このスクリプトを実行するとairflowのイメージができます。当然つぎにデプロイをするのですがここで注意点。
注意点
masterノードだけでなく、workerノード全てにairflowのイメージが必要になります。
このイメージがないと、永遠とデプロイができません。
全ノードにairflowのイメージができたら、masterノード以下のコマンドを実行します。
./kube/deploy.sh -d persistent_mode
最終的にairflowのpodのデプロイ状況がechoで出力され続けます。全ノードにairflowのイメージができていれば、airflowとpostgreのポッドが無事にデプロイできているかと思います。
うまくいかない場合は、Ctl+Cで出力をとめてkubectl describe pod/[aiflowのpod名]
で何が原因でデプロイできていないか確認しましょう。
これで環境構築は終了です。次に実際に動かしていきます。
実践!Airflow on Kubernetes
ここまできたらあとは簡単です。Airflowの管理画面から作成したDAGを実行するのみです。
Airflow管理画面
デプロイしたAirflowポッドにポートフォワードすることで管理画面にアクセスすることが可能です。
kubectl port-forward [Airflowのポッド名] 8080:8080 --address="0.0.0.0"
Airflowのポッドの8080で管理アプリが起動しているため、8080にポートフォワードします。さらに外部からのアクセスを受け入れるために---address="0.0.0.0"
をオプションでつけます。
http://[kubernetesのmasterのIP]:8080/ にアクセスして以下の画面がひらけば成功です。
主にここでDAGの管理を行うのですが、ここにリストされているexample群のスクリプトはどこにあるんでしょうか??
正解はAirflowのポッドの中にあります。
DAGスクリプトの管理
Airflowポッド内に入るコマンドは以下です。
kubectl exec -it [Airflowポッド名] /bin/bash
DAGスクリプトの管理は以下のディレクトリパスにあります。
root@[Airflowポッド名]:~#cd root/airflow/dags/
root@[Airflowポッド名]:~/airflow/das/# ls
__init__.py example_docker_operator.py example_passing_params_via_test_command.py example_xcom.py
__init__.pyc example_docker_operator.pyc example_passing_params_via_test_command.pyc example_xcom.pyc
dev_etl.py example_http_operator.py example_python_operator.py libs
dev_etl.pyc example_http_operator.pyc example_python_
上記のファイル名が管理画面のDAG名と一致するかと思います。なので、生成したDAGスクリプトをこのディレクトリにいれると管理画面で実行することが可能になります。(dev_etl.pyは自作DAGです。)
airflowポッド内ではvimも使えないので、apt-get install vim
でvimをインストールするか外部で作ったスクリプトをkubectl cp [スクリプト名] [airflowポッド名]:/root/airflow/dags/
でポッドに送る方法をおすすめします。
DAG実行
あとは実行するのみです。
実行は左のスイッチをonにするだけです。onにするとSchedule欄のタイミングでDAGが実行されます。もし、その場で実行したい場合はLinks欄の一番左のボタンを押すとScheduleに関係なく実行されます。
試しにexample_bash_operatorというDAG名のDAGを実行してみます。
DAGの実行が開始してからkubectl get pod
でpodを確認してみると以下のようになっていると思います。
DAG名タスクid-~というポッド名がデプロイされているのがわかります。
ちなみにこのDAGスクリプトは以下のようなpythonです。
from builtins import range
from datetime import timedelta
import airflow
from airflow.models import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2),
}
dag = DAG(
dag_id='example_bash_operator',
default_args=args,
schedule_interval='0 0 * * *',
dagrun_timeout=timedelta(minutes=60),
)
run_this_last = DummyOperator(
task_id='run_this_last',
dag=dag,
)
# [START howto_operator_bash]
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
dag=dag,
)
# [END howto_operator_bash]
run_this >> run_this_last
for i in range(3):
task = BashOperator(
task_id='runme_' + str(i),
bash_command='echo "{{ task_instance_key_str }}" && sleep 1',
dag=dag,
)
task >> run_this
# [START howto_operator_bash_template]
also_run_this = BashOperator(
task_id='also_run_this',
bash_command='echo "run_id={{ run_id }} | dag_run={{ dag_run }}"',
dag=dag,
)
# [END howto_operator_bash_template]
also_run_this >> run_this_last
if __name__ == "__main__":
dag.cli()
このスクリプトからわかるように特殊な設定をしなくてもAirflowで管理・実行したDAGがKubernetes上にデプロイされます。
しかし、上記でデプロイされたコンテナイメージはなんなんでしょうか??
Airflowでイメージを指定したDAG生成
自分の調査結果としては「airflow:latest」です。
あの!airflowをデプロイに必要だったイメージがAirflow on Kubernetsで使用されるデフォルトのイメージだったのです。
airflow:latestを指定したDAGスクリプトが以下になります。
from __future__ import print_function
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from libs.helper import print_stuff
from airflow.models import DAG
import os
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='dev_etl', default_args=args,
schedule_interval=None
)
def test_volume_mount():
print()
# You can use annotations on your kubernetes pods!
start_task = PythonOperator(
task_id="start_task", python_callable=print_stuff, dag=dag,
executor_config={
"KubernetesExecutor": {
"annotations": {"test": "annotation"}
}
}
)
# You can mount volume or secret to the worker pod
second_task = BashOperator(
task_id="four_task", bash_command='python /tmp/script/airflow_on_kubernetes.py', dag=dag,
executor_config={"KubernetesExecutor": {"image": "airflow:latest"}}
)
start_task.set_downstream(second_task)
実行結果は以前のものと変化はありません。次にこのimageをdockerhubにある別のイメージに変えて実行してみると、おそらく実行がエラーになると思います。(自分も何度もエラーになりました。)
Airflowの残念なところの一つにpodがデプロイされないとログが出力されない点(Airflowの実行したDAGをクリック→TreeViewの赤い四角をクリック→View Log)です。なので、なぜ違うイメージを指定すると失敗するのか原因をつかむのに時間がかかりました。
たまたまpodのデプロイが失敗しているときにdescribeでログを出力してみると「airflowコマンドがつかない」的なエラーがでていたので、この結論にいたりました。
なので、今後イメージを指定したい場合はこのairflowイメージをベースにするか、airflowコマンドが使えるイメージをつくるかになってくるのかなと思います。
イメージの指定ができると次はホストにマウントすることが可能か試したくなってきますね。
Airflowでホストにマウント設定したDAG生成
最後にホストにマウントしたディレクトリにデータベースサーバーから取ってきた値を書き込んだファイルを保存するという簡単なETLのDAGを例としてのせます。
from __future__ import print_function
import airflow
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
from libs.helper import print_stuff
from airflow.models import DAG
import os
args = {
'owner': 'airflow',
'start_date': airflow.utils.dates.days_ago(2)
}
dag = DAG(
dag_id='dev_etl', default_args=args,
schedule_interval=None
)
# You can use annotations on your kubernetes pods!
start_task = PythonOperator(
task_id="start_task", python_callable=print_stuff, dag=dag,
executor_config={
"KubernetesExecutor": {
"annotations": {"test": "annotation"}
}
}
)
# You can mount volume or secret to the worker pod
second_task = BashOperator(
task_id="run_python", bash_command='python /tmp/script/airflow_on_kubernetes.py', dag=dag,
executor_config={
"KubernetesExecutor": {
"image": "airflow:latest",
"volumes": [
{
"name": "example-kubernetes-test-volume",
"hostPath": {"path": "/home/centos/output/"},
},
],
"volume_mounts": [
{
"mountPath": "/tmp/",
"name": "example-kubernetes-test-volume",
},
]
}
}
)
start_task.set_downstream(second_task)
上記がDAGスクリプトになります。ちなみにAirflow管理画面に出てくる名前はdag_idになります。(スクリプト名ではありません)
ポイントはvolumesとvolume_mountsの設定になります。
volumesのhostPathでマウントしたいホストのパスを設定します。valume_mountsのmountPathでポッドのパスを指定します。
あらかじめ、ホスト(workerノード全て)の/home/centos/outputにairflow_on_kubernetes.pyを置いておきます。
# coding:utf-8
import sqlalchemy
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
from logip import *
from setting2 import session as se2
if __name__=='__main__':
with open('/tmp/db.txt', 'w') as f:
for record in se2.query(LogIP).all():
f.write(record.ip)
f.write('\n')
スクリプトの中では/tmp/db.txtにファイルを保存するように書きます。
これを実行すると/home/centos/output/にdb.txtができているので、ポッドがホストにマウントできていることが確認できました。(sqlalchemyの部分はよしなに変えてください)ちなみにポッドから別のサーバーにあるデータベースにアクセスすることも確認できました。
さいごに
Airflow on KubernetesでAirflowで実行したDAGがKubernetes上にデプロイされるとこまでやってみました。
個人的にPodがデプロイされないとAirflow上のログに出力されない点が残念でしたがyaml書く手間が省けるのはありがいかと思います。
マウントも簡単なので、workerノードにスクリプトをばらまいてポッド側から参照して実行するという流れが効率的でかつ、ポッド上のエラーはAirflowのViewのログから確認できるためCI/CDにも活きてくるなと感じました。
Airflow on Kubernetes 触ってみたい人の手助けになればと思います。