1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Kubernetesの終了/失敗したJobのCleanUpの仕組み (TTLAfterFinishedController)

Last updated at Posted at 2024-07-31

まとめ

  1. ttlafterfinishedcontrollerが処理している
  2. Job作成または、Job更新時にEventHandlerによりWorkQueueにjobが追加される
  3. ControllerはprocessNextWorkItemでWorkQueue内のItemを処理する
  4. JobがCompleted or Failedの状態で、TTLSecondsAfterFinished が設定されていて、そのTTLが過ぎている場合に削除される
  5. expireしていない場合には、expireするであろう時間分を待ってenqueueして処理するようになっている

Controller

ttlafterfinishedcontroller

entrypoint: tc.worker

processNextWorkItemをFor loopで実行する

func (tc *Controller) worker(ctx context.Context) {
	for tc.processNextWorkItem(ctx) {
	}
}

processNextWorkItem

queueからitemを取り出し、processJobで処理をする。

func (tc *Controller) processNextWorkItem(ctx context.Context) bool {
	key, quit := tc.queue.Get()
	if quit {
		return false
	}
	defer tc.queue.Done(key)

	err := tc.processJob(ctx, key)
	tc.handleErr(err, key)

	return true
}

processJob

jobのStatusをチェックして、

  • Jobが終了状態かつTTLを超えていたらjobを削除する
  • Jobが終了していない場合、またはTTLがExpireしてない場合には、残りのTTLの時間を指定してqueueに追加する
func (tc *Controller) processJob(ctx context.Context, key string) error {
	namespace, name, err := cache.SplitMetaNamespaceKey(key)
	if err != nil {
		return err
	}

	// Ignore the Jobs that are already deleted or being deleted, or the ones that don't need clean up.
	job, err := tc.jLister.Jobs(namespace).Get(name)

	logger := klog.FromContext(ctx)
	logger.V(4).Info("Checking if Job is ready for cleanup", "job", klog.KRef(namespace, name))

	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}

	if expiredAt, err := tc.processTTL(logger, job); err != nil {
		return err
	} else if expiredAt == nil {
		return nil
	}

	// The Job's TTL is assumed to have expired, but the Job TTL might be stale.
	// Before deleting the Job, do a final sanity check.
	// If TTL is modified before we do this check, we cannot be sure if the TTL truly expires.
	// The latest Job may have a different UID, but it's fine because the checks will be run again.
	fresh, err := tc.client.BatchV1().Jobs(namespace).Get(ctx, name, metav1.GetOptions{})
	if errors.IsNotFound(err) {
		return nil
	}
	if err != nil {
		return err
	}
	// Use the latest Job TTL to see if the TTL truly expires.
	expiredAt, err := tc.processTTL(logger, fresh)
	if err != nil {
		return err
	} else if expiredAt == nil {
		return nil
	}
	// Cascade deletes the Jobs if TTL truly expires.
	policy := metav1.DeletePropagationForeground
	options := metav1.DeleteOptions{
		PropagationPolicy: &policy,
		Preconditions:     &metav1.Preconditions{UID: &fresh.UID},
	}
	logger.V(4).Info("Cleaning up Job", "job", klog.KObj(fresh))
	if err := tc.client.BatchV1().Jobs(fresh.Namespace).Delete(ctx, fresh.Name, options); err != nil {
		return err
	}
	metrics.JobDeletionDurationSeconds.Observe(time.Since(*expiredAt).Seconds())
	return nil
}

processTTL

processJob の中で重要なLogicの一つでTTLを確認する関数で、needsCleanup でCleanUpする必要があるかどうかをまずチェックし、CleanUpする必要がなければ、nilを返す。processJobの中で expiredAtがnilの場合にはjobの削除は行われない。

jobのDeletionTimestampに値が入っていない(jobが削除された状態でない)かつ、needCleanUpがTrueであった場合は以下のロジックが実行される。

  • expireしている場合は、expiredAtに値を入れて返す。 processJobの中でjobの削除が行われる
  • expiredAtがまだ来てない場合には、nilを返す。 processJobの中でexpiredAtがnilの場合にはjobの削除は行われない
// processTTL checks whether a given Job's TTL has expired, and add it to the queue after the TTL is expected to expire
// if the TTL will expire later.
func (tc *Controller) processTTL(logger klog.Logger, job *batch.Job) (expiredAt *time.Time, err error) {

	// We don't care about the Jobs that are going to be deleted, or the ones that don't need clean up.
	if job.DeletionTimestamp != nil || !needsCleanup(job) {
		return nil, nil
	}

	now := tc.clock.Now()
	t, e, err := timeLeft(logger, job, &now)
	if err != nil {
		return nil, err
	}

	// TTL has expired
	if *t <= 0 {
		return e, nil
	}

	tc.enqueueAfter(job, *t)
	return nil, nil
}

needsCleanUp

Cleanupするかどうかの条件は、TTLSecondsAfterFinishedが設定してあるかつIsJobFinishedがTrueである。

// needsCleanup checks whether a Job has finished and has a TTL set.
func needsCleanup(j *batch.Job) bool {
	return j.Spec.TTLSecondsAfterFinished != nil && jobutil.IsJobFinished(j)
}

IsJobFinished

IsJobFinishedは、FinishedConditionの結果

// IsJobFinished checks whether the given Job has finished execution.
// It does not discriminate between successful and failed terminations.
func IsJobFinished(j *batch.Job) bool {
	isFinished, _ := FinishedCondition(j)
	return isFinished
}

FinishedCondition

Status.Conditionsのそれぞれに対してloopを回し、JobComplete or JobFailed だった場合に trueを返し、すべてのConditionsでこれらにマッチしなかった場合はfalseを返す。

// FinishedCondition returns true if a job is finished as well as the condition type indicating that.
// Returns false and no condition type otherwise
func FinishedCondition(j *batch.Job) (bool, batch.JobConditionType) {
	for _, c := range j.Status.Conditions {
		if (c.Type == batch.JobComplete || c.Type == batch.JobFailed) && c.Status == v1.ConditionTrue {
			return true, c.Type
		}
	}
	return false, ""
}

EventHandler

jobInformerに追加されているEventHandlerを見るとどの条件で、Controllerのworkqueueに処理するitemを入れるのかがわかる。

AddFuncUpdateFuncでjobを追加しているので、 Job作成時とJob更新時にqueueに入れられ、上で見たForloopで処理が行われることがわかる。

	jobInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			tc.addJob(logger, obj)
		},
		UpdateFunc: func(oldObj, newObj interface{}) {
			tc.updateJob(logger, oldObj, newObj)
		},
	})

Ref

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?