はじめに - Karpenterについて
Karpenterは、Kubernetes向けに構築された、ノードのオートスケールを実現するツールです。OSSとなっていてワークロードの実行効率とコストの改善を期待することができます。
Karpenterの主な役割として、公式ドキュメントの冒頭に以下のような記述があります。
・Watching for pods that the Kubernetes scheduler has marked as unschedulable
・Evaluating scheduling constraints (resource requests, nodeselectors, affinities, tolerations, and topology spread constraints) requested by the pods
・Provisioning nodes that meet the requirements of the pods
・Disrupting the nodes when the nodes are no longer needed
ドキュメントに書かれている通り、主な役割の例として、Provisioning があります。これは、スケジューリング不可能(Pending状態など)のPodがあったときに様々なリソースの制約を考慮した上で最適なNodeにスケジューリングするような仕組みです。
また、Disruption も重要な仕組みの一つとなっています。Karpenterはクラスタ内のNodeを監視し、利用されていないNodeやリソース効率の悪いNodeの中断を行い、コスト最適化を図ります。
これらの仕組みについて中身の実装がどうなっているか気になったので、追ってみようと思います。
対象リポジトリは
です。なお、以下に示す関数の中身は、単純化のため主要ロジックのみ抜き出しているところが多いのできちんとコードを見たい方はリンク先の確認をお願いします。
実装を見てみる
具体的なソースコードを見てみましょう。
まず、pkg/controller/controllers.goでコントローラの登録を行なっています。
先ほども説明した通り、Karpenterの主要ロジックを担っているといえるprovisionerとdisruptionの実装について見てみようと思います。
他にもNodeClaimsやNodePoolsなど、重要な部分がありますが今回はそこまでは深堀りしないものとします。
func NewControllers(
ctx context.Context,
mgr manager.Manager,
clock clock.Clock,
kubeClient client.Client,
recorder events.Recorder,
cloudProvider cloudprovider.CloudProvider,
) []controller.Controller {
cluster := state.NewCluster(clock, kubeClient, cloudProvider)
p := provisioning.NewProvisioner(kubeClient, recorder, cloudProvider, cluster, clock)
evictionQueue := terminator.NewQueue(kubeClient, recorder)
disruptionQueue := orchestration.NewQueue(kubeClient, recorder, cluster, clock, p)
controllers := []controller.Controller{
p, evictionQueue, disruptionQueue,
disruption.NewController(clock, kubeClient, p, cloudProvider, recorder, cluster, disruptionQueue),
provisioning.NewPodController(kubeClient, p),
provisioning.NewNodeController(kubeClient, p),
nodepoolhash.NewController(kubeClient, cloudProvider),
expiration.NewController(clock, kubeClient, cloudProvider),
informer.NewDaemonSetController(kubeClient, cluster),
informer.NewNodeController(kubeClient, cluster),
informer.NewPodController(kubeClient, cluster),
informer.NewNodePoolController(kubeClient, cloudProvider, cluster),
informer.NewNodeClaimController(kubeClient, cloudProvider, cluster),
termination.NewController(clock, kubeClient, cloudProvider, terminator.NewTerminator(clock, kubeClient, evictionQueue, recorder), recorder),
metricspod.NewController(kubeClient, cluster),
metricsnodepool.NewController(kubeClient, cloudProvider),
metricsnode.NewController(cluster),
nodepoolreadiness.NewController(kubeClient, cloudProvider),
nodepoolcounter.NewController(kubeClient, cloudProvider, cluster),
nodepoolvalidation.NewController(kubeClient, cloudProvider),
podevents.NewController(clock, kubeClient, cloudProvider),
nodeclaimconsistency.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimlifecycle.NewController(clock, kubeClient, cloudProvider, recorder),
nodeclaimgarbagecollection.NewController(clock, kubeClient, cloudProvider),
nodeclaimdisruption.NewController(clock, kubeClient, cloudProvider),
status.NewController[*v1.NodeClaim](kubeClient, mgr.GetEventRecorderFor("karpenter"), status.EmitDeprecatedMetrics),
status.NewController[*v1.NodePool](kubeClient, mgr.GetEventRecorderFor("karpenter"), status.EmitDeprecatedMetrics),
status.NewGenericObjectController[*corev1.Node](kubeClient, mgr.GetEventRecorderFor("karpenter")),
}
// The cloud provider must define status conditions for the node repair controller to use to detect unhealthy nodes
if len(cloudProvider.RepairPolicies()) != 0 && options.FromContext(ctx).FeatureGates.NodeRepair {
controllers = append(controllers, health.NewController(kubeClient, cloudProvider, clock, recorder))
}
return controllers
}
Provisionerについて
Karpenterは、UnschedulableなPodを既存または新規のNodeにプロアクティブにBindすることで、効率的なスケジューリングを行います。
provisioning/provisioner.goのReconcile関数において、Podを最適なNodeにスケジューリングする処理を行っています。(以下はコードの一部)
func (p *Provisioner) Reconcile(ctx context.Context) (result reconcile.Result, err error) {
ctx = injection.WithControllerName(ctx, "provisioner")
// Batch pods
if triggered := p.batcher.Wait(ctx); !triggered {
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}
// Schedule pods to potential nodes, exit if nothing to do
results, err := p.Schedule(ctx)
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}
この中で呼び出されているSchedule関数で具体的な処理が行われているようなので、Schedule関数の中身を見てみます。
リソース効率の最適化を図るため、NodeにスケジューリングされていないPending状態のPodを取得した後、削除予定のNodeにスケジューリングされているPodを取得しています。これらのPodを対象に、Solve関数においてNodeへの最適なスケジューリング処理を行います。
func (p *Provisioner) Schedule(ctx context.Context) (scheduler.Results, error) {
defer metrics.Measure(scheduler.DurationSeconds, map[string]string{scheduler.ControllerLabel: injection.GetControllerName(ctx)})()
start := time.Now()
// We collect the nodes with their used capacities before we get the list of pending pods. This ensures that
// the node capacities we schedule against are always >= what the actual capacity is at any given instance. This
// prevents over-provisioning at the cost of potentially under-provisioning which will self-heal during the next
// scheduling loop when we launch a new node. When this order is reversed, our node capacity may be reduced by pods
// that have bound which we then provision new un-needed capacity for.
// -------
// We don't consider the nodes that are MarkedForDeletion since this capacity shouldn't be considered
// as persistent capacity for the cluster (since it will soon be removed). Additionally, we are scheduling for
// the pods that are on these nodes so the MarkedForDeletion node capacity can't be considered.
nodes := p.cluster.Nodes()
// Get pods, exit if nothing to do
pendingPods, err := p.GetPendingPods(ctx)
// Get pods from nodes that are preparing for deletion
// We do this after getting the pending pods so that we undershoot if pods are
// actively migrating from a node that is being deleted
// NOTE: The assumption is that these nodes are cordoned and no additional pods will schedule to them
deletingNodePods, err := nodes.Deleting().ReschedulablePods(ctx, p.kubeClient)
pods := append(pendingPods, deletingNodePods...)
s, err := p.NewScheduler(ctx, pods, nodes.Active())
results := s.Solve(ctx, pods).TruncateInstanceTypes(scheduler.MaxInstanceTypes)
return results, nil
}
次に、Solve関数では、対象のPodをQueueに格納し、一つずつスケジューリングできるか試しています。実際のNodeへのスケジューリングはadd関数で行われ、失敗した場合はRelax関数で制限を緩和してから再度スケジューリングを試みます。
func (s *Scheduler) Solve(ctx context.Context, pods []*corev1.Pod) Results {
defer metrics.Measure(DurationSeconds, map[string]string{ControllerLabel: injection.GetControllerName(ctx)})()
// We loop trying to schedule unschedulable pods as long as we are making progress. This solves a few
// issues including pods with affinity to another pod in the batch. We could topo-sort to solve this, but it wouldn't
// solve the problem of scheduling pods where a particular order is needed to prevent a max-skew violation. E.g. if we
// had 5xA pods and 5xB pods were they have a zonal topology spread, but A can only go in one zone and B in another.
// We need to schedule them alternating, A, B, A, B, .... and this solution also solves that as well.
for _, p := range pods {
s.cachedPodRequests[p.UID] = resources.RequestsForPods(p)
}
q := NewQueue(pods, s.cachedPodRequests)
for {
// Try the next pod
pod, ok := q.Pop()
if !ok {
break
}
// Schedule to existing nodes or create a new node
if errors[pod] = s.add(ctx, pod); errors[pod] == nil {
delete(errors, pod)
continue
}
// If unsuccessful, relax the pod and recompute topology
relaxed := s.preferences.Relax(ctx, pod)
q.Push(pod, relaxed)
if relaxed {
if err := s.topology.Update(ctx, pod); err != nil {
log.FromContext(ctx).Error(err, "failed updating topology")
}
}
}
}
Relax関数で制限を緩和している部分も見てみます。
NodeAffinityやPodAffinity、TopologySpreadConstraintsを一つずつ削除していくことで条件の緩和を行なっていることがわかります。
func (p *Preferences) Relax(ctx context.Context, pod *v1.Pod) bool {
relaxations := []func(*v1.Pod) *string{
p.removeRequiredNodeAffinityTerm,
p.removePreferredPodAffinityTerm,
p.removePreferredPodAntiAffinityTerm,
p.removePreferredNodeAffinityTerm,
p.removeTopologySpreadScheduleAnyway}
if p.ToleratePreferNoSchedule {
relaxations = append(relaxations, p.toleratePreferNoScheduleTaints)
}
for _, relaxFunc := range relaxations {
if reason := relaxFunc(pod); reason != nil {
log.FromContext(ctx).WithValues("Pod", klog.KRef(pod.Namespace, pod.Name)).V(1).Info(fmt.Sprintf("relaxing soft constraints for pod since it previously failed to schedule, %s", lo.FromPtr(reason)))
return true
}
}
return false
}
以下にadd関数の実装を示します。
既存のノードにスケジューリングできるかをまず試しています。失敗する場合、現在作成中のノードへのスケジューリングを試みます。この際、Nodeに存在するPod数についてソートを行い最もPod数の少ないNodeに優先的にスケジューリングするようにしています。これらのスケジューリングができない場合、新しくNodeを作成しスケジューリングを試みます。
filterByRemainingResources関数で、ノードプールの制限を基にして使用できるインスタンスタイプの候補を絞っており、適切なインスタンスタイプを計算した上で新しくノードの作成を行います。
func (s *Scheduler) add(ctx context.Context, pod *corev1.Pod) error {
// first try to schedule against an in-flight real node
for _, node := range s.existingNodes {
if err := node.Add(ctx, s.kubeClient, pod, s.cachedPodRequests[pod.UID]); err == nil {
return nil
}
}
// Consider using https://pkg.go.dev/container/heap
sort.Slice(s.newNodeClaims, func(a, b int) bool { return len(s.newNodeClaims[a].Pods) < len(s.newNodeClaims[b].Pods) })
// Pick existing node that we are about to create
for _, nodeClaim := range s.newNodeClaims {
if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err == nil {
return nil
}
}
// Create new node
var errs error
for _, nodeClaimTemplate := range s.nodeClaimTemplates {
instanceTypes := nodeClaimTemplate.InstanceTypeOptions
// if limits have been applied to the nodepool, ensure we filter instance types to avoid violating those limits
if remaining, ok := s.remainingResources[nodeClaimTemplate.NodePoolName]; ok {
instanceTypes = filterByRemainingResources(instanceTypes, remaining)
if len(instanceTypes) == 0 {
errs = multierr.Append(errs, fmt.Errorf("all available instance types exceed limits for nodepool: %q", nodeClaimTemplate.NodePoolName))
continue
} else if len(nodeClaimTemplate.InstanceTypeOptions) != len(instanceTypes) {
log.FromContext(ctx).V(1).WithValues("NodePool", klog.KRef("", nodeClaimTemplate.NodePoolName)).Info(fmt.Sprintf("%d out of %d instance types were excluded because they would breach limits",
len(nodeClaimTemplate.InstanceTypeOptions)-len(instanceTypes), len(nodeClaimTemplate.InstanceTypeOptions)))
}
}
nodeClaim := NewNodeClaim(nodeClaimTemplate, s.topology, s.daemonOverhead[nodeClaimTemplate], instanceTypes)
if err := nodeClaim.Add(pod, s.cachedPodRequests[pod.UID]); err != nil {
nodeClaim.Destroy() // Ensure we cleanup any changes that we made while mocking out a NodeClaim
errs = multierr.Append(errs, fmt.Errorf("incompatible with nodepool %q, daemonset overhead=%s, %w",
nodeClaimTemplate.NodePoolName,
resources.String(s.daemonOverhead[nodeClaimTemplate]),
err))
continue
}
// we will launch this nodeClaim and need to track its maximum possible resource usage against our remaining resources
s.newNodeClaims = append(s.newNodeClaims, nodeClaim)
s.remainingResources[nodeClaimTemplate.NodePoolName] = subtractMax(s.remainingResources[nodeClaimTemplate.NodePoolName], nodeClaim.InstanceTypeOptions)
return nil
}
return errs
}
// filterByRemainingResources is used to filter out instance types that if launched would exceed the nodepool limits
func filterByRemainingResources(instanceTypes []*cloudprovider.InstanceType, remaining corev1.ResourceList) []*cloudprovider.InstanceType {
var filtered []*cloudprovider.InstanceType
for _, it := range instanceTypes {
itResources := it.Capacity
viableInstance := true
for resourceName, remainingQuantity := range remaining {
// if the instance capacity is greater than the remaining quantity for this resource
if resources.Cmp(itResources[resourceName], remainingQuantity) > 0 {
viableInstance = false
}
}
if viableInstance {
filtered = append(filtered, it)
}
}
return filtered
}
Disruptionについて
ドキュメントにも書かれていますが、Karpenterは中断可能なNodeを自動で検出し、必要に応じて代替用のNodeを作成します。この際、まずDrift を行なってNodeの中断を試みて、その後にconsolidation を通じてNodeの中断を試みるという形で一つずつの手法でNodeの中断ができるかどうかを試していきます。
Driftについて
Driftは、Nodeが望ましい状態から外れた場合(古いAMIを使用しているなど) に、そのNodeを置き換えるというものです。
具体的な実装は、pkg/controllers/nodeclaim/disruption/drift.goに記述されています。
具体的にDrift状態か否かを判定しているisDrifted関数の実装を見てみます。
まず最初に静的フィールドとNode要件を確認しDriftの検出を行います。
// isDrifted will check if a NodeClaim is drifted from the fields in the NodePool Spec and the CloudProvider
func (d *Drift) isDrifted(ctx context.Context, nodePool *v1.NodePool, nodeClaim *v1.NodeClaim) (cloudprovider.DriftReason, error) {
// First check for static drift or node requirements have drifted to save on API calls.
if reason := lo.FindOrElse([]cloudprovider.DriftReason{areStaticFieldsDrifted(nodePool, nodeClaim), areRequirementsDrifted(nodePool, nodeClaim)}, "", func(i cloudprovider.DriftReason) bool {
return i != ""
}); reason != "" {
return reason, nil
}
// Include instance type checking separate from the other two to reduce the amount of times we grab the instance types.
its, err := d.cloudProvider.GetInstanceTypes(ctx, nodePool)
if err != nil {
return "", err
}
if reason := instanceTypeNotFound(its, nodeClaim); reason != "" {
return reason, nil
}
// Then check if it's drifted from the cloud provider side.
driftedReason, err := d.cloudProvider.IsDrifted(ctx, nodeClaim)
if err != nil {
return "", err
}
return driftedReason, nil
}
areStaticFieldsDrifted関数に示しているように、Nodeの望ましい状態であるNodePoolと実際にリクエストされたNodeClaimについて、アノテーションからハッシュ値のバージョンを評価することでDrift状態か否かの検出をします。その後、インスタンスタイプの整合性を確認したりクラウドプロバイダーの処理を呼び出し、Driftの検出を行います。
// Eligible fields for drift are described in the docs
// https://karpenter.sh/docs/concepts/deprovisioning/#drift
func areStaticFieldsDrifted(nodePool *v1.NodePool, nodeClaim *v1.NodeClaim) cloudprovider.DriftReason {
nodePoolHash, foundNodePoolHash := nodePool.Annotations[v1.NodePoolHashAnnotationKey]
nodePoolHashVersion, foundNodePoolHashVersion := nodePool.Annotations[v1.NodePoolHashVersionAnnotationKey]
nodeClaimHash, foundNodeClaimHash := nodeClaim.Annotations[v1.NodePoolHashAnnotationKey]
nodeClaimHashVersion, foundNodeClaimHashVersion := nodeClaim.Annotations[v1.NodePoolHashVersionAnnotationKey]
if !foundNodePoolHash || !foundNodePoolHashVersion || !foundNodeClaimHash || !foundNodeClaimHashVersion {
return ""
}
// validate that the hash version on the NodePool is the same as the NodeClaim before evaluating for static drift
if nodePoolHashVersion != nodeClaimHashVersion {
return ""
}
return lo.Ternary(nodePoolHash != nodeClaimHash, NodePoolDrifted, "")
}
areRequirementsDrifted関数では、理想状態であるNodePoolとリクエストされたNodeClaimについてラベルの比較を行っており、NodePoolの要件を満たしているかどうかをチェックすることでDrift状態の判定を行います。
func areRequirementsDrifted(nodePool *v1.NodePool, nodeClaim *v1.NodeClaim) cloudprovider.DriftReason {
nodepoolReq := scheduling.NewNodeSelectorRequirementsWithMinValues(nodePool.Spec.Template.Spec.Requirements...)
nodeClaimReq := scheduling.NewLabelRequirements(nodeClaim.Labels)
// Every nodepool requirement is compatible with the NodeClaim label set
if nodeClaimReq.Compatible(nodepoolReq) != nil {
return RequirementsDrifted
}
return ""
}
Consolidationについて
Consolidationは、KarpenterがNodeの削除または置換を行うというものです。以下の3つのアクションが存在します。
- Empty Node Consolidation: 空のNode並列でを削除
- Multi Node Consolidation: 2つ以上のノードを並行して削除し、より安価な1つのNodeに置換(統合)する
- Single Node Consolidation: 1つのNodeを削除しより安価な1つのNodeに置換する
ここでは、具体的なConsolidationのロジックについて見てみます。
該当の処理を行っているのはpkg/controllers/desruptions/consolidation.goです。
ここでは、より安価なインスタンスタイプのNodeへの置き換えや、スポットインスタンスを優先的に利用するようなスケジューリングを行います。
例えば、以下の部分では現在のインスタンスの価格よりも安価なインスタンスタイプの絞り込みを行なっており、その中で絞り込んだインスタンスタイプがリソース要件を満たしているかどうかを同時に確認しています。
func (n *NodeClaim) RemoveInstanceTypeOptionsByPriceAndMinValues(reqs scheduling.Requirements, maxPrice float64) (*NodeClaim, error) {
n.InstanceTypeOptions = lo.Filter(n.InstanceTypeOptions, func(it *cloudprovider.InstanceType, _ int) bool {
launchPrice := it.Offerings.Available().WorstLaunchPrice(reqs)
return launchPrice < maxPrice
})
if _, err := n.InstanceTypeOptions.SatisfiesMinValues(reqs); err != nil {
return nil, err
}
return n, nil
}
// Compute command to execute spot-to-spot consolidation if:
// 1. The SpotToSpotConsolidation feature flag is set to true.
// 2. For single-node consolidation:
// a. There are at least 15 cheapest instance type replacement options to consolidate.
// b. The current candidate is NOT part of the first 15 cheapest instance types inorder to avoid repeated consolidation.
func (c *consolidation) computeSpotToSpotConsolidation(ctx context.Context, candidates []*Candidate, results pscheduling.Results,
candidatePrice float64) (Command, pscheduling.Results, error) {
// filterByPrice returns the instanceTypes that are lower priced than the current candidate and any error that indicates the input couldn't be filtered.
var err error
results.NewNodeClaims[0], err = results.NewNodeClaims[0].RemoveInstanceTypeOptionsByPriceAndMinValues(results.NewNodeClaims[0].Requirements, candidatePrice)
// For multi-node consolidation:
// We don't have any requirement to check the remaining instance type flexibility, so exit early in this case.
if len(candidates) > 1 {
return Command{
candidates: candidates,
replacements: results.NewNodeClaims,
}, results, nil
}
// For single-node consolidation:
// We check whether we have 15 cheaper instances than the current candidate instance. If this is the case, we know the following things:
// 1) The current candidate is not in the set of the 15 cheapest instance types and
// 2) There were at least 15 options cheaper than the current candidate.
if len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions) < MinInstanceTypesForSpotToSpotConsolidation {
c.recorder.Publish(disruptionevents.Unconsolidatable(candidates[0].Node, candidates[0].NodeClaim, fmt.Sprintf("SpotToSpotConsolidation requires %d cheaper instance type options than the current candidate to consolidate, got %d",
MinInstanceTypesForSpotToSpotConsolidation, len(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions)))...)
return Command{}, pscheduling.Results{}, nil
}
// If a user has minValues set in their NodePool requirements, then we cap the number of instancetypes at 100 which would be the actual number of instancetypes sent for launch to enable spot-to-spot consolidation.
// If no minValues in the NodePool requirement, then we follow the default 15 to cap the instance types for launch to enable a spot-to-spot consolidation.
// Restrict the InstanceTypeOptions for launch to 15(if default) so we don't get into a continual consolidation situation.
// For example:
// 1) Suppose we have 5 instance types, (A, B, C, D, E) in order of price with the minimum flexibility 3 and they’ll all work for our pod. We send CreateInstanceFromTypes(A,B,C,D,E) and it gives us a E type based on price and availability of spot.
// 2) We check if E is part of (A,B,C,D) and it isn't, so we will immediately have consolidation send a CreateInstanceFromTypes(A,B,C,D), since they’re cheaper than E.
// 3) Assuming CreateInstanceFromTypes(A,B,C,D) returned D, we check if D is part of (A,B,C) and it isn't, so will have another consolidation send a CreateInstanceFromTypes(A,B,C), since they’re cheaper than D resulting in continual consolidation.
// If we had restricted instance types to min flexibility at launch at step (1) i.e CreateInstanceFromTypes(A,B,C), we would have received the instance type part of the list preventing immediate consolidation.
// Taking this to 15 types, we need to only send the 15 cheapest types in the CreateInstanceFromTypes call so that the resulting instance is always in that set of 15 and we won’t immediately consolidate.
if results.NewNodeClaims[0].Requirements.HasMinValues() {
// Here we are trying to get the max of the minimum instances required to satisfy the minimum requirement and the default 15 to cap the instances for spot-to-spot consolidation.
minInstanceTypes, _ := results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions.SatisfiesMinValues(results.NewNodeClaims[0].Requirements)
results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions = lo.Slice(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, 0, lo.Max([]int{MinInstanceTypesForSpotToSpotConsolidation, minInstanceTypes}))
} else {
results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions = lo.Slice(results.NewNodeClaims[0].NodeClaimTemplate.InstanceTypeOptions, 0, MinInstanceTypesForSpotToSpotConsolidation)
}
return Command{
candidates: candidates,
replacements: results.NewNodeClaims,
}, results, nil
}
Disruptionを行っている部分の処理について
ソースコードでは以下の部分で実装されています。
pkg/controllers/disruption/controller.goのReconcile関数を通じて、KarpenterがNodeの中断を行う処理を見ていきたいと思います。
DriftとConsolidationが反映された、複数のmethods
を適用し、各method
ごとにNodeの中断を試していき、成功した時点で終了するという形の処理となっています。method
間の優先順位が存在しているようなイメージです。
// Attempt different disruption methods. We'll only let one method perform an action
for _, m := range c.methods {
c.recordRun(fmt.Sprintf("%T", m))
success, err := c.disrupt(ctx, m)
if err != nil {
if errors.IsConflict(err) {
return reconcile.Result{Requeue: true}, nil
}
return reconcile.Result{}, fmt.Errorf("disrupting via reason=%q, %w", strings.ToLower(string(m.Reason())), err)
}
if success {
return reconcile.Result{RequeueAfter: singleton.RequeueImmediately}, nil
}
}
methods
の中身については同じファイル内のNewController関数で定義されています。NewDrift()がDriftに対応し、他の3つがConsolidationに対応していますね。
メソッド | 機能 |
---|---|
NewDrift() | プロビジョニングから外れた(NodePoolやEC2NodeClassが理想の仕様から逸脱している) Nodeを終了する |
NewEmptiness() | 空のNodeを並列で削除する |
NewMultiNodeConsolidation() | 2つ以上のNodeを並列で削除し、より安価のNodeを一つ起動して置換する |
NewSingleNodeConsolidation() | 単一のノードを削除し、より安価のNodeに置換する |
func NewController(clk clock.Clock, kubeClient client.Client, provisioner *provisioning.Provisioner,
cp cloudprovider.CloudProvider, recorder events.Recorder, cluster *state.Cluster, queue *orchestration.Queue,
) *Controller {
c := MakeConsolidation(clk, cluster, kubeClient, provisioner, cp, recorder, queue)
return &Controller{
queue: queue,
clock: clk,
kubeClient: kubeClient,
cluster: cluster,
provisioner: provisioner,
recorder: recorder,
cloudProvider: cp,
lastRun: map[string]time.Time{},
methods: []Method{
// Terminate any NodeClaims that have drifted from provisioning specifications, allowing the pods to reschedule.
NewDrift(kubeClient, cluster, provisioner, recorder),
// Delete any empty NodeClaims as there is zero cost in terms of disruption.
NewEmptiness(c),
// Attempt to identify multiple NodeClaims that we can consolidate simultaneously to reduce pod churn
NewMultiNodeConsolidation(c),
// And finally fall back our single NodeClaim consolidation to further reduce cluster cost.
NewSingleNodeConsolidation(c),
},
}
}
次に、Nodeの中断を試みるdisrupt関数について見ていきます。(エラーハンドリング等、一部省いています)
まず、disruption.Method
に従って中断の対象となるノードの候補(candidates)の特定を行っています。ここで得られたcandidatesについてComputeCommand関数を適用し削除の対象になるかどうかを計算します。そして、executeCommand関数においてNodeの中断処理を行います。
func (c *Controller) disrupt(ctx context.Context, disruption Method) (bool, error) {
candidates, err := GetCandidates(ctx, c.cluster, c.kubeClient, c.recorder, c.clock, c.cloudProvider, disruption.ShouldDisrupt, disruption.Class(), c.queue
// Determine the disruption action
cmd, schedulingResults, err := disruption.ComputeCommand(ctx, disruptionBudgetMapping, candidates...)
// Attempt to disrupt
if err := c.executeCommand(ctx, disruption, cmd, schedulingResults); err != nil {
return false, fmt.Errorf("disrupting candidates, %w", err)
}
return true, nil
}
pkg/controllers/disruption/helpers.goにGetCandidates関数の処理があり、ここではPDBの制約やNodeの状態をもとに、新たにNewCandidate関数を呼び出しNodeを削除できるか検証を行った上で削除されるNodeの候補を絞り出しています。
// GetCandidates returns nodes that appear to be currently deprovisionable based off of their nodePool
func GetCandidates(ctx context.Context, cluster *state.Cluster, kubeClient client.Client, recorder events.Recorder, clk clock.Clock,
cloudProvider cloudprovider.CloudProvider, shouldDisrupt CandidateFilter, disruptionClass string, queue *orchestration.Queue,
) ([]*Candidate, error) {
nodePoolMap, nodePoolToInstanceTypesMap, err := BuildNodePoolMap(ctx, kubeClient, cloudProvider)
if err != nil {
return nil, err
}
pdbs, err := pdb.NewLimits(ctx, clk, kubeClient)
if err != nil {
return nil, fmt.Errorf("tracking PodDisruptionBudgets, %w", err)
}
candidates := lo.FilterMap(cluster.Nodes(), func(n *state.StateNode, _ int) (*Candidate, bool) {
cn, e := NewCandidate(ctx, kubeClient, recorder, clk, n, pdbs, nodePoolMap, nodePoolToInstanceTypesMap, queue, disruptionClass)
return cn, e == nil
})
// Filter only the valid candidates that we should disrupt
return lo.Filter(candidates, func(c *Candidate, _ int) bool { return shouldDisrupt(ctx, c) }), nil
}
executeCommand関数の実装は以下のようになっています。ここでの処理は、Nodeに対しNoScheduleのTaintを付与し、代替のNodeを作成します。なお、代替するNodeの作成に失敗した場合には中断を行わないとの記述があります。削除対象のマークを付与し実際に削除を行うためのキューに追加します。
// executeCommand will do the following, untainting if the step fails.
// 1. Taint candidate nodes
// 2. Spin up replacement nodes
// 3. Add Command to orchestration.Queue to wait to delete the candiates.
func (c *Controller) executeCommand(ctx context.Context, m Method, cmd Command, schedulingResults scheduling.Results) error {
commandID := uuid.NewUUID()
log.FromContext(ctx).WithValues("command-id", commandID, "reason", strings.ToLower(string(m.Reason()))).Info(fmt.Sprintf("disrupting nodeclaim(s) via %s", cmd))
stateNodes := lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode {
return c.StateNode
})
// Cordon the old nodes before we launch the replacements to prevent new pods from scheduling to the old nodes
if err := state.RequireNoScheduleTaint(ctx, c.kubeClient, true, stateNodes...); err != nil {
return fmt.Errorf("tainting nodes with %s (command-id: %s), %w", pretty.Taint(v1.DisruptedNoScheduleTaint), commandID, err)
}
var nodeClaimNames []string
var err error
if len(cmd.replacements) > 0 {
if nodeClaimNames, err = c.createReplacementNodeClaims(ctx, m, cmd); err != nil {
// If we failed to launch the replacement, don't disrupt. If this is some permanent failure,
// we don't want to disrupt workloads with no way to provision new nodes for them.
return fmt.Errorf("launching replacement nodeclaim (command-id: %s), %w", commandID, err)
}
}
// Nominate each node for scheduling and emit pod nomination events
// We emit all nominations before we exit the disruption loop as
// we want to ensure that nodes that are nominated are respected in the subsequent
// disruption reconciliation. This is essential in correctly modeling multiple
// disruption commands in parallel.
// This will only nominate nodes for 2 * batchingWindow. Once the candidates are
// tainted with the Karpenter taint, the provisioning controller will continue
// to do scheduling simulations and nominate the pods on the candidate nodes until
// the node is cleaned up.
schedulingResults.Record(log.IntoContext(ctx, operatorlogging.NopLogger), c.recorder, c.cluster)
providerIDs := lo.Map(cmd.candidates, func(c *Candidate, _ int) string { return c.ProviderID() })
// We have the new NodeClaims created at the API server so mark the old NodeClaims for deletion
c.cluster.MarkForDeletion(providerIDs...)
if err = c.queue.Add(orchestration.NewCommand(nodeClaimNames,
lo.Map(cmd.candidates, func(c *Candidate, _ int) *state.StateNode { return c.StateNode }), commandID, m.Reason(), m.ConsolidationType())); err != nil {
c.cluster.UnmarkForDeletion(providerIDs...)
return fmt.Errorf("adding command to queue (command-id: %s), %w", commandID, err)
}
// An action is only performed and pods/nodes are only disrupted after a successful add to the queue
DecisionsPerformedTotal.Inc(map[string]string{
decisionLabel: string(cmd.Decision()),
metrics.ReasonLabel: strings.ToLower(string(m.Reason())),
consolidationTypeLabel: m.ConsolidationType(),
})
return nil
}
終わりに
ドキュメントを読むだけではなくソースコードも併せて確認してみることで、Karpenterがどのようにしてスケジューリングを行っているかが理解しやすくなった気がします。
個人的にはOSSのコードを読む良い機会になりました。
ここまで読んでいただき、ありがとうございました。