始まり
-
Endpointsって重要だけど、Serviceのほうが注目されてて結構裏役。最初Kubernetes始めたときしらなかったから、Endpointsって何?って人結構いるはず。 -
Prometheus-Operatorで、
ServiceMonitorを見てたら (ServiceMonitorSpec)、Endpointsが使われてるけど、Endpointsの詳細は確認したことない
Endpointsとは
-
-
Selectorありラベルセレクターを定義したHeadless Serviceにおいて、EndpointsコントローラーはAPIにおいてEndpointsレコードを作成し、ServiceのバックエンドにあるPodへのIPを直接指し示すためにDNS設定を修正します。
-
Selectorなしラベルセレクターを定義しないHeadless Serviceにおいては、EndpointsコントローラーはEndpointsレコードを作成しません。
-
-
-
kubeletが正常な終了を開始すると同時に、コントロールプレーンは、終了中のPodをEndpoints(および有効な場合はEndpointSlice)オブジェクトから削除します。
-
コンテナがリクエスト応答する準備ができているかを示します。 readinessProbeに失敗すると、エンドポイントコントローラーにより、ServiceからそのPodのIPアドレスが削除されます
-
Endpointsに関するドキュメントが少ないと感じた。PodやServiceのところに別れて書いてあり、Endpointsに関することがまとまって書いてある部分がないためか。
簡単に自分の現状の理解を書くと、
ServiceをSelectorありで作ると、Serviceの名前と同じEndpointsが作成されて、PodのmatchLabelsに合うPodのIpがEndpointsに格納される。ただし、PodがReadyの状態だと、EndpointsのAddressesにPodIpが追加され、PodがReadyでないとNotReadyAddressesのリストに入る。これにより、準備の出来ていないPodにServiceからトラフィックが流れないようにしている。(これはEndpointsController外の話)
はっきりさせたいポイント
- 何が変更したらEndpointsが変更されるのか?
- 以下のイベントの際に変更があるかどうかチェックがトリガーされる:
- Serviceの作成、更新、削除
- Podの作成、更新、削除
- Endpointsの削除
- 以下のイベントの際に変更があるかどうかチェックがトリガーされる:
- Endpointsの
AddressesにIPを入れる条件は?- 以下のいずれかを満たせば:
-
service.Spec.PublishNotReadyAddressesがtrue - Annotation
"service.alpha.kubernetes.io/tolerate-unready-endpoints"がtrue (deprecated) -
podがReadyである
-
- 以下のいずれかを満たせば:
- Endpointsの
NotReadyAddressesにIPを入れる条件は?- 以下の条件を両方満たした場合:
-
Addressesに追加する条件に満たなかった -
pod.Spec.RestartPolicyにより-
RestartPolicyNever->PodFailedでもPodSucceededでもない場合 -
RestartPolicyOnFailure->PodSucceededでない場合 - その他
-
-
- 以下の条件を両方満たした場合:
コードを読む
基本はこれ: endpoints_controller.go
Overview
- ServiceのKeyをWorkQueueに入れる
-
Podが追加/変更/削除されたときに、そのPodがメンバーとなるServiceのkeyをWorkQueueに入れる -
Endpointが削除されたらobjからkeyを取り出して、WorkQueueに入れる -
Serviceが追加、更新、削除されたときも同様にkeyを取り出して、WorkQueueに入れる
-
- ReconcileLoopは、Serviceごとに処理をする
- ServiceのSelectorにより対応するPodを取得
- PodからPodIPを取得
- PodのStatusによりPodのIpをEndpointsの中の
Addressesにいれるか,NotReadyAddressesに入れる - 現状のEndpointsを取得
- 必要があればEndpointsの作成または更新
詳細
エントリーポイント
EndpointsControllerは、kube-controller-manager にいて、NewEndpointController + Run が呼ばれてスタート するので、この2つをエントリーポイントとして理解すれば良い
NewEndpointController
EndpointsControllerを作成して返す。
-
NewEndpointControllerでは3つのinformerがある-
serviceInformer-
AddFunc: onServiceUpdate = objからkeyを取り出して (namespace/name)、serviceSelectorCacheのSelectorを更新して、WorkQueueにkeyを入れる -
UpdateFunc: onServiceUpdate 上と同じ -
DeleteFunc: onServiceDelete = serviceSelectorCacheからkeyを取り除いてqueueに入れる
-
-
podInformer-
AddFunc: addPod = Podが追加されたら、属するサービスをqueueに追加する -
UpdateFunc: updatePod = podが更新されたら、前まで属していたServiceとこれから属するServiceを探し、それらをqueueに流す。 -
DeleteFunc: deletePod = podが消されたらv1.PodかDeletionFinalStateUnknownをobjとして受け取り endpointutliからpodを取り出し、nilじゃなかったら addPodに追加
-
-
endpointInformer-
DeleteFunc: onEndpointsDelete = endpointのkey取り出してqueueに追加
-
-
EndpointsController
Fieldは14個
- client clientset.Interface
- eventBroadcaster record.EventBroadcaster
- eventRecorder record.EventRecorder
- serviceLister corelisters.ServiceLister: ServiceをList/Getするため
- servicesSynced cache.InformerSynced: 初めてsyncされたらTrueを返す
- podLister corelisters.PodLister: PodをList/Getするため
- podsSynced cache.InformerSynced: 同じ
- endpointsLister corelisters.EndpointsLister: EndpointsをList/Getするため
- endpointsSynced cache.InformerSynced: 同じ
- queue workqueue.RateLimitingInterface: 更新が必要なServiceを格納する
- workerLoopPeriod time.Duration: 複数のworkerを走らせるのでその間隔
- triggerTimeTracker *endpointutil.TriggerTimeTracker: 最終更新時間を計算して、EndpointsLastChangeTriggerTimeというannotationを更新する
- endpointUpdatesBatchPeriod time.Duration
- serviceSelectorCache *endpointutil.ServiceSelectorCache: service selectorsのキャッシュ
Run(workers int, stopCh <-chan struct{})
この関数が、kube-controller-managerで呼ばれている。
workersの数が指定されているので、workerLoopPeriod 間隔を開けてスタートする
worker()
Runの中で複数回呼ばれるメインの関数
processNextWorkItem() をfor分で呼ぶ
processNextWorkItem()
-
queueからメッセージを取り出しekeyに格納する。 取得できなければfalseを返し終了。 - ekeyが存在すれば、
syncService(eKey)を実行 - エラーがあれば、
handleErr() - 最後に queueのDoneメソッドを読んで完了し
trueを返す
syncService(key string)
これがいわゆるReconcilation Loopをやっている関数。200行くらいの関数でメインロジックがあるので、いくつかにわけて行く。
1. 対象となるserviceを取得
-
namespaceとname←keyからセット -
service←serviceListerから、 namespaceとnameを指定しserviceを取得。 (今後はこのserviceに対して処理を行う)
2. 対象serviceの状況から直接処理できるケース
-
serviceが見つからなかった場合は、削除されているのでnameと同じ名前のEndpointsを削除する -
service.Spec.Selectorがなければ、nilを返して終了 (Service#without selector)
3. Serviceに対応するPodsを取得
-
pods←podListerからNamespaceとserviceのSelectorにより対応するpodを取得
4. Podsに対して処理をする前の準備
-
tolerateUnreadyEndpoints←service.Spec.PublishNotReadyAddressesとAnnotation (Deprecated)からセット -
endpointsLastChangeTriggerTimeを計算しセット -
subsetsというv1.EndpointSubsetの空リストを作成。 (ここにEndpointsを入れていき最後にこれにより更新する。)type EndpointSubset struct { Addresses []EndpointAddress NotReadyAddresses []EndpointAddress Ports []EndpointPort }
4. 対象となるPodsに対してEndpointSubsetsを埋めていく
podsのそれぞれのpodに対して以下の処理を行う:
-
pod.Status.PodIPがない場合→Continue
-
podが削除中の場合→Continue
-
ep, err := podToEndpointAddressForService(service, pod)← EndpointAddressを取得返されるEndpointAddress&v1.EndpointAddress{ IP: endpointIP, //`IPv6DualStack `がEnableされてない場合は、`endpointIP = pod.Status.PodIP` NodeName: &pod.Spec.NodeName, TargetRef: &v1.ObjectReference{ Kind: "Pod", Namespace: pod.ObjectMeta.Namespace, Name: pod.ObjectMeta.Name, UID: pod.ObjectMeta.UID, ResourceVersion: pod.ObjectMeta.ResourceVersion, }, } -
epa := *epポインタ変数を変換 -
epa(EndpointAddress) をsubsets(EndpointSubset)に追加していく
6.serviceにportがない場合
1. headless serviceの場合ならendpointAddressをsubsetsに追加するaddEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
7.serviceに portがある場合:
1.service.Spec.Portsに対してそれぞれ
1.endpointPortFromServicePort(servicePort, portNum)でEndpointPortインスタンスを生成
2.addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)により追加addEndpointSubset()if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) { // ServiceにPublishNotReadyAddressesがTrueとセットされてるorPodがReadyの場合 subsets = append(subsets, v1.EndpointSubset{ Addresses: []v1.EndpointAddress{epa}, Ports: ports, }) else if shouldPodBeInEndpoints(pod) { // それ以外で詳細は`RestartPolicy`依存だが、基本はReadyになってるべきだけど一時的にReadyではないような状態の場合 subsets = append(subsets, v1.EndpointSubset{ NotReadyAddresses: []v1.EndpointAddress{epa}, Ports: ports, }) }
5. 集めたEndpointSubsetsを加工
-
subsets = endpoints.RepackSubsets(subsets)でRepackする。 (複数のPodごとにバラバラにappendされてる、集約する)
[]v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []v1.EndpointPort{{Port: 111}},
}, {
NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5"}},
Ports: []v1.EndpointPort{{Port: 222}},
}, {
Addresses: []v1.EndpointAddress{{IP: "1.2.3.6"}},
Ports: []v1.EndpointPort{{Port: 111}},
}, {
NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5"}},
Ports: []v1.EndpointPort{{Port: 333}},
}},
[]v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "1.2.3.6"}},
Ports: []v1.EndpointPort{{Port: 111}},
}, {
NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5"}},
Ports: []v1.EndpointPort{{Port: 222}, {Port: 333}},
}},
6. 現在のEndpointsを取得
-
currentEndpoints←endpointsListerを使って現状のEndpointsを取得し格納、存在していなければ、空のものを作成し格納
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Labels: service.Labels,
},
}
} else {
return err
}
}
7. Endpointsの変更・作成が必要かどうかを判断
-
createEndpoints←currentEndpoints.resourceVersionの長さが0の場合は作成する (Resource Verionがないということは、上の行で初期化しただけで、実際には対応するリソースは存在していないことを示すため作成フラグをTrueにする) -
compareLabels← currentEndpoints.Labelsをセットして、v1.IsHeadlessServiceラベルは除く. (なぜならサービス自体にはセットされないので、diffチェックでfalse negativeになる Endpointだけについてるラベルで、サービスにはセットされてないものなので、無視する) -
createEndpointsがFalse & subsetsも同じ &compareLabelsが同じ & Capacityも問題ない & → 更新する必要がないのでreturn nil
8. 更新のための新しいEndpointsを準備
-
newEndpoints← この後のロジックでは作成・更新が必要になるものだけが残っているので、currentEndpointsからDeepCopy -
newEndpoints.Subsets = subsetssubsetsを更新 -
newEndpoints.Labels = service.LabelsserviceのLabelsをendpointsのlabelにする -
newEndpoints.Annotations- endpointLastChangeTriggerTimeの更新 (ない場合はAnnotationを消す)
- EndpointsOverCapacityの更新 ← maxCapacity=1000 を超えたsubsetsがあったら
warningをセット。そうでない場合はKeyを消す
-
newEndpoints.Labels- nilだったら空マップをセット
-
IsHeadlessService← Headless Serviceの場合は、service.kubernetes.io/headless=というラベルをつける. value=""でそれ以外はKeyを消す
9. Endpointsの作成・更新
-
createEndpointsならCreate()でそうでないならUpdate()する
if createEndpoints { // No previous endpoints, create them _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{}) } else { // Pre-existing _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{}) } -
errorがあればログを書いたりeventRecorderに書いたりして return errする
-
errorがなければnilを返す
その他細かいもの
-
maxCapacity: 1000個以上行くとAnnotationにWarningがつく -
PublishNotReadyAddresses: PodがReadyでなくても、そのIPをEndpointsに含めるかのBoolean -
podToEndpointAddressForService: podからEndpointAddressを生成する関数。IPv6DualStackがEnableされてない場合は、endpointIP = pod.Status.PodIPを使ってEndpointAddressを初期化したものを返す。 -
ShouldSetHostname: podにHostnameがあって、PodのSubdomainとServiceのNameが同じで、ServiceとPodが同じNamespaceであればTrue -
addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int):-
eppがあればEndpointsPortに追加 -
tolerateUnreadyEndpointsがtrueかpodがReadyであれば、subsetsにEndpointSubset{ Address: EndpointAddress, Port: ports}を追加 - そうでない場合には、
shouldPodBeInEndpoints(pod)のときには、NotReadyAddressesにEndpointAddressを入れて追加する-
shouldPodBeInEndpoints(pod *v1.Pod)は、RestartPolicyによってことなり、-
Never:pod.Status.Phaseがv1.PodFailedでもv1.PodSucceededでもない場合にTrue -
OnFailure:pod.Status.Phaseがv1.PodFailedでない場合にTrue - default: true
-
-
-
-
endpointsLastChangeTriggerTimeを計算- triggerTimeTrackerのServiceStatesをServiceのNamespaceとNameから取得し
stateに入れる - minChangedTriggerTimeをPodの
LastTransitionTimeから取得 -
state内のそのPodのlastPodTriggerTimes[pod.Name]よりもPodがあとに更新されていれば、 minChangedTirggerTimeをpodTriggerTimeと比較して小さい方をminChangedTirggerTimeにセットする - serviceTriggerTimeのほうがminChangedTriggerTimeより小さければ、
minChangedTirggerTimeを更新する - stateのlastPodTriggerTimesとlastServiceTriggerTimeを更新
- ServiceStatesを更新してFuncを抜ける
- 返り値は、PodとServiceで一番小さい
lastPodTriggerTimeかCreationTimestamp
- triggerTimeTrackerのServiceStatesをServiceのNamespaceとNameから取得し
まとめ
Endpointsの管理のされ方を、コードベースで詳細まで確認できた。コントローラのコード読んだことない人の助けになれればと。
今後は、Endpointsに依存している部分 (例. Serviceの名前解決kube-proxy?やPrometheusOperatorでの使われ方) をちゃんと見てみたい。
