2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Argo WorkflowsとRabbitMQを利用して、Kubernetesでバッチ処理基盤を構築(その1)

2
Last updated at Posted at 2023-10-13

本記事で行うこと

前回の記事を発展させて、Kubernetesでバッチ処理基盤を構築してみます。
その第1弾として、以下の構成図を考えました。
緑の四角はオープンソース、紫の四角は本記事で実装するアプリです。

image.png

緑の四角(オープンソース)について

  • CronWorkflow
    Argo Workflowsが提供するKubernetesリソースの一つで、スケジュール実行を行うワークフローエンジンです。

  • argo-server(Webアプリ)
    Argo Workflowsが提供する、ワークフローを管理するためのUIです。

  • RabbitMQ
    メッセージキューイングシステムです。
    本記事では、バッチジョブがメッセージをパブリッシュし、それをフロントエンドアプリケーションが受け取るように作ります。
    将来は、バッチ処理のTODOをキューに積んで、バッチ処理を駆動する等に利用したいです。

  • RabbitMQ Management UI(Webアプリ)
    RabbitMQが提供する、キューイングシステム管理のためのUIです。

紫の四角(本記事で実装)について

  • batch job
    バッチジョブ本体です。
    本記事ではRabbitMQにメッセージをパブリッシュするだけの処理を行います。
    Pythonで実装しましたが(send.py)、Dockerコンテナで動かすので言語は何でも良いです。

  • Frontend Application
    本記事では、RabbitMQからメッセージを受け取ってコンソールに表示するだけのPythonアプリ(receive.py)を実装します。
    将来はWebフロントエンドアプリにしたいです。

  • Admin Server(Web APIサーバー)
    処理内容は前回の記事と同じで、CronWorkflowのscheduleを確認したり変更したりするAPIを提供します。
    Go言語でclient-goを使用して実装します。
    将来は、Workflowを手動実行するAPIを追加する等、機能を充実させたいです。
    また、APIだけでなく、運用担当者向けに管理画面も作りたいです。

Webアプリの外部公開方法について

本記事では、上記の構成図のポート番号でWebアプリを外部公開します。

  • 本番ではhttpsにすべきですが、本記事では全てhttpで統一しました。
  • KubernetesでWebアプリを公開する時、Ingressリソースを使うことが多いと思います。
    しかしIngressの標準機能では、80/443以外のポートでは公開できないそうです。
    ただしこのページによれば、テクニックを使えば80/443以外のポートで公開できるそうですが、IngressよりもGateway APIが主流になる可能性を考慮し、本記事では手間を省いてNodePortタイプのServiceを使うに留めます。

主な参考サイト(感謝します)

環境

Kubernetesはkind(Kubernetes in Docker)を使用しました。

kind version

image.png

事前準備

Argo CLIのインストール

このページの手順でインストールしました。
本記事では手順は省略します。

作業手順

Kubernetesクラスタ作成

このページを参考に、kind用にマニフェストファイルを用意します。
hostPortに、構成図に記載した公開用portを設定します。
containerPortには、後にNodePortタイプのServiceを作るときに、マニフェストで設定する番号を入れます。

batch-cluster.yaml
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
  extraPortMappings:
  - containerPort: 31323
    hostPort: 1323
    listenAddress: "0.0.0.0"
  - containerPort: 32746
    hostPort: 2746
    listenAddress: "0.0.0.0"
  - containerPort: 32094
    hostPort: 5672
    listenAddress: "0.0.0.0"
  - containerPort: 32095
    hostPort: 15672
    listenAddress: "0.0.0.0"

kindコマンドでKubernetesクラスタを作成します。
クラスタ名をbatch-clusterにしましたが、何でも良いです。

kind create cluster --name batch-cluster --config=batch-cluster.yaml

image.png

Kubernetesクラスタを作成できました。

namespace作成

RabbitMQ用、Argo Workflows用、バッチ処理用のnamespaceをそれぞれ作成します。
バッチ処理用namespace名をfooにしましたが、何でも良いです。

kubectl create namespace rabbitmq
kubectl create namespace argo
kubectl create namespace foo

image.png

namespaceを作成できました。

RabbitMQ(その1):導入~管理UI起動まで

Deploymentを起動

このリポジトリのマニフェストを、そのまま使わせていただきました。感謝します。

kubectl -n rabbitmq apply -f https://raw.githubusercontent.com/latonaio/rabbitmq-on-kubernetes/main/deployment.yml

image.png

起動確認

kubectl -n rabbitmq get deployment

image.png

RabbitMQ Management UI(port 15672)を起動

外部からブラウザで起動します。
your_hostには、Kubernetesを動かしているホストを指定します。

http://your_host:15672

image.png

Username、Pasword共に「guest」でログインできます。

image.png

RabbitMQの導入とUI起動が成功しました。

batch job(その1):docker buildまで

CronWorkflowの中で実行する各ジョブを開発します。
CronWorkflowは複数の種類のジョブを実行できますが、本記事では一種類のみ作りました。

ファイル一覧とソースコード

src directory
  ├── send.py
  ├── requirements.txt
  └── Dockerfile

send.py

pikaライブラリを使用して、RabbitMQにメッセージをパブリッシュします。
3つのプログラム引数を渡す必要があります。

  • host:RabbitMQのホスト名
  • queue:キュー名
  • message:メッセージ
send.py
import json
import sys
import traceback
import pika


def main():
    host = sys.argv[1]
    queue = sys.argv[2]
    message = sys.argv[3]

    param = pika.ConnectionParameters(host=host)
    connection = pika.BlockingConnection(param)
    channel = connection.channel()
    channel.queue_declare(queue=queue)

    channel.basic_publish(exchange="", routing_key=queue, body=json.dumps({"message": message}))
    connection.close()


if __name__ == "__main__":
    try:
        main()
    except Exception:
        traceback.print_exc()

requirements.txt

requirements.txt
pika

Dockerfile

FROM python:3-slim-bullseye

RUN set -e; \
    apt-get update -y && apt-get install -y \
    lsb-release; \
    apt-get clean

RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
ENV PYTHONUNBUFFERED 1

ENV APP_HOME /app
WORKDIR $APP_HOME
COPY ./send.py ./
COPY ./requirements.txt ./

RUN pip install -r requirements.txt

ENV PORT 5672
ENTRYPOINT ["python", "send.py"]
CMD ["dummy"]

Dockerビルド

docker build ./ -t batch-job

Dockerイメージをkindに登録

kind load docker-image batch-job --name=batch-cluster

image.png

batch jobの開発&docker buildまでできました。

RabbitMQ(その2)& batch job(その2):RabbitMQとbatch jobを同時に動作確認

パブリッシュ側(send.py)の動作確認

kubectl debugコマンドを使って動作確認を行います。
記事執筆時点のマニフェストを使った場合は、以下のコマンドでデバッグ作業に入れました。
マニフェストが更新されたら、コマンドを調整する必要があるかもしれません。

kubectl -n rabbitmq debug \
    $(kubectl -n rabbitmq get --no-headers=true pods -l run=rabbitmq -o custom-columns=:metadata.name) \
    --image=batch-job \
    --image-pull-policy=Never \
    -it \
    -- sh

image.png

デバッグに入ってのシェルで、send.pyを実行します。

python send.py rabbitmq.rabbitmq foo-job test-message

image.png

RabbitMQのUIを見て、キューにメッセージが溜まれば、ひとまずOKです。

image.png

サブスクライブ側(receive.py)の開発と動作確認

外部のマシンで以下のコードを用意します。
pikaライブラリを使用して、RabbitMQからメッセージを受信します。
2つのプログラム引数を渡す必要があります。

  • host:RabbitMQのホスト名
  • queue:キュー名
receive.py
import sys
import traceback
import pika


def main():
    host = sys.argv[1]
    queue = sys.argv[2]

    def callback(ch, method, properties, body):
        print(" [x] Received %r" % body)

    param = pika.ConnectionParameters(host=host)
    connection = pika.BlockingConnection(param)
    channel = connection.channel()
    channel.queue_declare(queue=queue)

    channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
    channel.start_consuming()


if __name__ == "__main__":
    try:
        main()
    except Exception:
        traceback.print_exc()

pip等でpikaライブラリをインストール後、以下を実行します。
your_hostには、Kubernetesを動かしているホストを指定します。

python receive.py your_host foo-job

RabbitMQのキューに溜まっていたテストメッセージが受信されればOKです。

image.png

管理UIでも、キューのメッセージが0件になったことを確認します。

image.png

RabbitMQとbatch jobの動作確認ができました。
receive.pyを終了させ、Kerbernetes側のホストでOSのシェルに戻っておきます。

Argo Workflowsのインストールとargo-serverのUI起動

インストール

このページの手順でインストールします。
本記事執筆時点で、Argo Workflowsの最新バージョンはv3.4.11でした。

kubectl -n argo apply -f https://github.com/argoproj/argo-workflows/releases/download/v3.4.11/install.yaml

image.png

argo-serverの設定変更

本記事ではhttpsでなくhttpに統一しますので、kubectl patchコマンドを使って、argo-serverの設定をそのように変更します。

kubectl patch deployment \
  argo-server \
  --namespace argo \
  --type='json' \
  -p='[
          {
              "op": "replace",
              "path": "/spec/template/spec/containers/0/readinessProbe/httpGet/scheme",
              "value": "HTTP"
          },
          {
              "op": "replace",
              "path": "/spec/template/spec/containers/0/args",
              "value": [
                  "server",
                  "--auth-mode=server",
                  "--secure=false"
              ]
          }
      ]'

image.png

変更内容確認

kubectl -n argo get deployment argo-server -o yaml

image.png

argo-server用のServiceを起動

以下のマニフェストファイルを用意します。
記事冒頭で書きましたように、NodePortタイプのServiceで外部公開します。

argo-server-nodeport.yaml
apiVersion: v1
kind: Service
metadata:
  name: argo-server-nodeport
  namespace: argo
spec:
  type: NodePort
  ports:
  - name: web
    port: 2746
    targetPort: 2746
    nodePort: 32746
  selector:
    app: argo-server

Serviceを起動します。

kubectl apply -f argo-server-nodeport.yaml

image.png

起動確認

kubectl -n argo get svc

image.png

argo-server(port 2746)を起動

外部からブラウザで起動します。
your_hostには、Kubernetesを動かしているホストを指定します。

http://your_host:2746

image.png

argo-serverのUIを起動できました。

Admin Server(Go言語)

この記事と内容がダブります。

client-goを使用して、CronJobやCronWorkflowのscheduleを変更するAPIを開発します。
こちらのソースコードを参考にしました。

ファイル一覧とソースコード

src directory
  ├── admin_server.go
  ├── go.mod
  └── Dockerfile

admin_server.go

WebフレームワークはEchoを使用しました。
起動時にプログラム引数として、namespace名を1つ渡す必要があります。
APIは以下の4つです。

  • GET /<<namespace>>/cronjobs
    namespaceに属するCronJobの一覧を取得します。

  • PUT /<<namespace>>/cronjobs
    CronJobのscheduleをupdateします。

  • GET /<<namespace>/cronworkflows
    namespaceに属するCronWorkflowの一覧を取得します。

  • PUT /<<namespace>>/cronworkflows
    CronWorkflowのscheduleをupdateします。

※ 本記事ではCronJobは使用しません。

admin_server.go
package main

import (
	"context"
	argoclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
	"github.com/labstack/echo/v4"
	"github.com/labstack/echo/v4/middleware"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
	"k8s.io/client-go/tools/clientcmd"
	"k8s.io/client-go/util/homedir"
	"net/http"
	"os"
	"path/filepath"
)

type httpError struct {
	Error string `json:"error"`
}

var (
	namespace     string
	clientset     *kubernetes.Clientset
	argoClientset *argoclientset.Clientset
)

func getCronjobs(c echo.Context) error {
	// レスポンス初期化
	type cronjob struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}
	type responseType struct {
		Data []cronjob `json:"data"`
	}
	var response responseType
	response.Data = []cronjob{}

	// CronJobリスト取得
	cjobs, err := clientset.BatchV1().CronJobs(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 各CronJobをレスポンスに追加
	for _, cjob := range cjobs.Items {
		cj := cronjob{
			Name:     cjob.Name,
			Schedule: cjob.Spec.Schedule,
		}
		response.Data = append(response.Data, cj)
	}

	return c.JSON(http.StatusOK, response)
}

func updateCronjob(c echo.Context) error {
	// リクエストボディ&レスポンスの型
	type cronjob struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}

	// リクエストボディを構造体にバインド
	reqb := cronjob{}
	if err := c.Bind(&reqb); err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	cjClient := clientset.BatchV1().CronJobs(namespace)

	// CronJob取得
	cj, err := cjClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
	if err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	// スケジュール変更
	cj.Spec.Schedule = reqb.Schedule

	// 更新適用
	resultCj, err := cjClient.Update(context.TODO(), cj, metav1.UpdateOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 更新された内容をレスポンスにする
	response := cronjob{
		Name:     resultCj.Name,
		Schedule: resultCj.Spec.Schedule,
	}
	return c.JSON(http.StatusOK, response)
}

func getCronworkflows(c echo.Context) error {
	// レスポンス初期化
	type cronworkflow struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}
	type responseType struct {
		Data []cronworkflow `json:"data"`
	}
	var response responseType
	response.Data = []cronworkflow{}

	// CronWorkflowリスト取得
	cworkflows, err := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace).List(context.TODO(), metav1.ListOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 各CronWorkflowをレスポンスに追加
	for _, cworkflow := range cworkflows.Items {
		cw := cronworkflow{
			Name:     cworkflow.Name,
			Schedule: cworkflow.Spec.Schedule,
		}
		response.Data = append(response.Data, cw)
	}

	return c.JSON(http.StatusOK, response)
}

func updateCronworkflow(c echo.Context) error {
	// リクエストボディ&レスポンスの型
	type cronworkflow struct {
		Name     string `json:"name"`
		Schedule string `json:"schedule"`
	}

	// リクエストボディを構造体にバインド
	reqb := cronworkflow{}
	if err := c.Bind(&reqb); err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	cwClient := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace)

	// CronWorkflow取得
	cw, err := cwClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
	if err != nil {
		return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
	}

	// スケジュール変更
	cw.Spec.Schedule = reqb.Schedule

	// 更新適用
	resultCw, err := cwClient.Update(context.TODO(), cw, metav1.UpdateOptions{})
	if err != nil {
		return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
	}

	// 更新された内容をレスポンスにする
	response := cronworkflow{
		Name:     resultCw.Name,
		Schedule: resultCw.Spec.Schedule,
	}
	return c.JSON(http.StatusOK, response)
}

// 開発用。本番コードでは不要
func outClusterConfig() (*rest.Config, error) {
	// Create client
	kubeconfig, ok := os.LookupEnv("KUBECONFIG")
	if !ok {
		kubeconfig = filepath.Join(homedir.HomeDir(), ".kube", "config")
	}
	return clientcmd.BuildConfigFromFlags("", kubeconfig)
}

func main() {
	// Program argsでKubernetesのnamespaceを指定
	if len(os.Args) != 2 {
		panic("assert len(os.Args) == 2")
	}
	namespace = os.Args[1]

	// creates the in-cluster config
	config, err := rest.InClusterConfig()
	if err != nil {
		config, err = outClusterConfig() // 開発用。本番コードでは消す
		if err != nil {
			panic(err.Error())
		}
	}

	// creates the clientset and argoClientset
	clientset, err = kubernetes.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}
	argoClientset, err = argoclientset.NewForConfig(config)
	if err != nil {
		panic(err.Error())
	}

	// EchoでAPIサーバー
	e := echo.New()
	e.Use(middleware.Logger())
	e.Use(middleware.Recover())

	// Routing
	api := e.Group("/" + namespace)
	api.GET("/cronjobs", getCronjobs)
	api.PUT("/cronjobs", updateCronjob)
	api.GET("/cronworkflows", getCronworkflows)
	api.PUT("/cronworkflows", updateCronworkflow)

	e.Logger.Fatal(e.Start(":1323"))
}

go.mod

go.mod
module admin_server

go 1.20

require (
	github.com/argoproj/argo-workflows/v3 v3.4.11
	github.com/labstack/echo/v4 v4.11.1
	k8s.io/apimachinery v0.28.2
	k8s.io/client-go v0.28.2
)

require (
	github.com/davecgh/go-spew v1.1.1 // indirect
	github.com/emicklei/go-restful/v3 v3.10.0 // indirect
	github.com/go-logr/logr v1.2.4 // indirect
	github.com/go-openapi/jsonpointer v0.19.6 // indirect
	github.com/go-openapi/jsonreference v0.20.2 // indirect
	github.com/go-openapi/swag v0.22.3 // indirect
	github.com/gogo/protobuf v1.3.2 // indirect
	github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
	github.com/golang/protobuf v1.5.3 // indirect
	github.com/google/gnostic-models v0.6.8 // indirect
	github.com/google/go-cmp v0.5.9 // indirect
	github.com/google/gofuzz v1.2.0 // indirect
	github.com/google/uuid v1.3.0 // indirect
	github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
	github.com/imdario/mergo v0.3.13 // indirect
	github.com/josharian/intern v1.0.0 // indirect
	github.com/json-iterator/go v1.1.12 // indirect
	github.com/labstack/gommon v0.4.0 // indirect
	github.com/mailru/easyjson v0.7.7 // indirect
	github.com/mattn/go-colorable v0.1.13 // indirect
	github.com/mattn/go-isatty v0.0.19 // indirect
	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
	github.com/modern-go/reflect2 v1.0.2 // indirect
	github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
	github.com/sirupsen/logrus v1.9.3 // indirect
	github.com/spf13/pflag v1.0.5 // indirect
	github.com/valyala/bytebufferpool v1.0.0 // indirect
	github.com/valyala/fasttemplate v1.2.2 // indirect
	golang.org/x/crypto v0.12.0 // indirect
	golang.org/x/net v0.14.0 // indirect
	golang.org/x/oauth2 v0.11.0 // indirect
	golang.org/x/sys v0.11.0 // indirect
	golang.org/x/term v0.11.0 // indirect
	golang.org/x/text v0.12.0 // indirect
	golang.org/x/time v0.3.0 // indirect
	google.golang.org/appengine v1.6.7 // indirect
	google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
	google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 // indirect
	google.golang.org/genproto/googleapis/rpc v0.0.0-20230720185612-659f7aaaa771 // indirect
	google.golang.org/grpc v1.56.2 // indirect
	google.golang.org/protobuf v1.31.0 // indirect
	gopkg.in/inf.v0 v0.9.1 // indirect
	gopkg.in/yaml.v2 v2.4.0 // indirect
	gopkg.in/yaml.v3 v3.0.1 // indirect
	k8s.io/api v0.28.2 // indirect
	k8s.io/klog/v2 v2.100.1 // indirect
	k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
	k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
	sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
	sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
	sigs.k8s.io/yaml v1.3.0 // indirect
)

Dockerfile

FROM golang:bullseye as builder
WORKDIR /app
COPY ./admin_server.go ./
COPY ./go.mod ./
RUN go mod tidy
RUN go build -o admin_server admin_server.go

FROM debian:bullseye
WORKDIR /app
COPY --from=builder /app/admin_server /app/
ENTRYPOINT ["/app/admin_server"]

Dockerビルド

docker build ./ -t admin-server

Dockerイメージをkindに登録

kind load docker-image admin-server --name=batch-cluster

image.png

Admin Server用サービスアカウントの作成

Admin ServerをPodで動かすために必要な、サービスアカウントを作成します。
以下のマニフェストファイルを用意します。

admin-sa.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: admin-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
  name: admin-role
rules:
  - apiGroups:
      - batch
    verbs:
      - list
      - get
      - update
    resources:
      - cronjobs
  - apiGroups:
      - argoproj.io
    verbs:
      - list
      - get
      - update
    resources:
      - cronworkflows
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: admin-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: admin-role
subjects:
  - kind: ServiceAccount
    name: admin-sa

サービスアカウントを作成します。

kubectl -n foo apply -f admin-sa.yaml

image.png

Admin Server用Serviceの作成

以下のマニフェストファイルを用意します。

  • 先ほど作成したサービスアカウント「admin-sa」を指定します。
  • Admin Server起動時のプログラム引数として、namespace名(本記事では"foo")を渡すように設定します。
  • 記事冒頭で書きましたように、NodePortタイプのServiceで外部公開します。
foo-admin-nodeport.yaml
---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: foo-admin-deployment
  namespace: foo
  labels:
    app: foo-admin-server
spec:
  replicas: 1
  selector:
    matchLabels:
      app: foo-admin-server
  template:
    metadata:
      labels:
        app: foo-admin-server
    spec:
      serviceAccountName: admin-sa
      containers:
      - name: foo-admin-container
        image: admin-server:latest
        imagePullPolicy: Never
        args: ["foo"]
---
apiVersion: v1
kind: Service
metadata:
  name: foo-admin-nodeport
  namespace: foo
  labels:
    app: foo-admin-server
spec:
  type: NodePort
  ports:
  - port: 1323
    targetPort: 1323
    nodePort: 31323
  selector:
    app: foo-admin-server

Serviceを起動します。

kubectl apply -f foo-admin-nodeport.yaml

image.png

起動確認

kubectl -n foo get pod

image.png

Serviceの動作確認

ここで一旦、Admin ServerのAPIを動作確認しておきます。

動作確認用CronWorkflowを起動

公式サイトのexamplesのマニフェストファイルをそのまま使用しました。

kubectl -n foo apply -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cron-workflow.yaml

image.png

起動確認

argo -n foo cron list

image.png

CronWorkflow名がhello-worldであることを覚えておきます。

APIを呼んでみる

外から2つのAPIを呼んでみます。
your_hostには、Kubernetesを動かしているホストを指定します。

curl http://your_host:1323/foo/cronworkflows

image.png

curl --location --request PUT 'http://your_host:1323/foo/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "1 0 * * *"}'

image.png

APIが正常に動くことが確認できました。

動作確認用CronWorkflowを削除

kubectl applyコマンドで作ったCronWorkflowを、kubectl deleteコマンドで削除します。

kubectl -n foo delete -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cron-workflow.yaml

image.png

別の方法として、argo-serverのUIから削除することもできます。

image.png

Admin Serverの一旦の動作確認まで完了しました。

CronWorkflow

CronWorkflowを起動する前に、あらかじめreceive.pyを外部で起動しておきます。
your_hostには、Kubernetesを動かしているホストを指定します。

python receive.py your_host foo-job

image.png

次に、1分毎にスケジュール実行を行うCronWorkflowを作成します。
以下のマニフェストファイルを用意します。
既に構築したbatch-jobを2回、直列に実行します。

foo-cronworkflow.yaml
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
  name: foo-cronworkflow
  namespace: foo
spec:
  schedule: "* * * * *"
  timezone: "Asia/Tokyo"
  concurrencyPolicy: "Forbid"
  startingDeadlineSeconds: 0
  workflowSpec:
    entrypoint: foo-entrypoint
    onExit: finally-container
    templates:
    - name: foo-entrypoint
      dag:
        tasks:
        - name: job1
          template: job1-container
        - name: job2
          depends: job1
          template: job2-container
    - name: job1-container
      container:
        image: batch-job
        imagePullPolicy: Never
        command: ["python", "send.py"]
        args: ["rabbitmq.rabbitmq", "foo-job", "job1"]
    - name: job2-container
      container:
        image: batch-job
        imagePullPolicy: Never
        command: ["python", "send.py"]
        args: ["rabbitmq.rabbitmq", "foo-job", "job2"]
    - name: finally-container
      container:
        image: batch-job
        imagePullPolicy: Never
        command: ["sh", "-c"]
        args:
        - |
          echo finally

CronWorkflowを起動します。

kubectl apply -f foo-cronworkflow.yaml

image.png

起動確認

argo -n foo cron list

image.png

事前に起動したreceive.py側を見ると、1分毎にワークフローからのメッセージが受信され続けました。

image.png

image.png

image.png

argo-serverのUIで、ワークフローをリアルタイムに確認できます。

image.png

CronWorkflowを動作確認できました。

Admin Serverで、起動中のCronWorkflowのscheduleを操作する

外部からAdmin ServerのAPIを呼んで、CronWorkflowのscheduleをリアルタイムに変更してみます。
your_hostには、Kubernetesを動かしているホストを指定します。

まずGET APIで現在のスケジュールを確認すると、1分毎の実行になっています。

curl http://your_host:1323/foo/cronworkflows

image.png

次にPUT APIでスケジュールを毎日0時0分に変更します。

curl --location --request PUT 'http://your_host:1323/foo/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "foo-cronworkflow", "schedule": "0 0 * * *"}'

image.png

receive.py側を見ると、メッセージの受信が止まっていました。
0時0分までこのまま動かないはずです。

image.png

Admin ServerのAPIで、CronWorkflowのscheduleを変更できました。

後始末

Kubernetesクラスタを削除するコマンドです。

kind delete clusters batch-cluster
2
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?