本記事で行うこと
前回の記事を発展させて、Kubernetesでバッチ処理基盤を構築してみます。
その第1弾として、以下の構成図を考えました。
緑の四角はオープンソース、紫の四角は本記事で実装するアプリです。
緑の四角(オープンソース)について
-
CronWorkflow
Argo Workflowsが提供するKubernetesリソースの一つで、スケジュール実行を行うワークフローエンジンです。 -
argo-server(Webアプリ)
Argo Workflowsが提供する、ワークフローを管理するためのUIです。 -
RabbitMQ
メッセージキューイングシステムです。
本記事では、バッチジョブがメッセージをパブリッシュし、それをフロントエンドアプリケーションが受け取るように作ります。
将来は、バッチ処理のTODOをキューに積んで、バッチ処理を駆動する等に利用したいです。 -
RabbitMQ Management UI(Webアプリ)
RabbitMQが提供する、キューイングシステム管理のためのUIです。
紫の四角(本記事で実装)について
-
batch job
バッチジョブ本体です。
本記事ではRabbitMQにメッセージをパブリッシュするだけの処理を行います。
Pythonで実装しましたが(send.py)、Dockerコンテナで動かすので言語は何でも良いです。 -
Frontend Application
本記事では、RabbitMQからメッセージを受け取ってコンソールに表示するだけのPythonアプリ(receive.py)を実装します。
将来はWebフロントエンドアプリにしたいです。 -
Admin Server(Web APIサーバー)
処理内容は前回の記事と同じで、CronWorkflowのscheduleを確認したり変更したりするAPIを提供します。
Go言語でclient-goを使用して実装します。
将来は、Workflowを手動実行するAPIを追加する等、機能を充実させたいです。
また、APIだけでなく、運用担当者向けに管理画面も作りたいです。
Webアプリの外部公開方法について
本記事では、上記の構成図のポート番号でWebアプリを外部公開します。
- 本番ではhttpsにすべきですが、本記事では全てhttpで統一しました。
- KubernetesでWebアプリを公開する時、Ingressリソースを使うことが多いと思います。
しかしIngressの標準機能では、80/443以外のポートでは公開できないそうです。
ただしこのページによれば、テクニックを使えば80/443以外のポートで公開できるそうですが、IngressよりもGateway APIが主流になる可能性を考慮し、本記事では手間を省いてNodePortタイプのServiceを使うに留めます。
主な参考サイト(感謝します)
- rabbitmq-on-kubernetes
- RabbitMQのチュートリアルを雑に日本語に訳してみた -Hello World-
- KubernetesのService Accountについて調べてみた
- kubernetes – ServiceAccount
環境
Kubernetesはkind(Kubernetes in Docker)を使用しました。
kind version
事前準備
Argo CLIのインストール
このページの手順でインストールしました。
本記事では手順は省略します。
作業手順
Kubernetesクラスタ作成
このページを参考に、kind用にマニフェストファイルを用意します。
hostPortに、構成図に記載した公開用portを設定します。
containerPortには、後にNodePortタイプのServiceを作るときに、マニフェストで設定する番号を入れます。
kind: Cluster
apiVersion: kind.x-k8s.io/v1alpha4
nodes:
- role: control-plane
extraPortMappings:
- containerPort: 31323
hostPort: 1323
listenAddress: "0.0.0.0"
- containerPort: 32746
hostPort: 2746
listenAddress: "0.0.0.0"
- containerPort: 32094
hostPort: 5672
listenAddress: "0.0.0.0"
- containerPort: 32095
hostPort: 15672
listenAddress: "0.0.0.0"
kindコマンドでKubernetesクラスタを作成します。
クラスタ名をbatch-clusterにしましたが、何でも良いです。
kind create cluster --name batch-cluster --config=batch-cluster.yaml
Kubernetesクラスタを作成できました。
namespace作成
RabbitMQ用、Argo Workflows用、バッチ処理用のnamespaceをそれぞれ作成します。
バッチ処理用namespace名をfooにしましたが、何でも良いです。
kubectl create namespace rabbitmq
kubectl create namespace argo
kubectl create namespace foo
namespaceを作成できました。
RabbitMQ(その1):導入~管理UI起動まで
Deploymentを起動
このリポジトリのマニフェストを、そのまま使わせていただきました。感謝します。
kubectl -n rabbitmq apply -f https://raw.githubusercontent.com/latonaio/rabbitmq-on-kubernetes/main/deployment.yml
起動確認
kubectl -n rabbitmq get deployment
RabbitMQ Management UI(port 15672)を起動
外部からブラウザで起動します。
your_hostには、Kubernetesを動かしているホストを指定します。
http://your_host:15672
Username、Pasword共に「guest」でログインできます。
RabbitMQの導入とUI起動が成功しました。
batch job(その1):docker buildまで
CronWorkflowの中で実行する各ジョブを開発します。
CronWorkflowは複数の種類のジョブを実行できますが、本記事では一種類のみ作りました。
ファイル一覧とソースコード
src directory
├── send.py
├── requirements.txt
└── Dockerfile
send.py
pikaライブラリを使用して、RabbitMQにメッセージをパブリッシュします。
3つのプログラム引数を渡す必要があります。
- host:RabbitMQのホスト名
- queue:キュー名
- message:メッセージ
import json
import sys
import traceback
import pika
def main():
host = sys.argv[1]
queue = sys.argv[2]
message = sys.argv[3]
param = pika.ConnectionParameters(host=host)
connection = pika.BlockingConnection(param)
channel = connection.channel()
channel.queue_declare(queue=queue)
channel.basic_publish(exchange="", routing_key=queue, body=json.dumps({"message": message}))
connection.close()
if __name__ == "__main__":
try:
main()
except Exception:
traceback.print_exc()
requirements.txt
pika
Dockerfile
FROM python:3-slim-bullseye
RUN set -e; \
apt-get update -y && apt-get install -y \
lsb-release; \
apt-get clean
RUN pip install --upgrade pip
RUN pip install --upgrade setuptools
ENV PYTHONUNBUFFERED 1
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY ./send.py ./
COPY ./requirements.txt ./
RUN pip install -r requirements.txt
ENV PORT 5672
ENTRYPOINT ["python", "send.py"]
CMD ["dummy"]
Dockerビルド
docker build ./ -t batch-job
Dockerイメージをkindに登録
kind load docker-image batch-job --name=batch-cluster
batch jobの開発&docker buildまでできました。
RabbitMQ(その2)& batch job(その2):RabbitMQとbatch jobを同時に動作確認
パブリッシュ側(send.py)の動作確認
kubectl debugコマンドを使って動作確認を行います。
記事執筆時点のマニフェストを使った場合は、以下のコマンドでデバッグ作業に入れました。
マニフェストが更新されたら、コマンドを調整する必要があるかもしれません。
kubectl -n rabbitmq debug \
$(kubectl -n rabbitmq get --no-headers=true pods -l run=rabbitmq -o custom-columns=:metadata.name) \
--image=batch-job \
--image-pull-policy=Never \
-it \
-- sh
デバッグに入ってのシェルで、send.pyを実行します。
python send.py rabbitmq.rabbitmq foo-job test-message
RabbitMQのUIを見て、キューにメッセージが溜まれば、ひとまずOKです。
サブスクライブ側(receive.py)の開発と動作確認
外部のマシンで以下のコードを用意します。
pikaライブラリを使用して、RabbitMQからメッセージを受信します。
2つのプログラム引数を渡す必要があります。
- host:RabbitMQのホスト名
- queue:キュー名
import sys
import traceback
import pika
def main():
host = sys.argv[1]
queue = sys.argv[2]
def callback(ch, method, properties, body):
print(" [x] Received %r" % body)
param = pika.ConnectionParameters(host=host)
connection = pika.BlockingConnection(param)
channel = connection.channel()
channel.queue_declare(queue=queue)
channel.basic_consume(queue=queue, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
if __name__ == "__main__":
try:
main()
except Exception:
traceback.print_exc()
pip等でpikaライブラリをインストール後、以下を実行します。
your_hostには、Kubernetesを動かしているホストを指定します。
python receive.py your_host foo-job
RabbitMQのキューに溜まっていたテストメッセージが受信されればOKです。
管理UIでも、キューのメッセージが0件になったことを確認します。
RabbitMQとbatch jobの動作確認ができました。
receive.pyを終了させ、Kerbernetes側のホストでOSのシェルに戻っておきます。
Argo Workflowsのインストールとargo-serverのUI起動
インストール
このページの手順でインストールします。
本記事執筆時点で、Argo Workflowsの最新バージョンはv3.4.11でした。
kubectl -n argo apply -f https://github.com/argoproj/argo-workflows/releases/download/v3.4.11/install.yaml
argo-serverの設定変更
本記事ではhttpsでなくhttpに統一しますので、kubectl patchコマンドを使って、argo-serverの設定をそのように変更します。
kubectl patch deployment \
argo-server \
--namespace argo \
--type='json' \
-p='[
{
"op": "replace",
"path": "/spec/template/spec/containers/0/readinessProbe/httpGet/scheme",
"value": "HTTP"
},
{
"op": "replace",
"path": "/spec/template/spec/containers/0/args",
"value": [
"server",
"--auth-mode=server",
"--secure=false"
]
}
]'
変更内容確認
kubectl -n argo get deployment argo-server -o yaml
argo-server用のServiceを起動
以下のマニフェストファイルを用意します。
記事冒頭で書きましたように、NodePortタイプのServiceで外部公開します。
apiVersion: v1
kind: Service
metadata:
name: argo-server-nodeport
namespace: argo
spec:
type: NodePort
ports:
- name: web
port: 2746
targetPort: 2746
nodePort: 32746
selector:
app: argo-server
Serviceを起動します。
kubectl apply -f argo-server-nodeport.yaml
起動確認
kubectl -n argo get svc
argo-server(port 2746)を起動
外部からブラウザで起動します。
your_hostには、Kubernetesを動かしているホストを指定します。
http://your_host:2746
argo-serverのUIを起動できました。
Admin Server(Go言語)
※ この記事と内容がダブります。
client-goを使用して、CronJobやCronWorkflowのscheduleを変更するAPIを開発します。
こちらのソースコードを参考にしました。
ファイル一覧とソースコード
src directory
├── admin_server.go
├── go.mod
└── Dockerfile
admin_server.go
WebフレームワークはEchoを使用しました。
起動時にプログラム引数として、namespace名を1つ渡す必要があります。
APIは以下の4つです。
-
GET /<<namespace>>/cronjobs
namespaceに属するCronJobの一覧を取得します。 -
PUT /<<namespace>>/cronjobs
CronJobのscheduleをupdateします。 -
GET /<<namespace>/cronworkflows
namespaceに属するCronWorkflowの一覧を取得します。 -
PUT /<<namespace>>/cronworkflows
CronWorkflowのscheduleをupdateします。
※ 本記事ではCronJobは使用しません。
package main
import (
"context"
argoclientset "github.com/argoproj/argo-workflows/v3/pkg/client/clientset/versioned"
"github.com/labstack/echo/v4"
"github.com/labstack/echo/v4/middleware"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/client-go/util/homedir"
"net/http"
"os"
"path/filepath"
)
type httpError struct {
Error string `json:"error"`
}
var (
namespace string
clientset *kubernetes.Clientset
argoClientset *argoclientset.Clientset
)
func getCronjobs(c echo.Context) error {
// レスポンス初期化
type cronjob struct {
Name string `json:"name"`
Schedule string `json:"schedule"`
}
type responseType struct {
Data []cronjob `json:"data"`
}
var response responseType
response.Data = []cronjob{}
// CronJobリスト取得
cjobs, err := clientset.BatchV1().CronJobs(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
}
// 各CronJobをレスポンスに追加
for _, cjob := range cjobs.Items {
cj := cronjob{
Name: cjob.Name,
Schedule: cjob.Spec.Schedule,
}
response.Data = append(response.Data, cj)
}
return c.JSON(http.StatusOK, response)
}
func updateCronjob(c echo.Context) error {
// リクエストボディ&レスポンスの型
type cronjob struct {
Name string `json:"name"`
Schedule string `json:"schedule"`
}
// リクエストボディを構造体にバインド
reqb := cronjob{}
if err := c.Bind(&reqb); err != nil {
return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
}
cjClient := clientset.BatchV1().CronJobs(namespace)
// CronJob取得
cj, err := cjClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
if err != nil {
return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
}
// スケジュール変更
cj.Spec.Schedule = reqb.Schedule
// 更新適用
resultCj, err := cjClient.Update(context.TODO(), cj, metav1.UpdateOptions{})
if err != nil {
return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
}
// 更新された内容をレスポンスにする
response := cronjob{
Name: resultCj.Name,
Schedule: resultCj.Spec.Schedule,
}
return c.JSON(http.StatusOK, response)
}
func getCronworkflows(c echo.Context) error {
// レスポンス初期化
type cronworkflow struct {
Name string `json:"name"`
Schedule string `json:"schedule"`
}
type responseType struct {
Data []cronworkflow `json:"data"`
}
var response responseType
response.Data = []cronworkflow{}
// CronWorkflowリスト取得
cworkflows, err := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace).List(context.TODO(), metav1.ListOptions{})
if err != nil {
return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
}
// 各CronWorkflowをレスポンスに追加
for _, cworkflow := range cworkflows.Items {
cw := cronworkflow{
Name: cworkflow.Name,
Schedule: cworkflow.Spec.Schedule,
}
response.Data = append(response.Data, cw)
}
return c.JSON(http.StatusOK, response)
}
func updateCronworkflow(c echo.Context) error {
// リクエストボディ&レスポンスの型
type cronworkflow struct {
Name string `json:"name"`
Schedule string `json:"schedule"`
}
// リクエストボディを構造体にバインド
reqb := cronworkflow{}
if err := c.Bind(&reqb); err != nil {
return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
}
cwClient := argoClientset.ArgoprojV1alpha1().CronWorkflows(namespace)
// CronWorkflow取得
cw, err := cwClient.Get(context.TODO(), reqb.Name, metav1.GetOptions{})
if err != nil {
return c.JSON(http.StatusBadRequest, httpError{Error: err.Error()})
}
// スケジュール変更
cw.Spec.Schedule = reqb.Schedule
// 更新適用
resultCw, err := cwClient.Update(context.TODO(), cw, metav1.UpdateOptions{})
if err != nil {
return c.JSON(http.StatusInternalServerError, httpError{Error: err.Error()})
}
// 更新された内容をレスポンスにする
response := cronworkflow{
Name: resultCw.Name,
Schedule: resultCw.Spec.Schedule,
}
return c.JSON(http.StatusOK, response)
}
// 開発用。本番コードでは不要
func outClusterConfig() (*rest.Config, error) {
// Create client
kubeconfig, ok := os.LookupEnv("KUBECONFIG")
if !ok {
kubeconfig = filepath.Join(homedir.HomeDir(), ".kube", "config")
}
return clientcmd.BuildConfigFromFlags("", kubeconfig)
}
func main() {
// Program argsでKubernetesのnamespaceを指定
if len(os.Args) != 2 {
panic("assert len(os.Args) == 2")
}
namespace = os.Args[1]
// creates the in-cluster config
config, err := rest.InClusterConfig()
if err != nil {
config, err = outClusterConfig() // 開発用。本番コードでは消す
if err != nil {
panic(err.Error())
}
}
// creates the clientset and argoClientset
clientset, err = kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}
argoClientset, err = argoclientset.NewForConfig(config)
if err != nil {
panic(err.Error())
}
// EchoでAPIサーバー
e := echo.New()
e.Use(middleware.Logger())
e.Use(middleware.Recover())
// Routing
api := e.Group("/" + namespace)
api.GET("/cronjobs", getCronjobs)
api.PUT("/cronjobs", updateCronjob)
api.GET("/cronworkflows", getCronworkflows)
api.PUT("/cronworkflows", updateCronworkflow)
e.Logger.Fatal(e.Start(":1323"))
}
go.mod
module admin_server
go 1.20
require (
github.com/argoproj/argo-workflows/v3 v3.4.11
github.com/labstack/echo/v4 v4.11.1
k8s.io/apimachinery v0.28.2
k8s.io/client-go v0.28.2
)
require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/emicklei/go-restful/v3 v3.10.0 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-openapi/jsonpointer v0.19.6 // indirect
github.com/go-openapi/jsonreference v0.20.2 // indirect
github.com/go-openapi/swag v0.22.3 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt v3.2.2+incompatible // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/google/gnostic-models v0.6.8 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/uuid v1.3.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
github.com/imdario/mergo v0.3.13 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/labstack/gommon v0.4.0 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.19 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/valyala/bytebufferpool v1.0.0 // indirect
github.com/valyala/fasttemplate v1.2.2 // indirect
golang.org/x/crypto v0.12.0 // indirect
golang.org/x/net v0.14.0 // indirect
golang.org/x/oauth2 v0.11.0 // indirect
golang.org/x/sys v0.11.0 // indirect
golang.org/x/term v0.11.0 // indirect
golang.org/x/text v0.12.0 // indirect
golang.org/x/time v0.3.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230720185612-659f7aaaa771 // indirect
google.golang.org/grpc v1.56.2 // indirect
google.golang.org/protobuf v1.31.0 // indirect
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
k8s.io/api v0.28.2 // indirect
k8s.io/klog/v2 v2.100.1 // indirect
k8s.io/kube-openapi v0.0.0-20230717233707-2695361300d9 // indirect
k8s.io/utils v0.0.0-20230406110748-d93618cff8a2 // indirect
sigs.k8s.io/json v0.0.0-20221116044647-bc3834ca7abd // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
)
Dockerfile
FROM golang:bullseye as builder
WORKDIR /app
COPY ./admin_server.go ./
COPY ./go.mod ./
RUN go mod tidy
RUN go build -o admin_server admin_server.go
FROM debian:bullseye
WORKDIR /app
COPY --from=builder /app/admin_server /app/
ENTRYPOINT ["/app/admin_server"]
Dockerビルド
docker build ./ -t admin-server
Dockerイメージをkindに登録
kind load docker-image admin-server --name=batch-cluster
Admin Server用サービスアカウントの作成
Admin ServerをPodで動かすために必要な、サービスアカウントを作成します。
以下のマニフェストファイルを用意します。
apiVersion: v1
kind: ServiceAccount
metadata:
name: admin-sa
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: admin-role
rules:
- apiGroups:
- batch
verbs:
- list
- get
- update
resources:
- cronjobs
- apiGroups:
- argoproj.io
verbs:
- list
- get
- update
resources:
- cronworkflows
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: admin-role-binding
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: admin-role
subjects:
- kind: ServiceAccount
name: admin-sa
サービスアカウントを作成します。
kubectl -n foo apply -f admin-sa.yaml
Admin Server用Serviceの作成
以下のマニフェストファイルを用意します。
- 先ほど作成したサービスアカウント「admin-sa」を指定します。
- Admin Server起動時のプログラム引数として、namespace名(本記事では"foo")を渡すように設定します。
- 記事冒頭で書きましたように、NodePortタイプのServiceで外部公開します。
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: foo-admin-deployment
namespace: foo
labels:
app: foo-admin-server
spec:
replicas: 1
selector:
matchLabels:
app: foo-admin-server
template:
metadata:
labels:
app: foo-admin-server
spec:
serviceAccountName: admin-sa
containers:
- name: foo-admin-container
image: admin-server:latest
imagePullPolicy: Never
args: ["foo"]
---
apiVersion: v1
kind: Service
metadata:
name: foo-admin-nodeport
namespace: foo
labels:
app: foo-admin-server
spec:
type: NodePort
ports:
- port: 1323
targetPort: 1323
nodePort: 31323
selector:
app: foo-admin-server
Serviceを起動します。
kubectl apply -f foo-admin-nodeport.yaml
起動確認
kubectl -n foo get pod
Serviceの動作確認
ここで一旦、Admin ServerのAPIを動作確認しておきます。
動作確認用CronWorkflowを起動
公式サイトのexamplesのマニフェストファイルをそのまま使用しました。
kubectl -n foo apply -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cron-workflow.yaml
起動確認
argo -n foo cron list
CronWorkflow名がhello-worldであることを覚えておきます。
APIを呼んでみる
外から2つのAPIを呼んでみます。
your_hostには、Kubernetesを動かしているホストを指定します。
curl http://your_host:1323/foo/cronworkflows
curl --location --request PUT 'http://your_host:1323/foo/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "hello-world", "schedule": "1 0 * * *"}'
APIが正常に動くことが確認できました。
動作確認用CronWorkflowを削除
kubectl applyコマンドで作ったCronWorkflowを、kubectl deleteコマンドで削除します。
kubectl -n foo delete -f https://raw.githubusercontent.com/argoproj/argo-workflows/master/examples/cron-workflow.yaml
別の方法として、argo-serverのUIから削除することもできます。
Admin Serverの一旦の動作確認まで完了しました。
CronWorkflow
CronWorkflowを起動する前に、あらかじめreceive.pyを外部で起動しておきます。
your_hostには、Kubernetesを動かしているホストを指定します。
python receive.py your_host foo-job
次に、1分毎にスケジュール実行を行うCronWorkflowを作成します。
以下のマニフェストファイルを用意します。
既に構築したbatch-jobを2回、直列に実行します。
apiVersion: argoproj.io/v1alpha1
kind: CronWorkflow
metadata:
name: foo-cronworkflow
namespace: foo
spec:
schedule: "* * * * *"
timezone: "Asia/Tokyo"
concurrencyPolicy: "Forbid"
startingDeadlineSeconds: 0
workflowSpec:
entrypoint: foo-entrypoint
onExit: finally-container
templates:
- name: foo-entrypoint
dag:
tasks:
- name: job1
template: job1-container
- name: job2
depends: job1
template: job2-container
- name: job1-container
container:
image: batch-job
imagePullPolicy: Never
command: ["python", "send.py"]
args: ["rabbitmq.rabbitmq", "foo-job", "job1"]
- name: job2-container
container:
image: batch-job
imagePullPolicy: Never
command: ["python", "send.py"]
args: ["rabbitmq.rabbitmq", "foo-job", "job2"]
- name: finally-container
container:
image: batch-job
imagePullPolicy: Never
command: ["sh", "-c"]
args:
- |
echo finally
CronWorkflowを起動します。
kubectl apply -f foo-cronworkflow.yaml
起動確認
argo -n foo cron list
事前に起動したreceive.py側を見ると、1分毎にワークフローからのメッセージが受信され続けました。
argo-serverのUIで、ワークフローをリアルタイムに確認できます。
CronWorkflowを動作確認できました。
Admin Serverで、起動中のCronWorkflowのscheduleを操作する
外部からAdmin ServerのAPIを呼んで、CronWorkflowのscheduleをリアルタイムに変更してみます。
your_hostには、Kubernetesを動かしているホストを指定します。
まずGET APIで現在のスケジュールを確認すると、1分毎の実行になっています。
curl http://your_host:1323/foo/cronworkflows
次にPUT APIでスケジュールを毎日0時0分に変更します。
curl --location --request PUT 'http://your_host:1323/foo/cronworkflows' --header 'Content-Type: application/json' --data '{"name": "foo-cronworkflow", "schedule": "0 0 * * *"}'
receive.py側を見ると、メッセージの受信が止まっていました。
0時0分までこのまま動かないはずです。
Admin ServerのAPIで、CronWorkflowのscheduleを変更できました。
後始末
Kubernetesクラスタを削除するコマンドです。
kind delete clusters batch-cluster







































