LoginSignup
6
4

More than 1 year has passed since last update.

[Kubernetes] EndpointsがよくわかってないのでEndpointsControllerを読んでみた

Last updated at Posted at 2021-06-14

始まり

  • Endpointsって重要だけど、Serviceのほうが注目されてて結構裏役。最初Kubernetes始めたときしらなかったから、Endpointsって何?って人結構いるはず。

  • Prometheus-Operatorで、ServiceMonitor を見てたら (ServiceMonitorSpec)、Endpointsが使われてるけど、Endpointsの詳細は確認したことない

Endpointsとは

  • Service

    • Selectorあり

      ラベルセレクターを定義したHeadless Serviceにおいて、EndpointsコントローラーはAPIにおいてEndpointsレコードを作成し、ServiceのバックエンドにあるPodへのIPを直接指し示すためにDNS設定を修正します。

    • Selectorなし

      ラベルセレクターを定義しないHeadless Serviceにおいては、EndpointsコントローラーはEndpointsレコードを作成しません。

  • PodのLifeCycle

    • Podの終了

      kubeletが正常な終了を開始すると同時に、コントロールプレーンは、終了中のPodをEndpoints(および有効な場合はEndpointSlice)オブジェクトから削除します。

    • ReadinessProbe

      コンテナがリクエスト応答する準備ができているかを示します。 readinessProbeに失敗すると、エンドポイントコントローラーにより、ServiceからそのPodのIPアドレスが削除されます

Endpointsに関するドキュメントが少ないと感じた。PodやServiceのところに別れて書いてあり、Endpointsに関することがまとまって書いてある部分がないためか。

簡単に自分の現状の理解を書くと、

ServiceをSelectorありで作ると、Serviceの名前と同じEndpointsが作成されて、PodのmatchLabelsに合うPodのIpがEndpointsに格納される。ただし、PodがReadyの状態だと、EndpointsのAddressesにPodIpが追加され、PodがReadyでないとNotReadyAddressesのリストに入る。これにより、準備の出来ていないPodにServiceからトラフィックが流れないようにしている。(これはEndpointsController外の話)

はっきりさせたいポイント

  • 何が変更したらEndpointsが変更されるのか?
    • 以下のイベントの際に変更があるかどうかチェックがトリガーされる:
      • Serviceの作成、更新、削除
      • Podの作成、更新、削除
      • Endpointsの削除
  • EndpointsのAddressesにIPを入れる条件は?
    • 以下のいずれかを満たせば:
      • service.Spec.PublishNotReadyAddressesがtrue
      • Annotation "service.alpha.kubernetes.io/tolerate-unready-endpoints"がtrue (deprecated)
      • podReadyである
  • EndpointsのNotReadyAddressesにIPを入れる条件は?
    • 以下の条件を両方満たした場合:
      • Addressesに追加する条件に満たなかった
      • pod.Spec.RestartPolicyにより
        • RestartPolicyNever -> PodFailed でも PodSucceededでもない場合
        • RestartPolicyOnFailure -> PodSucceededでない場合
        • その他

コードを読む

基本はこれ: endpoints_controller.go

Overview

  1. ServiceのKeyをWorkQueueに入れる
    1. Podが追加/変更/削除されたときに、そのPodがメンバーとなるServiceのkeyをWorkQueueに入れる
    2. Endpointが削除されたらobjからkeyを取り出して、WorkQueueに入れる
    3. Serviceが追加、更新、削除されたときも同様にkeyを取り出して、WorkQueueに入れる
  2. ReconcileLoopは、Serviceごとに処理をする
  3. ServiceのSelectorにより対応するPodを取得
  4. PodからPodIPを取得
  5. PodのStatusによりPodのIpをEndpointsの中の Addresses にいれるか, NotReadyAddresses に入れる
  6. 現状のEndpointsを取得
  7. 必要があればEndpointsの作成または更新

endpoints-controller.png

詳細

エントリーポイント

EndpointsControllerは、kube-controller-manager にいて、NewEndpointController + Run が呼ばれてスタート するので、この2つをエントリーポイントとして理解すれば良い

NewEndpointController

EndpointsControllerを作成して返す。

  1. NewEndpointController では3つのinformerがある <!-- (informerとは?という人はControllerのおさらいへ) -->
    1. serviceInformer
      1. AddFunc: onServiceUpdate = objからkeyを取り出して (namespace/name)、serviceSelectorCacheのSelectorを更新して、WorkQueueにkeyを入れる
      2. UpdateFunc: onServiceUpdate 上と同じ
      3. DeleteFunc: onServiceDelete = serviceSelectorCacheからkeyを取り除いてqueueに入れる
    2. podInformer
      1. AddFunc: addPod = Podが追加されたら、属するサービスをqueueに追加する
      2. UpdateFunc: updatePod = podが更新されたら、前まで属していたServiceとこれから属するServiceを探し、それらをqueueに流す。
      3. DeleteFunc: deletePod = podが消されたらv1.PodかDeletionFinalStateUnknownをobjとして受け取り endpointutliからpodを取り出し、nilじゃなかったら addPodに追加
    3. endpointInformer
      1. DeleteFunc: onEndpointsDelete = endpointのkey取り出してqueueに追加

EndpointsController

Fieldは14個

  1. client clientset.Interface
  2. eventBroadcaster record.EventBroadcaster
  3. eventRecorder record.EventRecorder
  4. serviceLister corelisters.ServiceLister: ServiceをList/Getするため
  5. servicesSynced cache.InformerSynced: 初めてsyncされたらTrueを返す
  6. podLister corelisters.PodLister: PodをList/Getするため
  7. podsSynced cache.InformerSynced: 同じ
  8. endpointsLister corelisters.EndpointsLister: EndpointsをList/Getするため
  9. endpointsSynced cache.InformerSynced: 同じ
  10. queue workqueue.RateLimitingInterface: 更新が必要なServiceを格納する
  11. workerLoopPeriod time.Duration: 複数のworkerを走らせるのでその間隔
  12. triggerTimeTracker *endpointutil.TriggerTimeTracker: 最終更新時間を計算して、EndpointsLastChangeTriggerTimeというannotationを更新する
  13. endpointUpdatesBatchPeriod time.Duration
  14. serviceSelectorCache *endpointutil.ServiceSelectorCache: service selectorsのキャッシュ

Run(workers int, stopCh <-chan struct{})

この関数が、kube-controller-managerで呼ばれている。
workersの数が指定されているので、workerLoopPeriod 間隔を開けてスタートする

worker()

Runの中で複数回呼ばれるメインの関数

processNextWorkItem() をfor分で呼ぶ

processNextWorkItem()

  1. queue からメッセージを取り出し ekeyに格納する。 取得できなければ false を返し終了。
  2. ekeyが存在すれば、 syncService(eKey) を実行
  3. エラーがあれば、handleErr()
  4. 最後に queueのDoneメソッドを読んで完了し trueを返す

syncService(key string)

これがいわゆるReconcilation Loopをやっている関数。200行くらいの関数でメインロジックがあるので、いくつかにわけて行く。

1. 対象となるserviceを取得

  1. namespacenamekeyからセット
  2. serviceserviceListerから、 namespaceとnameを指定しserviceを取得。 (今後はこのservice に対して処理を行う)

2. 対象serviceの状況から直接処理できるケース

  1. serviceが見つからなかった場合は、削除されているのでnameと同じ名前のEndpointsを削除する
  2. service.Spec.Selector がなければ、nilを返して終了 (Service#without selector)

3. Serviceに対応するPodsを取得

  1. podspodListerからNamespaceとserviceのSelectorにより対応するpodを取得

4. Podsに対して処理をする前の準備

  1. tolerateUnreadyEndpointsservice.Spec.PublishNotReadyAddressesとAnnotation (Deprecated)からセット
  2. endpointsLastChangeTriggerTime を計算しセット
  3. subsetsという v1.EndpointSubsetの空リストを作成。 (ここにEndpointsを入れていき最後にこれにより更新する。)

    type EndpointSubset struct {
        Addresses []EndpointAddress
        NotReadyAddresses []EndpointAddress
        Ports []EndpointPort
    }
    

4. 対象となるPodsに対してEndpointSubsetsを埋めていく

podsのそれぞれのpodに対して以下の処理を行う:

  1. pod.Status.PodIPがない場合→Continue
  2. podが削除中の場合→Continue
  3. ep, err := podToEndpointAddressForService(service, pod) ← EndpointAddressを取得

    返されるEndpointAddress
    &v1.EndpointAddress{
        IP:       endpointIP,  //`IPv6DualStack `がEnableされてない場合は、`endpointIP = pod.Status.PodIP`
        NodeName: &pod.Spec.NodeName,
        TargetRef: &v1.ObjectReference{
            Kind:            "Pod",
            Namespace:       pod.ObjectMeta.Namespace,
            Name:            pod.ObjectMeta.Name,
            UID:             pod.ObjectMeta.UID,
            ResourceVersion: pod.ObjectMeta.ResourceVersion,
        },
    }
    
  4. epa := *ep ポインタ変数を変換

  5. epa (EndpointAddress) をsubsets (EndpointSubset)に追加していく

    1. serviceにportがない場合
      1. headless serviceの場合ならendpointAddressをsubsetsに追加する addEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
    2. service に portがある場合:
      1. service.Spec.Ports に対してそれぞれ
        1. endpointPortFromServicePort(servicePort, portNum) でEndpointPortインスタンスを生成
        2. addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints) により追加
    addEndpointSubset()
    if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) { // ServiceにPublishNotReadyAddressesがTrueとセットされてるorPodがReadyの場合
        subsets = append(subsets, v1.EndpointSubset{
            Addresses: []v1.EndpointAddress{epa},
            Ports:     ports,
        })
    else if shouldPodBeInEndpoints(pod) { // それ以外で詳細は`RestartPolicy`依存だが、基本はReadyになってるべきだけど一時的にReadyではないような状態の場合
        subsets = append(subsets, v1.EndpointSubset{
            NotReadyAddresses: []v1.EndpointAddress{epa},
            Ports:     ports,
        })
    }
    

 5. 集めたEndpointSubsetsを加工

  1. subsets = endpoints.RepackSubsets(subsets) でRepackする。 (複数のPodごとにバラバラにappendされてる、集約する)
処理前
[]v1.EndpointSubset{{
    Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}},
    Ports:     []v1.EndpointPort{{Port: 111}},
}, {
    NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5"}},
    Ports:             []v1.EndpointPort{{Port: 222}},
}, {
    Addresses: []v1.EndpointAddress{{IP: "1.2.3.6"}},
    Ports:     []v1.EndpointPort{{Port: 111}},
}, {
    NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5"}},
    Ports:             []v1.EndpointPort{{Port: 333}},
}},
処理後
[]v1.EndpointSubset{{
    Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "1.2.3.6"}},
    Ports:     []v1.EndpointPort{{Port: 111}},
}, {
    NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5"}},
    Ports:             []v1.EndpointPort{{Port: 222}, {Port: 333}},
}},

6. 現在のEndpointsを取得

  1. currentEndpointsendpointsListerを使って現状のEndpointsを取得し格納、存在していなければ、空のものを作成し格納
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
    if errors.IsNotFound(err) {
        currentEndpoints = &v1.Endpoints{
            ObjectMeta: metav1.ObjectMeta{
                Name:   service.Name,
                Labels: service.Labels,
            },
        }
    } else {
        return err
    }
}

7. Endpointsの変更・作成が必要かどうかを判断

  1. createEndpointscurrentEndpoints.resourceVersionの長さが0の場合は作成する (Resource Verionがないということは、上の行で初期化しただけで、実際には対応するリソースは存在していないことを示すため作成フラグをTrueにする)
  2. compareLabels ← currentEndpoints.Labelsをセットして、v1.IsHeadlessServiceラベルは除く. (なぜならサービス自体にはセットされないので、diffチェックでfalse negativeになる Endpointだけについてるラベルで、サービスにはセットされてないものなので、無視する)
  3. createEndpointsがFalse & subsetsも同じ & compareLabelsが同じ & Capacityも問題ない & → 更新する必要がないのでreturn nil

8. 更新のための新しいEndpointsを準備

  1. newEndpoints ← この後のロジックでは作成・更新が必要になるものだけが残っているので、currentEndpointsからDeepCopy
  2. newEndpoints.Subsets = subsets subsetsを更新
  3. newEndpoints.Labels = service.Labels serviceのLabelsをendpointsのlabelにする
  4. newEndpoints.Annotations
    1. endpointLastChangeTriggerTimeの更新 (ない場合はAnnotationを消す)
    2. EndpointsOverCapacityの更新 ← maxCapacity=1000 を超えたsubsetsがあったらwarningをセット。そうでない場合はKeyを消す
  5. newEndpoints.Labels
    1. nilだったら空マップをセット
    2. IsHeadlessService ← Headless Serviceの場合は、service.kubernetes.io/headless= というラベルをつける. value=""でそれ以外はKeyを消す

9. Endpointsの作成・更新

  1. createEndpointsならCreate()でそうでないならUpdate()する

    if createEndpoints {
        // No previous endpoints, create them
        _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{})
    } else {
        // Pre-existing
        _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{})
    }
    
  2. errorがあればログを書いたりeventRecorderに書いたりして return errする

  3. errorがなければnilを返す

その他細かいもの

  • maxCapacity: 1000個以上行くとAnnotationにWarningがつく
  • PublishNotReadyAddresses: PodがReadyでなくても、そのIPをEndpointsに含めるかのBoolean
  • podToEndpointAddressForService: podからEndpointAddressを生成する関数。IPv6DualStackがEnableされてない場合は、endpointIP = pod.Status.PodIPを使って EndpointAddressを初期化したものを返す。
  • ShouldSetHostname: podにHostnameがあって、PodのSubdomainとServiceのNameが同じで、ServiceとPodが同じNamespaceであればTrue
  • addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int) :
    • epp があれば EndpointsPort に追加
    • tolerateUnreadyEndpoints がtrueかpodがReadyであれば、subsetsに EndpointSubset{ Address: EndpointAddress, Port: ports} を追加
    • そうでない場合には、 shouldPodBeInEndpoints(pod) のときには、 NotReadyAddresses にEndpointAddressを入れて追加する
      • shouldPodBeInEndpoints(pod *v1.Pod) は、RestartPolicyによってことなり、
        • Never: pod.Status.Phasev1.PodFailed でも v1.PodSucceeded でもない場合にTrue
        • OnFailure: pod.Status.Phasev1.PodFailed でない場合にTrue
        • default: true
  • endpointsLastChangeTriggerTime を計算
    1. triggerTimeTrackerのServiceStatesをServiceのNamespaceとNameから取得し state に入れる
    2. minChangedTriggerTimeをPodの LastTransitionTime から取得
    3. state 内のそのPodの lastPodTriggerTimes[pod.Name] よりもPodがあとに更新されていれば、 minChangedTirggerTimeをpodTriggerTimeと比較して小さい方を minChangedTirggerTime にセットする
    4. serviceTriggerTimeのほうがminChangedTriggerTimeより小さければ、 minChangedTirggerTime を更新する
    5. stateのlastPodTriggerTimesとlastServiceTriggerTimeを更新
    6. ServiceStatesを更新してFuncを抜ける
    7. 返り値は、PodとServiceで一番小さい lastPodTriggerTimeCreationTimestamp

まとめ

Endpointsの管理のされ方を、コードベースで詳細まで確認できた。コントローラのコード読んだことない人の助けになれればと。
今後は、Endpointsに依存している部分 (例. Serviceの名前解決kube-proxy?やPrometheusOperatorでの使われ方) をちゃんと見てみたい。

6
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
4