この記事について
Kubernetes上でDigdagを動かし、そのワークフローからEmbulkのバッチを叩くようなワークフロー基盤について書きます。
DigdagのworkflowでBig QueryからElasticsearchへインデックスするEmbulkの例を示します。
全体のアーキテクチャ
こちらの記事のように Digdag は API リクエストを処理する master と、
実際にワークフローを実行する worker に分け、それぞれ Pod で起動します。
digdag workerではembulkを実行するKubernetesのJobを動的に起動してJob上で実際の処理をします。
また、digdag workerはキューに入ったタスク数に応じてオートスケールさせます。
課題
この構成を実現するためには2つの課題があります。
- digdag workerからJobの起動
- digdag workerのHPA
1.digdag workerからJobの起動
digdag workerでタスクを実行する際はコンテナ上で実行したいです。
DigdagにはDockerのCommand Executorが実装されており、簡単にDocker上でプログラムを実行できるからです。
しかし、digdag worker自体がPodのコンテナ上で動いているため、Docker in Dockerのような状態になってしまいます。
よって、digdag workerで実行しているワークフローから動的にJobを立ち上げてその上でプログラムを実行できるようにします。
kube-job を使うとコマンドラインからKubernetesのJobを立ち上げることができます。
ただし、kube-job v0.5.0以下ではKubernetes内部からkube-jobコマンドを容易に叩けなかったためパッチを当てさせていただきました。
digdag workerの設定は以下になります。
digdag workerのimage
multi-stage buildsでkube-jobのバイナリを配置します。
バージョンは古いため参考にされる場合は適宜アップデートお願いします。
FROM golang:1.15-buster as builder
ENV CGO_ENABLED=0
RUN apt update && apt install -y --no-install-recommends wget unzip make
WORKDIR /app
RUN wget -q https://github.com/h3poteto/kube-job/archive/refs/tags/v0.5.1.zip \
&& unzip -q v0.5.1.zip
RUN make -C /app/kube-job-0.5.1 build
FROM azul/zulu-openjdk:8
RUN apt-get update -qq && apt-get install -y curl
RUN curl -o /usr/local/bin/digdag --create-dirs -L 'https://dl.digdag.io/digdag-0.10.0' \
&& chmod +x /usr/local/bin/digdag
COPY --from=builder /app/kube-job-0.5.1/bin/kube-job /usr/local/bin/
digdag workerのDeployment
embulkのvolumeに関しては後述します。
apiVersion: apps/v1
kind: Deployment
metadata:
name: digdag-worker
labels:
app: digdag-worker
spec:
selector:
matchLabels:
app: digdag-worker
strategy:
type: Recreate
template:
metadata:
labels:
app: digdag-worker
spec:
volumes:
- name: digdag-config-volume
configMap:
name: digdag-config
- name: embulk-config-volume
configMap:
name: embulk-config
containers:
- name: digdag-worker
image: 'digdag_on_k8s_worker:latest'
imagePullPolicy: IfNotPresent
volumeMounts:
- name: digdag-config-volume
mountPath: /etc/config
- name: embulk-config-volume
mountPath: /embulk
command: ['/bin/bash']
args:
- '-cx'
- |
digdag server \
--config /etc/config/digdag.properties \
-X server.bind=0.0.0.0 \
-X server.port=8080 \
-X database.type=postgresql \
-X database.host=db \
-X database.port=5432 \
-X database.user=$POSTGRES_USER \
-X database.database=digdag \
-X database.password=$POSTGRES_PASSWORD \
-X digdag.secret-encryption-key=$SECRET_ENCRYPTION_KEY
env:
- name: POSTGRES_USER
valueFrom:
secretKeyRef:
name: digdag-secret
key: db_user
- name: POSTGRES_PASSWORD
valueFrom:
secretKeyRef:
name: digdag-secret
key: db_password
- name: SECRET_ENCRYPTION_KEY
valueFrom:
secretKeyRef:
name: digdag-secret
key: secret_encryption_key
embulkを実行するdigdagのworkflow
上記のPod上で動かすworkflowです。kube-jobを実行しているのみです。
timezone: UTC
+embulk:
sh>: kube-job run --template-file=/embulk/embulk.yml --container='embulk'
kube-jobで使うJobのマニフェスト
kube-jobの --template-file
で指定するJobのマニフェストを生成するConfigMapです。
digdag workerのDeploymentでこのConfigMapをマウントしています。
apiVersion: v1
kind: ConfigMap
metadata:
name: embulk-config
namespace: default
data:
embulk.yml: |+
apiVersion: batch/v1
kind: Job
metadata:
name: embulk
namespace: default
labels:
app: embulk
spec:
template:
metadata:
labels:
app: embulk
spec:
containers:
- name: embulk
image: 'digdag_on_k8s_embulk:latest'
imagePullPolicy: Never
args: ['embulk', '--version']
restartPolicy: Never
backoffLimit: 2
このマニフェストでは簡略化するために embulk --version
のみを実行しています。
以上でdigdag workerからJobの起動ができるようになりました。
kube-jobの良いところは以下のように --args
で外からJobの args
を上書きできるため、
マニフェストを変更することなく workflow から自由にコマンドを打てることです。
timezone: UTC
+embulk:
sh>: kube-job run --template-file=/embulk/embulk.yml --container='embulk' --args='embulk --version'
2.digdag workerのHPA
digdag workerのPodはdigdagのタスクキューのたまり具合によってオートスケールさせることが理想です。
よって、KubernetesのCustom metricsでキューの情報を得てオートスケールさせます。
digdagのタスクキューをCustom metricsとして扱うには以下の2つを設定します。
- PostgreSQLで管理されるdigdagでキューイングされたタスク数をpostgres_exporterでPrometheusのメトリクスとしてexportする
- Prometheusのメトリクスとして取り出したものをprometheus-adapterでCustom metricsとして扱う
postgres_exporter
まず、HelmでPrometheusを入れつつpostgres_exporterを立ち上げます。
動作に必要なもののみインストールしています。
repositories:
- name: prometheus-community
url: https://prometheus-community.github.io/helm-charts
releases:
- name: prometheus-operator
chart: prometheus-community/kube-prometheus-stack
values:
- helm-values/prometheus-operator.yml
alertmanager:
enabled: false
grafana:
enabled: false
kubeApiServer:
enabled: false
kubelet:
enabled: false
kubeControllerManager:
enabled: false
coreDns:
enabled: false
kubeDns:
enabled: false
kubeEtcd:
enabled: false
kubeScheduler:
enabled: false
kubeProxy:
enabled: false
kubeStateMetrics:
enabled: false
nodeExporter:
enabled: false
prometheus-node-exporter:
hostRootFsMount: false
prometheus:
prometheusSpec:
serviceMonitorSelectorNilUsesHelmValues: false
serviceMonitorSelector: {}
次にpostgres_exporterのServiceを立ち上げます。
今回はdefaultやsettingsのメトリクスは不要なので起動オプションでdisableにします。
apiVersion: apps/v1
kind: Deployment
metadata:
name: postgres-exporter
spec:
selector:
matchLabels:
app: postgres-exporter
replicas: 1
template:
metadata:
labels:
app: postgres-exporter
name: postgres-exporter
spec:
volumes:
- name: config
configMap:
name: postgres-exporter-config-map
containers:
- name: postgres-exporter
image: quay.io/prometheuscommunity/postgres-exporter
env:
- name: DATA_SOURCE_URI
valueFrom:
secretKeyRef:
name: digdag-secret
key: data_source_uri
- name: DATA_SOURCE_USER
valueFrom:
secretKeyRef:
name: digdag-secret
key: db_user
- name: DATA_SOURCE_PASS
valueFrom:
secretKeyRef:
name: digdag-secret
key: db_password
args:
- '--disable-default-metrics'
- '--disable-settings-metrics'
- '--extend.query-path=/config/queries.yaml'
ports:
- containerPort: 9187
name: exporter
volumeMounts:
- name: config
readOnly: true
mountPath: /config
apiVersion: v1
kind: Service
metadata:
name: postgres-exporter
labels:
app: postgres-exporter
spec:
selector:
app: postgres-exporter
ports:
- port: 9187
name: exporter
このあたりの内容から queued_task_locks
のレコード数をキューイングされたタスク数として扱います。
usage
はPrometheusのMetrics typesを確認してください。
apiVersion: v1
kind: ConfigMap
metadata:
name: postgres-exporter-config-map
data:
queries.yaml: |+
pg_exporter:
query: "SELECT count(*) AS queued_tasks FROM queued_task_locks"
metrics:
- queued_tasks:
usage: "GAUGE"
description: "Number of tasks in queue"
prometheus-adapter
同様にHelmでprometheus-adapterも導入します。
repositories:
- name: prometheus-community
url: https://prometheus-community.github.io/helm-charts
releases:
- name: prometheus-operator
chart: prometheus-community/kube-prometheus-stack
values:
- helm-values/prometheus-operator.yml
- name: prometheus-adapter
chart: prometheus-community/prometheus-adapter
values:
- helm-values/prometheus-adapter.yml
prometheus-adapterの設定についてはこちらのドキュメントを参照してください。
rules:
default: false
custom:
- seriesQuery: 'pg_exporter_queued_tasks{namespace!="",service!=""}'
resources:
overrides:
namespace:
resource: namespace
service:
resource: service
name:
as: queued_tasks
metricsQuery: '<<.Series>>{<<.LabelMatchers>>}'
prometheus:
url: http://prometheus-operator-kube-p-prometheus.default
logLevel: 2
設定し終わると以下のコマンドでメトリクスが確認できます。
kubectl get --raw /apis/custom.metrics.k8s.io/v1beta1/namespaces/default/services/postgres-exporter/queued_tasks | jq .
これでキューイングされたタスク数をCustom metricsとして扱えるため、あとはHPAを設定するだけです。
HPA
apiVersion: autoscaling/v2beta2
kind: HorizontalPodAutoscaler
metadata:
name: digdag-worker-hpa
spec:
scaleTargetRef:
apiVersion: apps/v1
kind: Deployment
name: digdag-worker
minReplicas: 1
maxReplicas: 2
metrics:
- type: Object
object:
describedObject:
apiVersion: v1
kind: Service
name: postgres-exporter
metric:
name: queued_tasks
target:
type: Value
value: 1
ここで重要なのはtargetのvalueが1であることです。
これはキューイングされたタスク数が0の場合のみスケールインさせるためにそうしています。
digdagはkillが走ったときに実行中のタスクがあった場合、
そのタスクが終わるまでは待ちますが、workflow全体ではそのタスク以降のタスクはキャンセルされます。
よって、workflow実行中にスケールインされて中断されないようにしています。
workflowの最大時間がある程度短く決まっているなら、 terminationGracePeriodSeconds
で停止前に待つことでtargetのvalueを1より大きくすることもできるかと思います。