まとめ
- ttlafterfinishedcontrollerが処理している
- Job作成または、Job更新時にEventHandlerによりWorkQueueにjobが追加される
- Controllerは
processNextWorkItem
でWorkQueue内のItemを処理する - JobがCompleted or Failedの状態で、
TTLSecondsAfterFinished
が設定されていて、そのTTLが過ぎている場合に削除される - expireしていない場合には、expireするであろう時間分を待ってenqueueして処理するようになっている
Controller
entrypoint: tc.worker
processNextWorkItem
をFor loopで実行する
func (tc *Controller) worker(ctx context.Context) {
for tc.processNextWorkItem(ctx) {
}
}
processNextWorkItem
queueからitemを取り出し、processJobで処理をする。
func (tc *Controller) processNextWorkItem(ctx context.Context) bool {
key, quit := tc.queue.Get()
if quit {
return false
}
defer tc.queue.Done(key)
err := tc.processJob(ctx, key)
tc.handleErr(err, key)
return true
}
processJob
jobのStatusをチェックして、
- Jobが終了状態かつTTLを超えていたらjobを削除する
- Jobが終了していない場合、またはTTLがExpireしてない場合には、残りのTTLの時間を指定してqueueに追加する
func (tc *Controller) processJob(ctx context.Context, key string) error {
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}
// Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
job, err := tc.jLister.Jobs(namespace).Get(name)
logger := klog.FromContext(ctx)
logger.V(4).Info("Checking if Job is ready for cleanup", "job", klog.KRef(namespace, name))
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
if expiredAt, err := tc.processTTL(logger, job); err != nil {
return err
} else if expiredAt == nil {
return nil
}
// The Job's TTL is assumed to have expired, but the Job TTL might be stale.
// Before deleting the Job, do a final sanity check.
// If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
// The latest Job may have a different UID, but it's fine because the checks will be run again.
fresh, err := tc.client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}
// Use the latest Job TTL to see if the TTL truly expires.
expiredAt, err := tc.processTTL(logger, fresh)
if err != nil {
return err
} else if expiredAt == nil {
return nil
}
// Cascade deletes the Jobs if TTL truly expires.
policy := metav1.DeletePropagationForeground
options := metav1.DeleteOptions{
PropagationPolicy: &policy,
Preconditions: &metav1.Preconditions{UID: &fresh.UID},
}
logger.V(4).Info("Cleaning up Job", "job", klog.KObj(fresh))
if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(ctx, fresh.Name, options); err != nil {
return err
}
metrics.JobDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds())
return nil
}
processTTL
processJob
の中で重要なLogicの一つでTTLを確認する関数で、needsCleanup
でCleanUpする必要があるかどうかをまずチェックし、CleanUpする必要がなければ、nilを返す。processJob
の中で expiredAtがnilの場合にはjobの削除は行われない。
jobのDeletionTimestampに値が入っていない(jobが削除された状態でない)かつ、needCleanUpがTrueであった場合は以下のロジックが実行される。
- expireしている場合は、expiredAtに値を入れて返す。
processJob
の中でjobの削除が行われる - expiredAtがまだ来てない場合には、nilを返す。
processJob
の中でexpiredAtがnilの場合にはjobの削除は行われない
// processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire
// if the TTL will expire later.
func (tc *Controller) processTTL(logger klog.Logger, job *batch.Job) (expiredAt *time.Time, err error) {
// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
if job.DeletionTimestamp != nil || !needsCleanup(job) {
return nil, nil
}
now := tc.clock.Now()
t, e, err := timeLeft(logger, job, &now)
if err != nil {
return nil, err
}
// TTL has expired
if *t <= 0 {
return e, nil
}
tc.enqueueAfter(job, *t)
return nil, nil
}
needsCleanUp
Cleanupするかどうかの条件は、TTLSecondsAfterFinishedが設定してあるかつIsJobFinished
がTrueである。
// needsCleanup checks whether a Job has finished and has a TTL set.
func needsCleanup(j *batch.Job) bool {
return j.Spec.TTLSecondsAfterFinished != nil && jobutil.IsJobFinished(j)
}
IsJobFinished
IsJobFinished
は、FinishedConditionの結果
// IsJobFinished checks whether the given Job has finished execution.
// It does not discriminate between successful and failed terminations.
func IsJobFinished(j *batch.Job) bool {
isFinished, _ := FinishedCondition(j)
return isFinished
}
FinishedCondition
Status.Conditions
のそれぞれに対してloopを回し、JobComplete
or JobFailed
だった場合に trueを返し、すべてのConditionsでこれらにマッチしなかった場合はfalseを返す。
// FinishedCondition returns true if a job is finished as well as the condition type indicating that.
// Returns false and no condition type otherwise
func FinishedCondition(j *batch.Job) (bool, batch.JobConditionType) {
for _, c := range j.Status.Conditions {
if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
return true, c.Type
}
}
return false, ""
}
EventHandler
jobInformerに追加されているEventHandlerを見るとどの条件で、Controllerのworkqueueに処理するitemを入れるのかがわかる。
AddFunc
とUpdateFunc
でjobを追加しているので、 Job作成時とJob更新時にqueueに入れられ、上で見たForloopで処理が行われることがわかる。
jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
tc.addJob(logger, obj)
},
UpdateFunc: func(oldObj, newObj interface{}) {
tc.updateJob(logger, oldObj, newObj)
},
})