はじめに
Kubernetes では Job を実行するための機能として CronJob と Job というリソースが用意されていますが、そこまで機能が豊富なわけではないのでユースケースによっては機能が足りない場合が出てきます。
その場合には現状では以下のいずれかを用いるケースが多いです。
-
ワークフローツールを使用する
e.g. Argo Workflows、Apache Airflow etc -
カスタムコントローラーを使って機能を充実させる
e.g. Kueue、kube-batch etc
今回はその中で、カスタムコントローラーを使って機能を充実されるアプローチを選択した Furiko というプロダクトについて紹介してきます。
Furiko とは
The word "furiko" (振り子) means "pendulum" in Japanese.
Logo designed by Duan Weiwei, distributed under CC-BY 4.0.
Rundeck を用いて Job を実行するプラットフォームを運用しており、それを k8s 上に移行を検討したところ、既存の k8s の CronJob/Job の機能では要件を満たせたかったために独自に開発されたそうです。
Furiko で Job を実行する時の考え方
Furiko では k8s における CronJob-> Job -> Pod
の関係と同じように JobConfig -> Job -> Task
という考え方が存在します。
(Furiko で Job という単語を使用する際は k8s の batch/v1 の Job ではなく Furiko 独自の execution.furiko.io/v1alpha1 の Job のことを意味します。)
実際の Manifest は以下のようになります。
apiVersion: execution.furiko.io/v1alpha1
kind: JobConfig
metadata:
name: example-jobconfig
spec:
# Concurrency configuration.
concurrency:
policy: Forbid # Allow or Forbid or Enqueue
maxConcurrency: 1
# Schedule configuration.
schedule:
cron:
expression: "H H/3 * * *"
timezone: "Asia/Singapore"
disabled: false
constraints:
notBefore: "2022-01-01T00:00:00-03:00"
notAfter: "2023-01-01T00:00:00-03:00"
# Job options.
option:
options:
- type: String
name: username
label: Username
string:
default: Example User
trimSpaces: true
# Template for the Job to be created. For more info, see the Job sample configuration.
template:
# Any labels and annotations will be automatically added to downstream Jobs.
metadata:
annotations:
annotations.furiko.io/job-group: "cool-jobs"
spec:
# Specifies maximum number of attempts for each task, defaults to 1.
maxAttempts: 3
# Optional delay between each task retry.
retryDelaySeconds: 10
# Optional duration in seconds for how long each task should be pending for
# until it gets killed.
taskPendingTimeoutSeconds: 1800
# The template for each task to be created by the Job.
taskTemplate:
# Specify how to create the task as a Pod. This is just a PodTemplateSpec.
pod:
spec:
containers:
# Notice how we can use context variables and job options inside
# the PodSpec freely to be substituted at runtime.
- name: job-container
args:
- echo
- "Hello world, ${option.username}!"
env:
- name: JOBCONFIG_NAME
value: "${jobconfig.name}"
- name: JOB_NAME
value: "${job.name}"
image: "alpine"
resources:
limits:
cpu: 100m
memory: 64Mi
書き方はとても CronJob に似ており、これが Job の定義になります。
基本的な使い方は JobConfig
を作成して以下の k8s にデプロイして使用します。
$ kubectl apply -f example-jobconfig.yaml
jobconfig.execution.furiko.io/example-jobconfig created
Furiko には独自の CLI が用意されているので、それを用いて JobConfig
から Job を作成してアドホックに実行することもできます。
$ furiko run example-jobconfig
Job default/example-jobconfig-klptm created
以下のように独自の CLI を用いて Job の情報を取得することができます。
$ furiko get job example-jobconfig-klptm
JOB INFO
Name: example-jobconfig-klptm
Namespace: default
Type: Adhoc
Created: Wed, 14 Dec 2022 01:15:58 +08:00 (22 seconds ago)
Job Config: forbid-example
Concurrency Policy: Forbid
JOB STATUS
Phase: Succeeded
Started: Wed, 14 Dec 2022 01:15:58 +08:00 (22 seconds ago)
Tasks: 0 Running / 1 Succeeded
Run Duration: About a second
Result: Success
LATEST TASK
Name: example-jobconfig-klptm-gezdqo-0
Created: Wed, 14 Dec 2022 01:15:58 +08:00 (22 seconds ago)
State: Terminated
Started: Wed, 14 Dec 2022 01:16:00 +08:00 (20 seconds ago)
Finished: Wed, 14 Dec 2022 01:16:00 +08:00 (20 seconds ago)
通常の kubectl get
や kubectl describe
を使用しても情報を取得することができます。
$ kubectl get furikojob example-jobconfig-q4dw9 -o yaml
apiVersion: execution.furiko.io/v1alpha1
kind: Job
metadata:
creationTimestamp: "2022-12-22T06:31:11Z"
finalizers:
- execution.furiko.io/delete-dependents-finalizer
generateName: example-jobconfig-
generation: 1
labels:
execution.furiko.io/job-config-uid: 7d95384a-3cd4-49eb-8a47-d8d2fbca027f
name: example-jobconfig-q4dw9
namespace: default
ownerReferences:
- apiVersion: execution.furiko.io/v1alpha1
blockOwnerDeletion: true
controller: true
kind: JobConfig
name: example-jobconfig
uid: 7d95384a-3cd4-49eb-8a47-d8d2fbca027f
resourceVersion: "155365"
uid: f68e732f-64b3-40a1-953a-6eced8e6c536
spec:
startPolicy:
concurrencyPolicy: Forbid
substitutions:
jobconfig.cron_schedule: '*/5 * * * *'
jobconfig.cron_timezone: ""
jobconfig.name: example-jobconfig
jobconfig.namespace: default
jobconfig.uid: 7d95384a-3cd4-49eb-8a47-d8d2fbca027f
template:
maxAttempts: 1
taskTemplate:
pod:
metadata:
creationTimestamp: null
spec:
containers:
- command:
- echo
- Hello World
image: alpine
name: container
resources: {}
restartPolicy: Never
ttlSecondsAfterFinished: 3600
type: Adhoc
status:
condition:
finished:
finishTime: "2022-12-22T06:31:15Z"
latestCreationTimestamp: "2022-12-22T06:31:11Z"
latestRunningTimestamp: "2022-12-22T06:31:15Z"
result: Success
createdTasks: 1
phase: Succeeded
runningTasks: 0
startTime: "2022-12-22T06:31:11Z"
tasks:
- containerStates:
- containerID: containerd://b644e9726aa7cb553f2f1502a980ad660fc68d5b2f88cacc728a72d70dd5ff34
exitCode: 0
reason: Completed
creationTimestamp: "2022-12-22T06:31:11Z"
deletedStatus:
result: Succeeded
state: Terminated
finishTimestamp: "2022-12-22T06:31:15Z"
name: example-jobconfig-q4dw9-gezdqo-0
nodeName: kind-worker
parallelIndex:
indexNumber: 0
retryIndex: 0
runningTimestamp: "2022-12-22T06:31:15Z"
status:
result: Succeeded
state: Terminated
Furiko の特徴
k8s の既存の CronJob/Job と比較した際の Furiko の特徴については、Furiko の公式ドキュメントの機能比較表を元に、k8s にはない機能に絞って以降で説明していきます。
公式の比較表は、比較対象となっている k8s のバージョンが低いようでしたので、k8s v1.26.0 の機能との差分を元に再整理した表が以下になります。
- Scheduling
Feature | Furiko | batch/v1 |
---|---|---|
Cron expressions | ✅ | ✅ (cronjob.spec.schedule により実現) |
Cron timezone | ✅ | ✅ (k8s 1.25 より Beta。cronjob.spec.timeZone により実現) |
Scheduling constraints | ✅ | ❌ |
Cron load balancing | ✅ | ❌ |
Enqueue jobs for later | ✅ | ❌ |
Forbid concurrent with adhoc execution | ✅ | ❌ |
Back-scheduling | ✅ | ✅ (cronjob.spec.startingDeadlineSeconds により実現) |
- Task Execution
Feature | Furiko | batch/v1 |
---|---|---|
Retries using separate Pods | ✅ | ❌ |
Pending timeouts for dead nodes | ✅ | ❌ |
Multiple parallel Pods per Job | ✅ | ✅ (k8s 1.22 より Beta。job.spec.completionMode により実現) |
Parallel expansion by list and matrix | ✅ | ❌ |
- General
Feature | Furiko | batch/v1 |
---|---|---|
Automatic cleanup with TTL | ✅ | ✅ (k8s 1.21 より Beta。job.spec.ttlSecondsAfterFinished により実現) |
Parameterization of job inputs | ✅ | ❌ |
上記のうち、 Furiko にのみ ✅ がついている機能について紹介していきます。
のマークがついているのもは筆者の個人的な感想になります。
Scheduling
スケジュールに関する機能になります。
Scheduling constraints
schedule に対して notBefore
と notAfter
で CronJob が実行できる範囲を指定できます。
schedule:
cron:
expression: "H H/3 * * *"
disabled: false
+ constraints:
+ notBefore: "2022-01-01T00:00:00-03:00"
+ notAfter: "2023-01-01T00:00:00-03:00"
該当の Job は何日以降から実行したいケースはあると思うので結構便利な機能だなと思います。
Cron load balancing
Furiko では unix-cron に加えて H
を使用できます。
schedule:
cron:
+ expression: "H H/3 * * *"
H
を使用すると指定した時間区間の範囲内で一意のランダムな値が設定されるので、多数の CronJob が存在する場合に実行時刻がずらせるというメリットがあります。
H
については以下の 4 つの使い方がサポートされています。
使用方法 | 説明 |
---|---|
H |
H * * * * : この場合は、1時間毎に H 分に1回実行されます。 |
H/6 |
0 H/6 * * * : AM 0〜5時のいずれかで実行され、以降は6時間毎に実行されます。H=0: 00:00 に実行され、以降は6時間毎に実行 H=2: 02:00 に実行され、以降は6時間毎に実行 |
H(5-19) |
H(5-19) * * * * : 1時間毎に1回実行されます。その際の実行分は5〜19分のいずれかになります。H=5: 00:05 に実行される。 H=12: 00:12 に実行される。 |
H(5-19)/5 |
H(5-19)/5 * * * * : 5〜19分の範囲で実行され、以降は5分毎に実行されます。H=0: 00:05, 00:10, 00:15, 01:05, 01:10, 01:15, ... H=2: 00:07, 00:12, 00:17, 01:07, 01:12, 01:17, .. |
k8s の CronJob で実行時刻をずらそうとすると以下の2つのいずれかをやる必要があると思います。
- CronJob 毎に実行時刻を変える
- 実行する Job に対して jitter を実装する
CronJob の時刻を変えるのは管理が煩雑で面倒です。jitter を入れる場合は機能追加するのが手間なのに加えて、Pod が割り当てられた後に実行を待つので request に指定された分のリソースが確保された状態で、アプリケーションが起動しない状態となるので、リソースの管理の面で少し勿体無く感じます。
正直、この機能は k8s の方に導入してほしい。実行時間をずらす作業がだるいので私、すぐ使いたいです。
Enqueue jobs for later
以下のように JobConfig に対して同時実行に対するポリシーを設定できます。
spec:
concurrency:
+ policy: Forbid # Allow or Forbid or Enqueue
+ maxConcurrency: 1
設定できるポリシーは以下の3種類です。
Policy | 説明 |
---|---|
Allow |
同時に実行できる Job に制限はなし |
Forbid |
複数の Job の同時実行を禁止 |
Enqueue |
Job をキューに入れて他の Job が完了した後で実行を開始する |
k8s にある Replace
がなくて代わりに Enqueue
が指定できるのが違いになります。
k8s の CronJob で parallelism=1
を設定した時も似たような挙動にできそうなので、ウリはウリなんだと思いますが、そこまで差別化のポイントでもないのかなと思っています。
Forbid concurrent with adhoc execution
Job をアドホックに実行する際に JobConfig で設定した Forbid
のポリシーが Job でも適用されるのが k8s との差分になります。
Furiko では独自の CLI により JobConfig から Job をアドホックに生成することができます。k8s で Cronjob から Job を生成した時にはポリシーが適用されないけど、Furiko だと適用されるというのがウリになります。
- Furiko の CLI で JobConfig から Job を作成する場合
$ furiko run <JobConfig名>
- k8s で CronJob から Job を作成する場合
$ kubectl create job <NAME> --from=cronjob/<name>
個人的にそんなにここを推さなくてもいいのでは?と思った機能になります。実際の運用中に Job が失敗した時のリカバリで手動で実行する時に制限がかかると嬉しいかも。開発期間中の手動実行だと同時実行ポリシーはなくても嬉しいケースもあるかなと思いました。
Task Execution
Retries using separate Pods
ノードの設定誤りやホストの問題で Job が実行した際に、同一のノードで再度 Job を実行しても失敗する可能性があるためにリトライ時に別のノードで Job を実行する機能が追加されています。
k8s の Job でも spec.restartPolicy=Never
に設定すれば失敗時に Pod の再生成をしてくれますし。ノードの問題についてはノードのヘルチェックの方で確認して、問題がある場合はノードを unschedulable
にして Pod のスケジュール対象にならないようにするアプローチの方が良いかなと個人的には思うので、あまり心は惹かれなかったです。
リトライに関しては Job の失敗時の遅延時間を指定できる retryDelaySeconds
の機能が便利だなと思いました。Job の失敗時に一旦、時間を置きたいケースは多いと思うので汎用的に使えるいい機能だなと思いました。
spec:
# Optional delay between each task retry.
+ retryDelaySeconds: 10
Pending timeouts for dead nodes
これは Pending に対してタイムアウトを設定できる機能になります。
spec:
# Optional duration in seconds for how long each task should be pending for
# until it gets killed.
+ taskPendingTimeoutSeconds: 1800
Job が Pending のステータスのままだと、失敗もしないのでアラートで気付けずにそのままになるケースも考えられるので、この機能を使って Job を失敗にできるっていうのは結構便利だし、汎用的に使えるんじゃないかと思いました。こういう機能を見るのは運用していく上で勉強になりますね。
Parallel expansion by list and matrix
並列に Job を実行する際に各 Job 毎に個別にパラメータを与えられる機能になります。
文章だけだとわかりにくいので、実際の例を用いて説明します
以下のように spec.parallelism.withMatrix
に付与したいパラメータの key/value を設定します。
実行する Task の環境変数の方に ${task.index_matrix.goos}
のよう形で設定します。
apiVersion: execution.furiko.io/v1alpha1
kind: Job
spec:
parallelism:
+ withMatrix:
+ goos:
+ - linux
+ - darwin
+ - windows
+ goarch:
+ - amd64
+ - arm64
completionStrategy: AllSuccessful
template:
taskTemplate:
pod:
spec:
containers:
- name: job-container
image: my-image
env:
+ - name: GOOS
+ value: ${task.index_matrix.goos}
+ - name: GOARCH
+ value: ${task.index_matrix.goarch}
args:
- go build ./...
この時 goos
の組み合わせが3パターン、goarch
の組み合わせが2パターンとなり、組み合わせとして 3 * 2 = 6 パターンがあるので、6個の Task が以下のように並列で実行されます。
タスク | goos | goarch |
---|---|---|
0 | linux | amd64 |
1 | linux | arm64 |
2 | darwin | amd64 |
3 | darwin | arm64 |
4 | windows | amd64 |
5 | windows | arm64 |
k8s でも k8s 1.22 から IndexedJob の機能が利用できるようになり、並列実行時の Job に対してユニークな番号を振れるようになったので、それと Configmap を組み合わせたら似たようなことはできるのですが、こういう形で機能として提供されてると便利ですね。自分で実装するとなると少し手間ですし、こちらの方がパラーメータのバリエーションがそこまで多くないなら見やすいかなと思いました。
Task Execution
Parameterization of job inputs
JobConfig に入力パラメータの定義を追加でき、Job の作成時に入力パラメータをチェックして Task で実行できる機能になります。
これも分かりにくいので、実際の例を用いて説明します。
- JobConfig
apiVersion: execution.furiko.io/v1alpha1
kind: JobConfig
metadata:
generateName: jobconfig-sample
spec:
option:
options:
+ - type: String
+ name: username
label: Username
string:
default: Guest User
trimSpaces: true
required: true
上記のように String での入力を期待する username
を定義します。
- PodTemplateSpec(Taskの定義)
spec:
containers:
- name: job-container
args:
- echo
+ - "Hello world, ${option.username}!"
JobConfig で定義した値を Task で利用する際には option.<オプション名>
で利用することができます。
入力パラメータについては、JobConfig から Job を作成する際にチェックされます。
- JobConfig から CLI で Job を作成する際の例
$ furiko run jobconfig-sample
+Please input option values.
+? Username <-`required: true` に設定したため、入力を求めれらています。
Job default/jobconfig-sample-fds52 created
定義できる種類は以下になります。
Name | Input Type | Description |
---|---|---|
String | string | 文字列 |
Bool | bool | 真偽値 |
Select | string | リスト形式の文字列 |
Multi | []string | 複数の文字列のリスト |
Date | string | RFC3339 形式 |
Job の定義だけを用意しておいて、実行時にパラメータを指定するみたいなことを k8s してしようと思ったことがなかったので面白いなと思いました。普段 shell とかで作ってるものを Job で置き換えたりするのに使うのかな? この機能の開発にするに至った背景とかも併せて知りたいですね。
所感
こういう機能開発には、その環境での開発ノウハウとかが詰まっているので調べみると勉強になって面白かったです。こういう独自のコントローラーで実装されて、その中で k8s でも汎用的に使える機能が k8s の方に取り込まれてプロダクトとして成長していくのかなと感じることができました。