8
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Kubernetes上でDigdagとEmbulkを動かすワークフロー基盤

Last updated at Posted at 2022-02-28

この記事について

Kubernetes上でDigdagを動かし、そのワークフローからEmbulkのバッチを叩くようなワークフロー基盤について書きます。
DigdagのworkflowでBig QueryからElasticsearchへインデックスするEmbulkの例を示します。

全体のアーキテクチャ

Kubernetes digdag embulk.png

こちらの記事のように Digdag は API リクエストを処理する master と、
実際にワークフローを実行する worker に分け、それぞれ Pod で起動します。
digdag workerではembulkを実行するKubernetesのJobを動的に起動してJob上で実際の処理をします。
また、digdag workerはキューに入ったタスク数に応じてオートスケールさせます。

課題

この構成を実現するためには2つの課題があります。

  1. digdag workerからJobの起動
  2. digdag workerのHPA

1.digdag workerからJobの起動

digdag workerでタスクを実行する際はコンテナ上で実行したいです。
DigdagにはDockerのCommand Executorが実装されており、簡単にDocker上でプログラムを実行できるからです。

しかし、digdag worker自体がPodのコンテナ上で動いているため、Docker in Dockerのような状態になってしまいます。
よって、digdag workerで実行しているワークフローから動的にJobを立ち上げてその上でプログラムを実行できるようにします。

kube-job を使うとコマンドラインからKubernetesのJobを立ち上げることができます。
ただし、kube-job v0.5.0以下ではKubernetes内部からkube-jobコマンドを容易に叩けなかったためパッチを当てさせていただきました。

digdag workerの設定は以下になります。

digdag workerのimage

multi-stage buildsでkube-jobのバイナリを配置します。
バージョンは古いため参考にされる場合は適宜アップデートお願いします。

FROM golang:1.15-buster as builder

ENV CGO_ENABLED=0

RUN apt update && apt install -y --no-install-recommends wget unzip make

WORKDIR /app
RUN wget -q https://github.com/h3poteto/kube-job/archive/refs/tags/v0.5.1.zip \
    && unzip -q v0.5.1.zip
RUN make -C /app/kube-job-0.5.1 build

FROM azul/zulu-openjdk:8

RUN apt-get update -qq && apt-get install -y curl

RUN curl -o /usr/local/bin/digdag --create-dirs -L 'https://dl.digdag.io/digdag-0.10.0' \
    && chmod +x /usr/local/bin/digdag

COPY --from=builder /app/kube-job-0.5.1/bin/kube-job /usr/local/bin/

digdag workerのDeployment

embulkのvolumeに関しては後述します。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: digdag-worker
  labels:
    app: digdag-worker
spec:
  selector:
    matchLabels:
      app: digdag-worker
  strategy:
    type: Recreate
  template:
    metadata:
      labels:
        app: digdag-worker
    spec:
      volumes:
      - name: digdag-config-volume
        configMap:
          name: digdag-config
      - name: embulk-config-volume
        configMap:
          name: embulk-config
      containers:
      - name: digdag-worker
        image: 'digdag_on_k8s_worker:latest'
        imagePullPolicy: IfNotPresent
        volumeMounts:
        - name: digdag-config-volume
          mountPath: /etc/config
        - name: embulk-config-volume
          mountPath: /embulk
        command: ['/bin/bash']
        args:
        - '-cx'
        - |
          digdag server \
          --config /etc/config/digdag.properties \
          -X server.bind=0.0.0.0 \
          -X server.port=8080 \
          -X database.type=postgresql \
          -X database.host=db \
          -X database.port=5432 \
          -X database.user=$POSTGRES_USER \
          -X database.database=digdag \
          -X database.password=$POSTGRES_PASSWORD \
          -X digdag.secret-encryption-key=$SECRET_ENCRYPTION_KEY
        env:
        - name: POSTGRES_USER
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: db_user
        - name: POSTGRES_PASSWORD
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: db_password
        - name: SECRET_ENCRYPTION_KEY
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: secret_encryption_key

embulkを実行するdigdagのworkflow

上記のPod上で動かすworkflowです。kube-jobを実行しているのみです。

timezone: UTC

+embulk:
  sh>: kube-job run --template-file=/embulk/embulk.yml --container='embulk'

kube-jobで使うJobのマニフェスト

kube-jobの --template-file で指定するJobのマニフェストを生成するConfigMapです。
digdag workerのDeploymentでこのConfigMapをマウントしています。

apiVersion: v1
kind: ConfigMap
metadata:
  name: embulk-config
  namespace: default
data:
  embulk.yml: |+
    apiVersion: batch/v1
    kind: Job
    metadata:
      name: embulk
      namespace: default
      labels:
        app: embulk
    spec:
      template:
        metadata:
          labels:
            app: embulk
        spec:
          containers:
          - name: embulk
            image: 'digdag_on_k8s_embulk:latest'
            imagePullPolicy: Never
            args: ['embulk', '--version']
          restartPolicy: Never
      backoffLimit: 2

このマニフェストでは簡略化するために embulk --version のみを実行しています。
以上でdigdag workerからJobの起動ができるようになりました。

kube-jobの良いところは以下のように --args で外からJobの args を上書きできるため、
マニフェストを変更することなく workflow から自由にコマンドを打てることです。

timezone: UTC

+embulk:
  sh>: kube-job run --template-file=/embulk/embulk.yml --container='embulk' --args='embulk --version'

2.digdag workerのHPA

digdag workerのPodはdigdagのタスクキューのたまり具合によってオートスケールさせることが理想です。
よって、KubernetesのCustom metricsでキューの情報を得てオートスケールさせます。

digdagのタスクキューをCustom metricsとして扱うには以下の2つを設定します。

  • PostgreSQLで管理されるdigdagでキューイングされたタスク数をpostgres_exporterでPrometheusのメトリクスとしてexportする
  • Prometheusのメトリクスとして取り出したものをprometheus-adapterでCustom metricsとして扱う

postgres_exporter

まず、HelmでPrometheusを入れつつpostgres_exporterを立ち上げます。
動作に必要なもののみインストールしています。

helmfile.yaml
repositories:
- name: prometheus-community
  url: https://prometheus-community.github.io/helm-charts
releases:
  - name: prometheus-operator
    chart: prometheus-community/kube-prometheus-stack
    values:
      - helm-values/prometheus-operator.yml
helm-values/prometheus-operator.yml
alertmanager:
  enabled: false
grafana:
  enabled: false
kubeApiServer:
  enabled: false
kubelet:
  enabled: false
kubeControllerManager:
  enabled: false
coreDns:
  enabled: false
kubeDns:
  enabled: false
kubeEtcd:
  enabled: false
kubeScheduler:
  enabled: false
kubeProxy:
  enabled: false
kubeStateMetrics:
  enabled: false
nodeExporter:
  enabled: false
prometheus-node-exporter:
  hostRootFsMount: false
prometheus:
  prometheusSpec:
    serviceMonitorSelectorNilUsesHelmValues: false
    serviceMonitorSelector: {}

次にpostgres_exporterのServiceを立ち上げます。
今回はdefaultやsettingsのメトリクスは不要なので起動オプションでdisableにします。

apiVersion: apps/v1
kind: Deployment
metadata:
  name: postgres-exporter
spec:
  selector:
    matchLabels:
      app: postgres-exporter
  replicas: 1
  template:
    metadata:
      labels:
        app: postgres-exporter
      name: postgres-exporter
    spec:
      volumes:
      - name: config
        configMap:
          name: postgres-exporter-config-map
      containers:
      - name: postgres-exporter
        image: quay.io/prometheuscommunity/postgres-exporter
        env:
        - name: DATA_SOURCE_URI
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: data_source_uri
        - name: DATA_SOURCE_USER
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: db_user
        - name: DATA_SOURCE_PASS
          valueFrom:
            secretKeyRef:
              name: digdag-secret
              key: db_password
        args:
        - '--disable-default-metrics'
        - '--disable-settings-metrics'
        - '--extend.query-path=/config/queries.yaml'
        ports:
        - containerPort: 9187
          name: exporter
        volumeMounts:
        - name: config
          readOnly: true
          mountPath: /config
apiVersion: v1
kind: Service
metadata:
  name: postgres-exporter
  labels:
    app: postgres-exporter
spec:
  selector:
    app: postgres-exporter
  ports:
  - port: 9187
    name: exporter

このあたりの内容から queued_task_locks のレコード数をキューイングされたタスク数として扱います。
usage はPrometheusのMetrics typesを確認してください。

apiVersion: v1
kind: ConfigMap
metadata:
  name: postgres-exporter-config-map
data:
  queries.yaml: |+
    pg_exporter:
      query: "SELECT count(*) AS queued_tasks FROM queued_task_locks"
      metrics:
        - queued_tasks:
            usage: "GAUGE"
            description: "Number of tasks in queue"

prometheus-adapter

同様にHelmでprometheus-adapterも導入します。

helmfile.yaml
repositories:
- name: prometheus-community
  url: https://prometheus-community.github.io/helm-charts
releases:
  - name: prometheus-operator
    chart: prometheus-community/kube-prometheus-stack
    values:
      - helm-values/prometheus-operator.yml
  - name: prometheus-adapter
    chart: prometheus-community/prometheus-adapter
    values:
      - helm-values/prometheus-adapter.yml

prometheus-adapterの設定についてはこちらのドキュメントを参照してください。 

helm-values/prometheus-adapter.yml
rules:
  default: false
  custom:
  - seriesQuery: 'pg_exporter_queued_tasks{namespace!="",service!=""}'
    resources:
      overrides:
        namespace:
          resource: namespace
        service:
          resource: service
    name:
      as: queued_tasks
    metricsQuery: '<<.Series>>{<<.LabelMatchers>>}'
prometheus:
  url: http://prometheus-operator-kube-p-prometheus.default
logLevel: 2

設定し終わると以下のコマンドでメトリクスが確認できます。

kubectl get --raw /apis/custom.metrics.k8s.io/v1beta1/namespaces/default/services/postgres-exporter/queued_tasks | jq .

これでキューイングされたタスク数をCustom metricsとして扱えるため、あとはHPAを設定するだけです。

HPA

apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
  name: digdag-worker-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: digdag-worker
  minReplicas: 1
  maxReplicas: 2
  metrics:
  - type: Object
    object:
      describedObject:
        apiVersion: v1
        kind: Service
        name: postgres-exporter
      metric:
        name: queued_tasks
      target:
        type: Value
        value: 1

ここで重要なのはtargetのvalueが1であることです。
これはキューイングされたタスク数が0の場合のみスケールインさせるためにそうしています。
digdagはkillが走ったときに実行中のタスクがあった場合、
そのタスクが終わるまでは待ちますが、workflow全体ではそのタスク以降のタスクはキャンセルされます。

よって、workflow実行中にスケールインされて中断されないようにしています。
workflowの最大時間がある程度短く決まっているなら、 terminationGracePeriodSeconds で停止前に待つことでtargetのvalueを1より大きくすることもできるかと思います。

8
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?