はじめに(読み飛ばしていいよ)
Executorの選択
AirflowにはExecutorがいくつかありますが、今回使うのはkubernetes Executorです。
詳細は省きますが、Airflowには様々なExecutorがあります。 Celery executorを使用してkubernetes上に展開したぜ!というのもありますが、それとは異なるので注意。(まぁ、そもそもkubernetes使っているのにCelery executorを使用する例はなかなか少ないとは思いますが。その時の状況での判断です)
kubernetes Executorの動作(ざっくりと)
-
kubernetes ExecutorではAirflowのscheduler部分とwebserver部分がPod化されます。同じPod内でschedulerコンテナ&webserverコンテナで動作させても良いし、別々のPodにしてもいい。基本schedulerやwebserverはmetadatabase DBへ通信しているだけで互いに直接通信しないので。 ※ちなみにスケジュールするDAG数が何千、タスク数が全部で万単位となるとschedulerの負荷がなかなかの物になるので注意。
-
kubernetes Executorでは、Operator毎に基本worker Podが立ち上がりタスクが処理されます。基本OperatorとPodは1:1です。(kubernetesPodOperatorなど例外はありますが。)
-
kubernets Executorの場合、タスク実行ログがそのworker Podがあった場所にしか残されません。なのでAirflow UIでログを見ようとした時、webserver のPodがそのタスクが実行されたnodeに立っていれば読み取れるかもしれませんが、それ以外の場合、ログがねえと言われます。したがって、Airflowの機能の一つを使って、ログはPod終了時にS3に投げてもらい、基本webseverはローカルにログがなければS3をみるようにします。
※ S3からのログ読み出しの場合、タスク実行途中のログ出力はリアルタイムではweb UIから見れません。S3にログがアップされるのはタスクの処理が終了し、Podが落ちる時だからです。 -
Worker用のPodについて worker用のPodに使われるdocker imageは基本airflowが入ってないと動かんで! (kubernetesPodOperatorでPodから作られるPodを除く!) デフォルトではAirflowを立てた時に使ったimageが使用される。別のimageをしているすることも可能。(airflow.cfgでもOperator内でも指定できる)
構築
構築環境 (AWS を使用します。容量ざっくり。タイプはt系でも動くので安くしたければ変更)
- インスタンス m5.large → 計3台 EBSは30GBぐらいそれぞれにつけとく。
- kubernetes master 用 → 1台/3台
- Airflowのschedulerやwebserver, worker等のPod用 2台/3台
上記でkubernetesのクラスタくんどいてください。(EKS使えとかは言ってはいけない)
- RDS m5.large → 1台 SSD 30GBくらい
- PostgreSQL
- EFS (NFS) 30GBくらい
- kubernetsのnode間でデータ共有するため
(Airflow worker Pod等含め全てのPodが各nodeで同じDAGを読み取れないといけないので。)
- kubernetsのnode間でデータ共有するため
- S3 (Log吐き出し用)
Docker imageの作成 (Airflowのバージョンは1.10.9 でいくぜ)
フォルダ /AAA/BBB/CCC/内に下記のファイルを全部おこう
node全部にこのimageは必要だからね! 各nodeでそれぞれ同じように作ってもいいし、local registry使ってもいいね。
Dockerfile
FROM python:3.6.10-slim
#いらんもんも結構入っているような気がするが、嫌なら消すよろし。
RUN apt-get update --allow-releaseinfo-change -y && apt-get install -y \
--no-install-recommends curl \
--no-install-recommends gnupg2 \
wget \
libczmq-dev \
curl \
libssl-dev \
git \
libpq-dev \
inetutils-telnet \
bind9utils \
zip \
unzip \
gcc \
vim \
mariadb-client \
default-libmysqlclient-dev \
&& apt-get clean
RUN pip install --upgrade pip
RUN pip install -U setuptools && \
pip install kubernetes && \
pip install cryptography && \
pip install psycopg2
RUN pip install apache-airflow==1.10.9 && \
pip install apache-airflow[ssh] && \
pip install apache-airflow[postgres] && \
pip install apache-airflow[password] && \
pip install apache-airflow[crypto] && \
pip install apache-airflow[s3] && \
pip install boto3==1.11.13 && \
pip install awscli --upgrade --user
# ここ無くしていい。ただPod内からawsコマンド打ちたかったんや。
RUN echo export PATH='$PATH:$HOME/bin:$HOME/.local/bin' >> ~/.bashrc
COPY airflow-test-env-init.sh /tmp/airflow-init.sh
COPY bootstrap.sh /bootstrap.sh
RUN chmod +x /bootstrap.sh
ENTRYPOINT ["/bootstrap.sh"]
airflow-init.sh
#!/usr/bin/env bash
set -x
# あらかじめ入っているexampleのdagは消す。(kube上だと色々めんどいねん)
cd /usr/local/lib/python3.6/site-packages/airflow && \
rm -rf example_dags && \
cd /usr/local/lib/python3.6/site-packages/airflow/contrib && \
rm -rf example_dags && \
cd /usr/local/lib/python3.6/site-packages/airflow && \
airflow initdb && \
alembic upgrade heads && \
(airflow create_user --username airflow --lastname airflow --firstname jon --email airflow@apache.org --role Admin --password airflow || true)
bootstrap.sh
#!/usr/bin/env bash
if [[ "$1" = "webserver" ]]
then
exec airflow webserver
fi
if [[ "$1" = "scheduler" ]]
then
exec airflow scheduler
fi
Airflow入りのdocker image作成
docker build -t airflow:1.10.9 /AAA/BBB/CCC/
metadatabase DB接続情報準備!
まず下準備
PostgreSQLの場合:
postgresql+psycopg2://ユーザ名:パスワード@DBのエンドポイント:ポート:番号/DB名
を一例として下記の様にハッシュ化する。
>>import base64
>>dbinfo = postgresql+psycopg2://ユーザ名:パスワード@DBのエンドポイント:ポート:番号/DB名
>>base64.b64encode(dbinfo)
XXXXXX ハッシュ化列 XXXXXXXX
このハッシュ化列は残しとこう!
※ ちゃんとDB作っといてね?
kubeデプロイ用yaml作成
準備するファイルは5つ!
- secret.yaml
- volume.yaml
- scheduler.yaml
- webserver.yaml
- configmaps.yaml
secret.yaml
apiVersion: v1
kind: Secret
metadata:
namespace: airflow01
name: airflow-secrets
type: Opaque
data:
sql_alchemy_conn: !!!!!ここにBD接続情報のハッシュ化列!!!!!
volume.yaml
---
kind: PersistentVolume
apiVersion: v1
metadata:
name: airflow-dags
spec:
accessModes:
- ReadOnlyMany
capacity:
storage: 30Gi
nfs:
server: XXXXXXXX
path: /efs/Vol_airflow/airflow01
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
name: airflow-dags
spec:
accessModes:
- ReadOnlyMany
resources:
requests:
storage: 30Gi
scheduler.yaml
---
kind: Namespace
apiVersion: v1
metadata:
name: airflow01
labels:
name: airflow01
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
name: admin-rbac01
subjects:
- kind: ServiceAccount
name: default
namespace: airflow01
roleRef:
kind: ClusterRole
name: cluster-admin
apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
namespace: airflow01
name: airflow
spec:
replicas: 1
selector:
matchLabels:
name: airflow
app: airflow
template:
metadata:
labels:
name: airflow
app: airflow
spec:
'''これは今は無視で
affinity:
nodeAffinity:
preferredDuringSchedulingIgnoredDuringExecution:
- weight: 1
preference:
matchExpressions:
- key: node
operator: In
values:
- for-scheduler
'''
initContainers:
- name: "init"
image: airflow:1.10.9
imagePullPolicy: IfNotPresent
volumeMounts:
- name: airflow-configmap
mountPath: /root/airflow/airflow.cfg
subPath: airflow.cfg
- name: airflow-dags
mountPath: /root/airflow/dags
env:
- name: SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: airflow-secrets
key: sql_alchemy_conn
command:
- "bash"
args:
- "-cx"
- "./tmp/airflow-init.sh"
containers:
- name: scheduler
image: airflow:1.10.9
imagePullPolicy: IfNotPresent
args: ["scheduler"]
env:
- name: AIRFLOW__KUBERNETES__NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: airflow-secrets
key: sql_alchemy_conn
volumeMounts:
- name: airflow-configmap
mountPath: /root/airflow/airflow.cfg
subPath: airflow.cfg
- name: airflow-dags
mountPath: /root/airflow/dags
volumes:
- name: airflow-dags
persistentVolumeClaim:
claimName: airflow-dags
- name: airflow-dags-fake
emptyDir: {}
- name: airflow-dags-git
emptyDir: {}
- name: airflow-configmap
configMap:
name: airflow-configmap
webserver.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: airflowweb01
namespace: airflow01
spec:
replicas: 1
selector:
matchLabels:
name: airflowweb01
template:
metadata:
labels:
name: airflowweb01
spec:
''' ここは今は無視で
affinity:
podAntiAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: app
operator: In
values:
- airflow
topologyKey: kubernetes.io/hostname
namespaces:
- airflow01
- airflow02
'''
containers:
- name: webserver
image: airflow:1.10.9
imagePullPolicy: IfNotPresent
resources:
requests:
cpu: 200m
memory: 10Mi
ports:
- name: webserver
containerPort: 8080
args: ["webserver"]
env:
- name: AIRFLOW__KUBERNETES__NAMESPACE
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: SQL_ALCHEMY_CONN
valueFrom:
secretKeyRef:
name: airflow-secrets
key: sql_alchemy_conn
volumeMounts:
- name: airflow-configmap
mountPath: /root/airflow/airflow.cfg
subPath: airflow.cfg
- name: airflow-dags
mountPath: /root/airflow/dags
volumes:
- name: airflow-dags
persistentVolumeClaim:
claimName: airflow-dags
- name: airflow-dags-fake
emptyDir: {}
- name: airflow-dags-git
emptyDir: {}
- name: airflow-configmap
configMap:
name: airflow-configmap
---
apiVersion: v1
kind: Service
metadata:
name: airflowweb01
spec:
type: NodePort
ports:
- port: 8080
nodePort: 30809
selector:
name: airflowweb01
configmaps.yaml
comfogmapsはairflow.cfg(airflowの設定ファイル)が記述されているので長いです。ですので今回重要な部分だけ取り出して一部省略します。(公式gitから取ってきてね!)
kind: Namespace
apiVersion: v1
metadata:
name: airflow01
labels:
name: airflow01
---
apiVersion: v1
kind: ConfigMap
metadata:
name: airflow-configmap
namespace: airflow01
data:
airflow.cfg: |
[core]
dags_folder = /root/airflow/dags
#base_log_folder = /root/airflow/logs
#logの保存先にs3を登録するぞ!remote_log_conn_id はconnection登録時のidだぞ!airflow connectionで調べよう!(登録せんでも動くけどな....なので適当でいい。)
remote_logging = True
remote_base_log_folder = s3://XXXXXXXXX/airflow-logs-01
remote_log_conn_id = idap01
encrypt_s3_logs = False
logging_level = INFO
fab_logging_level = INFO
......
... !省略!
..
dag_dir_list_interval = 60
job_heartbeat_sec = 5
#m5.largeなので2でOK? 占有しちゃうけどね。
max_threads = 2
run_duration = -1
num_runs = -1
processor_poll_interval = 1
scheduler_heartbeat_sec = 120
min_file_process_interval = 10
statsd_on = False
statsd_host = localhost
statsd_port = 8125
statsd_prefix = airflow
min_file_parsing_loop_time = 5
print_stats_interval = 30
scheduler_health_check_threshold = 200
scheduler_zombie_task_threshold = 600
max_tis_per_query = 512
authenticate = False
use_job_schedule = True
......
... !省略!
..
airflow_configmap = airflow-configmap
worker_container_repository = airflow
worker_container_tag = 1.10.9
worker_container_image_pull_policy = IfNotPresent
delete_worker_pods = True
dags_in_image = False
dags_volume_claim = airflow-dags
dags_volume_subpath = /efs/Vol_airflow/airflow01
logs_volume_claim =
logs_volume_subpath =
dags_volume_host =
logs_volume_host =
in_cluster = True
namespace = airflow01
gcp_service_account_keys =
worker_pods_creation_batch_size=100
......
... !省略!
..
立てるぞ! kubectl applyコマンドだ!
kubectl apply -f secrets.yaml
kubectl apply -f volumes.yaml
kubectl apply -f airflow_scheduler.yaml
kubectl apply -f airflow_webserver.yaml
下記のようにPodが立っていれば成功だ!
>> kubectl get po -n airflow01
NAME READY STATUS RESTARTS AGE
airflow-7fccbd497-nf9s4 1/1 Running 0 23d
airflowweb01-5f58867588-gds9v 1/1 Running 0 20d