5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Apache Airflow on kubernetesのおためし構築(とりあえずたてる編)

Last updated at Posted at 2020-03-30

はじめに(読み飛ばしていいよ)

Executorの選択

AirflowにはExecutorがいくつかありますが、今回使うのはkubernetes Executorです。
詳細は省きますが、Airflowには様々なExecutorがあります。 Celery executorを使用してkubernetes上に展開したぜ!というのもありますが、それとは異なるので注意。(まぁ、そもそもkubernetes使っているのにCelery executorを使用する例はなかなか少ないとは思いますが。その時の状況での判断です)

kubernetes Executorの動作(ざっくりと)

  • kubernetes ExecutorではAirflowのscheduler部分とwebserver部分がPod化されます。同じPod内でschedulerコンテナ&webserverコンテナで動作させても良いし、別々のPodにしてもいい。基本schedulerやwebserverはmetadatabase DBへ通信しているだけで互いに直接通信しないので。 ※ちなみにスケジュールするDAG数が何千、タスク数が全部で万単位となるとschedulerの負荷がなかなかの物になるので注意。

  • kubernetes Executorでは、Operator毎に基本worker Podが立ち上がりタスクが処理されます。基本OperatorとPodは1:1です。(kubernetesPodOperatorなど例外はありますが。) 

  • kubernets Executorの場合、タスク実行ログがそのworker Podがあった場所にしか残されません。なのでAirflow UIでログを見ようとした時、webserver のPodがそのタスクが実行されたnodeに立っていれば読み取れるかもしれませんが、それ以外の場合、ログがねえと言われます。したがって、Airflowの機能の一つを使って、ログはPod終了時にS3に投げてもらい、基本webseverはローカルにログがなければS3をみるようにします。
     ※ S3からのログ読み出しの場合、タスク実行途中のログ出力はリアルタイムではweb UIから見れません。S3にログがアップされるのはタスクの処理が終了し、Podが落ちる時だからです。

  • Worker用のPodについて worker用のPodに使われるdocker imageは基本airflowが入ってないと動かんで! (kubernetesPodOperatorでPodから作られるPodを除く!) デフォルトではAirflowを立てた時に使ったimageが使用される。別のimageをしているすることも可能。(airflow.cfgでもOperator内でも指定できる)

構築

構築環境 (AWS を使用します。容量ざっくり。タイプはt系でも動くので安くしたければ変更)

  • インスタンス m5.large → 計3台 EBSは30GBぐらいそれぞれにつけとく。
    • kubernetes master 用 → 1台/3台
    • Airflowのschedulerやwebserver, worker等のPod用 2台/3台 

上記でkubernetesのクラスタくんどいてください。(EKS使えとかは言ってはいけない)

  • RDS m5.large → 1台 SSD 30GBくらい 
    • PostgreSQL
  • EFS (NFS) 30GBくらい
    • kubernetsのnode間でデータ共有するため

      (Airflow worker Pod等含め全てのPodが各nodeで同じDAGを読み取れないといけないので。)
  • S3 (Log吐き出し用)

Docker imageの作成 (Airflowのバージョンは1.10.9 でいくぜ)

フォルダ /AAA/BBB/CCC/内に下記のファイルを全部おこう
node全部にこのimageは必要だからね! 各nodeでそれぞれ同じように作ってもいいし、local registry使ってもいいね。

Dockerfile
FROM python:3.6.10-slim

#いらんもんも結構入っているような気がするが、嫌なら消すよろし。
RUN apt-get update --allow-releaseinfo-change  -y && apt-get install -y \
        --no-install-recommends curl \
        --no-install-recommends gnupg2 \
        wget \
        libczmq-dev \
        curl \
        libssl-dev \
        git \
        libpq-dev \
        inetutils-telnet \
        bind9utils \
        zip \
        unzip \
        gcc \
        vim \
        mariadb-client  \
        default-libmysqlclient-dev  \
    && apt-get clean


RUN pip install --upgrade pip

RUN pip install -U setuptools && \
    pip install kubernetes && \
    pip install cryptography && \
    pip install psycopg2

RUN pip install apache-airflow==1.10.9 && \
    pip install apache-airflow[ssh] && \
    pip install apache-airflow[postgres] && \
    pip install apache-airflow[password] && \
    pip install apache-airflow[crypto] && \
    pip install apache-airflow[s3] && \ 
    pip install boto3==1.11.13 && \ 
    pip install awscli --upgrade --user 

# ここ無くしていい。ただPod内からawsコマンド打ちたかったんや。
RUN echo export PATH='$PATH:$HOME/bin:$HOME/.local/bin' >> ~/.bashrc

COPY airflow-test-env-init.sh /tmp/airflow-init.sh

COPY bootstrap.sh /bootstrap.sh
RUN chmod +x /bootstrap.sh

ENTRYPOINT ["/bootstrap.sh"]
airflow-init.sh
#!/usr/bin/env bash
set -x

# あらかじめ入っているexampleのdagは消す。(kube上だと色々めんどいねん)
cd /usr/local/lib/python3.6/site-packages/airflow && \
rm -rf example_dags && \
cd /usr/local/lib/python3.6/site-packages/airflow/contrib && \
rm -rf example_dags && \

cd /usr/local/lib/python3.6/site-packages/airflow && \
airflow initdb && \
alembic upgrade heads && \
(airflow create_user --username airflow --lastname airflow --firstname jon --email airflow@apache.org --role Admin --password airflow || true)  
bootstrap.sh
#!/usr/bin/env bash
if [[ "$1" = "webserver" ]]
then
    exec airflow webserver
fi

if [[ "$1" = "scheduler" ]]
then
    exec airflow scheduler
fi
Airflow入りのdocker image作成
docker build -t airflow:1.10.9 /AAA/BBB/CCC/

metadatabase DB接続情報準備!

まず下準備

PostgreSQLの場合:


postgresql+psycopg2://ユーザ名:パスワード@DBのエンドポイント:ポート:番号/DB名
を一例として下記の様にハッシュ化する。

>>import base64
>>dbinfo =  postgresql+psycopg2://ユーザ名:パスワード@DBのエンドポイント:ポート:番号/DB名
>>base64.b64encode(dbinfo)
XXXXXX ハッシュ化列 XXXXXXXX

このハッシュ化列は残しとこう!
※ ちゃんとDB作っといてね?

kubeデプロイ用yaml作成

準備するファイルは5つ!

  1. secret.yaml
  2. volume.yaml
  3. scheduler.yaml
  4. webserver.yaml
  5. configmaps.yaml
secret.yaml
apiVersion: v1
kind: Secret
metadata:
  namespace: airflow01
  name: airflow-secrets
type: Opaque
data:
  sql_alchemy_conn: !!!!!ここにBD接続情報のハッシュ化列!!!!!
volume.yaml
---
kind: PersistentVolume
apiVersion: v1
metadata:
  name: airflow-dags
spec:
  accessModes:
    - ReadOnlyMany
  capacity:
    storage: 30Gi
  nfs:
    server: XXXXXXXX
    path: /efs/Vol_airflow/airflow01
---
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: airflow-dags
spec:
  accessModes:
    - ReadOnlyMany
  resources:
    requests:
      storage: 30Gi
scheduler.yaml
---
kind: Namespace
apiVersion: v1
metadata:
  name: airflow01
  labels:
    name: airflow01
---
apiVersion: rbac.authorization.k8s.io/v1beta1
kind: ClusterRoleBinding
metadata:
  name: admin-rbac01
subjects:
  - kind: ServiceAccount
    name: default
    namespace: airflow01
roleRef:
  kind: ClusterRole
  name: cluster-admin
  apiGroup: rbac.authorization.k8s.io
---
apiVersion: apps/v1
kind: Deployment
metadata:
  namespace: airflow01
  name: airflow
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflow
      app: airflow
  template:
    metadata:
      labels:
        name: airflow
        app: airflow
    spec:
     '''これは今は無視で
      affinity:
        nodeAffinity: 
          preferredDuringSchedulingIgnoredDuringExecution: 
          - weight: 1 
            preference:
              matchExpressions:
              - key: node 
                operator: In 
                values:
                - for-scheduler
      '''
      initContainers:
      - name: "init"
        image: airflow:1.10.9
        imagePullPolicy: IfNotPresent
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /root/airflow/dags
        env:
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        command:
          - "bash"
        args:
          - "-cx"
          - "./tmp/airflow-init.sh"
      containers:
      - name: scheduler
        image: airflow:1.10.9
        imagePullPolicy: IfNotPresent
        args: ["scheduler"]
        env:
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /root/airflow/dags
      volumes:
      - name: airflow-dags
        persistentVolumeClaim:
          claimName: airflow-dags
      - name: airflow-dags-fake
        emptyDir: {}
      - name: airflow-dags-git
        emptyDir: {}
      - name: airflow-configmap
        configMap:
          name: airflow-configmap
webserver.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: airflowweb01
  namespace: airflow01
spec:
  replicas: 1
  selector:
    matchLabels:
      name: airflowweb01
  template:
    metadata:
      labels:
        name: airflowweb01
    spec:
     ''' ここは今は無視で
      affinity:
        podAntiAffinity:
          requiredDuringSchedulingIgnoredDuringExecution:
          - labelSelector:
              matchExpressions:
              - key: app
                operator: In
                values:
                - airflow
            topologyKey: kubernetes.io/hostname
            namespaces:
            - airflow01
            - airflow02
      '''
      containers:
      - name: webserver
        image: airflow:1.10.9
        imagePullPolicy: IfNotPresent
        resources:
          requests: 
            cpu: 200m
            memory: 10Mi
        ports:
        - name: webserver
          containerPort: 8080
        args: ["webserver"]
        env:
        - name: AIRFLOW__KUBERNETES__NAMESPACE
          valueFrom:
            fieldRef:
              fieldPath: metadata.namespace
        - name: SQL_ALCHEMY_CONN
          valueFrom:
            secretKeyRef:
              name: airflow-secrets
              key: sql_alchemy_conn
        volumeMounts:
        - name: airflow-configmap
          mountPath: /root/airflow/airflow.cfg
          subPath: airflow.cfg
        - name: airflow-dags
          mountPath: /root/airflow/dags
      volumes:
      - name: airflow-dags
        persistentVolumeClaim:
          claimName: airflow-dags
      - name: airflow-dags-fake
        emptyDir: {}
      - name: airflow-dags-git
        emptyDir: {}
      - name: airflow-configmap
        configMap:
          name: airflow-configmap
---
apiVersion: v1
kind: Service
metadata:
  name: airflowweb01
spec:
  type: NodePort
  ports:
    - port: 8080
      nodePort: 30809
  selector:
    name: airflowweb01
configmaps.yaml

comfogmapsはairflow.cfg(airflowの設定ファイル)が記述されているので長いです。ですので今回重要な部分だけ取り出して一部省略します。(公式gitから取ってきてね!)

kind: Namespace
apiVersion: v1
metadata:
  name: airflow01
  labels:
    name: airflow01
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: airflow-configmap
  namespace: airflow01
data:
  airflow.cfg: |
    [core]
    dags_folder = /root/airflow/dags
    #base_log_folder = /root/airflow/logs

    #logの保存先にs3を登録するぞ!remote_log_conn_id はconnection登録時のidだぞ!airflow connectionで調べよう!(登録せんでも動くけどな....なので適当でいい。)
    remote_logging = True
    remote_base_log_folder = s3://XXXXXXXXX/airflow-logs-01
    remote_log_conn_id = idap01
    encrypt_s3_logs = False

    logging_level = INFO
    fab_logging_level = INFO

......
... !省略!
..

    dag_dir_list_interval = 60
    job_heartbeat_sec = 5

#m5.largeなので2でOK? 占有しちゃうけどね。
    max_threads = 2
    run_duration = -1
    num_runs = -1
    processor_poll_interval = 1
    scheduler_heartbeat_sec = 120
    min_file_process_interval = 10

    statsd_on = False
    statsd_host = localhost
    statsd_port = 8125
    statsd_prefix = airflow

    min_file_parsing_loop_time = 5
    print_stats_interval = 30
    scheduler_health_check_threshold = 200
    scheduler_zombie_task_threshold = 600
    max_tis_per_query = 512
    authenticate = False
    use_job_schedule = True

......
... !省略!
..


    airflow_configmap = airflow-configmap
    worker_container_repository = airflow
    worker_container_tag = 1.10.9
    worker_container_image_pull_policy = IfNotPresent
    delete_worker_pods = True
    dags_in_image = False
    dags_volume_claim = airflow-dags
    dags_volume_subpath = /efs/Vol_airflow/airflow01
    logs_volume_claim = 
    logs_volume_subpath =
    dags_volume_host = 
    logs_volume_host = 
    in_cluster = True
    namespace = airflow01
    gcp_service_account_keys =
    worker_pods_creation_batch_size=100

......
... !省略!
..

立てるぞ! kubectl applyコマンドだ!

kubectl apply -f secrets.yaml
kubectl apply -f volumes.yaml
kubectl apply -f airflow_scheduler.yaml
kubectl apply -f airflow_webserver.yaml

下記のようにPodが立っていれば成功だ!

>> kubectl get po -n airflow01
NAME                            READY   STATUS    RESTARTS   AGE
airflow-7fccbd497-nf9s4         1/1     Running   0          23d
airflowweb01-5f58867588-gds9v   1/1     Running   0          20d

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?