LoginSignup
1
0

CronJobやArgo CronWorkflowの、scheduleを変更するAPIを作ってみた

Last updated at Posted at 2023-10-07

本記事で行うこと

Kubernetesで、CronJobと、Argo WorkflowsCronWorkflowの2種類のリソースについて、scheduleを運用時に動的に変更したくなりました。
以下のようにkubectlコマンドを使ってパッチを当てればできますが、

kubectl patch cronworkflow \
  cronworkflow-name \
  --namespace hoge \
  --type='json' \
  -p='[
    {
      "op": "replace",
      "path": "/spec/schedule",
      "value": "* * * * *"
    }
  ]'

本記事では非エンジニアが管理画面から変更することを前提に、schedule変更を行うWeb APIを作成し、そのAPIサーバーをPodで動かして、Ingressで外に公開します。

環境

Kubernetesはkind(Kubernetes in Docker)を使用しました。

kind version

image.png

事前準備

Argo CLIのインストール

このページの手順でインストールしました。
本記事では手順は省略します。

作業手順

本記事では、Ingress NGINXを使用して、ポート80でhttpサーバーを公開します。

Kubernetesクラスタ作成

このページを参考に、kind用にマニフェストファイルを用意します。

foo-cluster.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  kubeadmConfigPatches:
  - |
    kind: InitConfiguration
    nodeRegistration:
      kubeletExtraArgs:
        node-labels: "ingress-ready=true"
  extraPortMappings:
  - containerPort: 80
    hostPort: 80

kindコマンドでKubernetesクラスタを作成します。
クラスタ名をfoo-clusterにしましたが、何でも良いです。

kind create cluster --name foo-cluster --config=foo-cluster.yaml

image.png

Ingress-Nginx Controllerをインストール

Ingress-Nginx Controllerは、一般にはこのページの手順でインストールします。
しかし、本記事ではkindを使用しますので、このページの手順でkind用にインストールします。

kubectl apply -f https://raw.githubusercontent.com/kubernetes/ingress-nginx/main/deploy/static/provider/kind/deploy.yaml

image.png

コントローラーの起動を待ちます。

kubectl wait --namespace ingress-nginx \
  --for=condition=ready pod \
  --selector=app.kubernetes.io/component=controller \
  --timeout=90s

image.png

起動確認

kubectl get pods -n ingress-nginx

image.png

Argo Workflowsをインストール

このページの手順でインストールします。
本記事執筆時点で、Argo Workflowsの最新バージョンはv3.4.11でした。

kubectl create namespace argo

kubectl apply -n argo -f https://github.com/argoproj/argo-workflows/releases/download/v3.4.11/install.yaml

image.png

作業用namespace作成

作業用namespace名をbarにしましたが、何でも良いです。

kubectl create namespace bar

image.png

CronJob起動

後ほど作るAPIから、CronJobのscheduleを変更できるようにしますので、そのテスト用CronJobを立ち上げておきます。
公式サイトのexamplesのマニフェストファイルをそのまま使用しました。

kubectl -n bar apply -f https://raw.githubusercontent.com/kubernetes/website/main/content/ja/examples/application/job/cronjob.yaml

image.png

起動確認

kubectl -n bar get cronjob

image.png

CronJob名がhelloであることを覚えておきます。

CronWorkflow起動

後ほど作るAPIから、CronWorkflowのscheduleを変更できるようにしますので、そのテスト用CronWorkflowを立ち上げておきます。
Argo公式サイトのexamplesのマニフェストファイルをそのまま使用しました。

kubectl -n bar apply -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cron-workflow.yaml

image.png

起動確認

argo -n bar cron list

image.png

CronWorkflow名がhello-worldであることを覚えておきます。

Web APIサーバー(Go言語)

client-goを使用して、scheduleを変更するAPIを開発します。
こちらのソースコードを参考にしました。

ファイル一覧とソースコード

src directory
  ├── admin_server.go
  ├── go.mod
  └── Dockerfile

admin_server.go

WebフレームワークはEchoを使用しました。
起動時にプログラム引数として、namespace名を1つ渡す必要があります。
APIは以下の4つです。

  • GET /<<namespace>>/cronjobs
    namespaceに属するCronJobの一覧を取得します。

  • PUT /<<namespace>>/cronjobs
    CronJobのscheduleをupdateします。

  • GET /<<namespace>/cronworkflows
    namespaceに属するCronWorkflowの一覧を取得します。

  • PUT /<<namespace>>/cronworkflows
    CronWorkflowのscheduleをupdateします。

admin_server.go
package main

import (
	"context"
	argoclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"net/http"
	"os"
	"path/filepath"
)

type httpError struct {
	Error string `json:"error"`
}

var (
	namespace     string
	clientset     *kubernetes.Clientset
	argoClientset *argoclientset.Clientset
)

func getCronjobs(c echo.Context) error {
	// レスポンス初期化
	type cronjob struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}
	type responseType struct {
		Data []cronjob `json:"data"`
	}
	var response responseType
	response.Data = []cronjob{}

	// CronJobリスト取得
	cjobs, err := clientset.BatchV1().CronJobs(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 各CronJobをレスポンスに追加
	for _, cjob := range cjobs.Items {
		cj := cronjob{
			Name:     cjob.Name,
			Schedule: cjob.Spec.Schedule,
		}
		response.Data = append(response.Data, cj)
	}

	return c.JSON(http.StatusOK, response)
}

func updateCronjob(c echo.Context) error {
	// リクエストボディ&レスポンスの型
	type cronjob struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}

	// リクエストボディを構造体にバインド
	reqb := cronjob{}
	if err := c.Bind(&reqb); err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	cjClient := clientset.BatchV1().CronJobs(namespace)

	// CronJob取得
	cj, err := cjClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
	if err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	// スケジュール変更
	cj.Spec.Schedule = reqb.Schedule

	// 更新適用
	resultCj, err := cjClient.Update(context.TODO(), cj, metav1.UpdateOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 更新された内容をレスポンスにする
	response := cronjob{
		Name:     resultCj.Name,
		Schedule: resultCj.Spec.Schedule,
	}
	return c.JSON(http.StatusOK, response)
}

func getCronworkflows(c echo.Context) error {
	// レスポンス初期化
	type cronworkflow struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}
	type responseType struct {
		Data []cronworkflow `json:"data"`
	}
	var response responseType
	response.Data = []cronworkflow{}

	// CronWorkflowリスト取得
	cworkflows, err := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 各CronWorkflowをレスポンスに追加
	for _, cworkflow := range cworkflows.Items {
		cw := cronworkflow{
			Name:     cworkflow.Name,
			Schedule: cworkflow.Spec.Schedule,
		}
		response.Data = append(response.Data, cw)
	}

	return c.JSON(http.StatusOK, response)
}

func updateCronworkflow(c echo.Context) error {
	// リクエストボディ&レスポンスの型
	type cronworkflow struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}

	// リクエストボディを構造体にバインド
	reqb := cronworkflow{}
	if err := c.Bind(&reqb); err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	cwClient := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace)

	// CronWorkflow取得
	cw, err := cwClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
	if err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	// スケジュール変更
	cw.Spec.Schedule = reqb.Schedule

	// 更新適用
	resultCw, err := cwClient.Update(context.TODO(), cw, metav1.UpdateOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 更新された内容をレスポンスにする
	response := cronworkflow{
		Name:     resultCw.Name,
		Schedule: resultCw.Spec.Schedule,
	}
	return c.JSON(http.StatusOK, response)
}

// 開発用。本番コードでは不要
func outClusterConfig() (*rest.Config, error) {
	// Create client
	kubeconfig, ok := os.LookupEnv("KUBECONFIG")
	if !ok {
		kubeconfig = filepath.Join(homedir.HomeDir(), ".kube", "config")
	}
	return clientcmd.BuildConfigFromFlags("", kubeconfig)
}

func main() {
	// Program argsでKubernetesのnamespaceを指定
	if len(os.Args) != 2 {
		panic("assert len(os.Args) == 2")
	}
	namespace = os.Args[1]

	// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		config, err = outClusterConfig() // 開発用。本番コードでは消す
		if err != nil {
			panic(err.Error())
		}
	}

	// creates the clientset and argoClientset
	clientset, err = kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	argoClientset, err = argoclientset.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// echoでAPIサーバー
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	// Routing
	api := e.Group("/" + namespace)
	api.GET("/cronjobs", getCronjobs)
	api.PUT("/cronjobs", updateCronjob)
	api.GET("/cronworkflows", getCronworkflows)
	api.PUT("/cronworkflows", updateCronworkflow)

	e.Logger.Fatal(e.Start(":1323"))
}

go.mod

go.mod
module admin_server

go 1.20

require (
	github.com/argoproj/argo-workflows/v3 v3.4.11
	github.com/labstack/echo/v4 v4.11.1
	k8s.io/apimachinery v0.28.2
	k8s.io/client-go v0.28.2
)

require (
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/emicklei/go-restful/v3 v3.10.0 // indirect
	github.com/go-logr/logr v1.2.4 // indirect
	github.com/go-openapi/jsonpointer v0.19.6 // indirect
	github.com/go-openapi/jsonreference v0.20.2 // indirect
	github.com/go-openapi/swag v0.22.3 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
	github.com/golang/protobuf v1.5.3 // indirect
	github.com/google/gnostic-models v0.6.8 // indirect
	github.com/google/go-cmp v0.5.9 // indirect
	github.com/google/gofuzz v1.2.0 // indirect
	github.com/google/uuid v1.3.0 // indirect
	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
	github.com/imdario/mergo v0.3.13 // indirect
	github.com/josharian/intern v1.0.0 // indirect
	github.com/json-iterator/go v1.1.12 // indirect
	github.com/labstack/gommon v0.4.0 // indirect
	github.com/mailru/easyjson v0.7.7 // indirect
	github.com/mattn/go-colorable v0.1.13 // indirect
	github.com/mattn/go-isatty v0.0.19 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.2 // indirect
	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
	github.com/sirupsen/logrus v1.9.3 // indirect
	github.com/spf13/pflag v1.0.5 // indirect
	github.com/valyala/bytebufferpool v1.0.0 // indirect
	github.com/valyala/fasttemplate v1.2.2 // indirect
	golang.org/x/crypto v0.12.0 // indirect
	golang.org/x/net v0.14.0 // indirect
	golang.org/x/oauth2 v0.11.0 // indirect
	golang.org/x/sys v0.11.0 // indirect
	golang.org/x/term v0.11.0 // indirect
	golang.org/x/text v0.12.0 // indirect
	golang.org/x/time v0.3.0 // indirect
	google.golang.org/appengine v1.6.7 // indirect
	google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
	google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 // indirect
	google.golang.org/genproto/googleapis/rpc v0.0.0-20230720185612-659f7aaaa771 // indirect
	google.golang.org/grpc v1.56.2 // indirect
	google.golang.org/protobuf v1.31.0 // indirect
	gopkg.in/inf.v0 v0.9.1 // indirect
	gopkg.in/yaml.v2 v2.4.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
	k8s.io/api v0.28.2 // indirect
	k8s.io/klog/v2 v2.100.1 // indirect
	k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
	k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
	sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
	sigs.k8s.io/yaml v1.3.0 // indirect
)

Dockerfile

FROM golang:bullseye as builder
WORKDIR /app
COPY ./admin_server.go ./
COPY ./go.mod ./
RUN go mod tidy
RUN go build -o admin_server admin_server.go

FROM debian:bullseye
WORKDIR /app
COPY --from=builder /app/admin_server /app/
ENTRYPOINT ["/app/admin_server"]

Dockerビルド

docker build ./ -t admin-server

Dockerイメージをkindに登録

kind load docker-image admin-server --name=foo-cluster

image.png

APIサーバー用サービスアカウントの作成

APIサーバーをPodで動かすために必要な、サービスアカウントを作成します。
以下のマニフェストファイルを用意しました。

※ サービスアカウントの参考サイト(感謝します)

admin-sa.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: admin-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: admin-role
rules:
  - apiGroups:
      - batch
    verbs:
      - list
      - get
      - update
    resources:
      - cronjobs
  - apiGroups:
      - argoproj.io
    verbs:
      - list
      - get
      - update
    resources:
      - cronworkflows
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: admin-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: admin-role
subjects:
  - kind: ServiceAccount
    name: admin-sa

サービスアカウントを作成します。

kubectl -n bar apply -f admin-sa.yaml

image.png

APIサーバー用Serviceの作成

以下のマニフェストファイルを用意します。

  • 先ほど作成したサービスアカウント「admin-sa」を指定します。
  • APIサーバー起動時のプログラム引数として、namespace(本記事では"bar")を渡すように設定します。
  • typeはNodePortにし、ポート8888でAPIサーバーに繋がるようにしました。
bar-admin-nodeport.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: bar-admin-deployment
  namespace: bar
  labels:
    app: bar-admin-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: bar-admin-server
  template:
    metadata:
      labels:
        app: bar-admin-server
    spec:
      serviceAccountName: admin-sa
      containers:
      - name: bar-admin-container
        image: admin-server:latest
        imagePullPolicy: Never
        args: ["bar"]
---
apiVersion: v1
kind: Service
metadata:
  name: bar-admin-nodeport
  namespace: bar
  labels:
    app: bar-admin-server
spec:
  type: NodePort
  ports:
  - port: 8888
    targetPort: 1323
  selector:
    app: bar-admin-server

Serviceを起動します。

kubectl apply -f bar-admin-nodeport.yaml

image.png

起動確認:NAMEがbar-admin-deployment-で始まるpodが起動していることを確認します。

kubectl -n bar get pod

image.png

Serviceの動作確認

ここで一旦、APIサーバーの動作確認をしておきます。
2つの方法を記載しておきます。

kubectl debugコマンドを使う方法

本記事ではreplicasを1にしたので、以下のコマンドでデバッグ作業に入れます。

kubectl -n bar debug $(kubectl -n bar get --no-headers=true pods -l app=bar-admin-server -o custom-columns=:metadata.name) --image=curlimages/curl -it -- sh

image.png

4つのAPIを呼んでみます。

curl http://bar-admin-nodeport.bar:8888/bar/cronjobs

image.png

curl http://bar-admin-nodeport.bar:8888/bar/cronworkflows

image.png

curl --location --request PUT 'http://bar-admin-nodeport.bar:8888/bar/cronjobs' --header 'Content-Type: application/json' --data '{"name": "hello", "schedule": "1 0 * * *"}'

image.png

curl --location --request PUT 'http://bar-admin-nodeport.bar:8888/bar/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "2 0 * * *"}'

image.png

APIが正常に動くことが確認できました。

kubectl port-forwardコマンドを使う方法

kubectl -n bar port-forward svc/bar-admin-nodeport 30000:8888

image.png

4つのAPIを呼んでみます。

curl http://localhost:30000/bar/cronjobs

image.png

curl http://localhost:30000/bar/cronworkflows

image.png

curl --location --request PUT 'http://localhost:30000/bar/cronjobs' --header 'Content-Type: application/json' --data '{"name": "hello", "schedule": "3 0 * * *"}'

image.png

curl --location --request PUT 'http://localhost:30000/bar/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "4 0 * * *"}'

image.png

APIが正常に動くことが確認できました。

Ingress作成

APIを外に公開するために、Ingressを使用しました。
以下のマニフェストファイルを用意します。

  • 本番ではhttpsで公開すべきですが、本記事ではhttpにしました。
bar-admin-ingress.yaml
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
  name: bar-admin-ingress
  namespace: bar
spec:
  ingressClassName: "nginx"
  rules:
  - http:
      paths:
      - path: /bar
        pathType: Prefix
        backend:
          service:
            name: bar-admin-nodeport
            port:
              number: 8888

Ingressを起動します。

kubectl apply -f bar-admin-ingress.yaml

image.png

起動確認

kubectl -n bar get ingress

image.png

外からIngressの動作確認

your_hostをマシンのホスト名に書き換えて、4つのAPIを呼んでみます。

curl http://your_host/bar/cronjobs

image.png

curl http://your_host/bar/cronworkflows

image.png

curl --location --request PUT 'http://your_host/bar/cronjobs' --header 'Content-Type: application/json' --data '{"name": "hello", "schedule": "5 0 * * *"}'

image.png

curl --location --request PUT 'http://your_host/bar/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "6 0 * * *"}'

image.png

APIが正常に動くことが確認できました。

後始末

最初に作成したKubernetesクラスタを削除します。

kind delete clusters foo-cluster

image.png

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0