始まり
-
Endpoints
って重要だけど、Service
のほうが注目されてて結構裏役。最初Kubernetes始めたときしらなかったから、Endpointsって何?って人結構いるはず。 -
Prometheus-Operatorで、
ServiceMonitor
を見てたら (ServiceMonitorSpec)、Endpointsが使われてるけど、Endpointsの詳細は確認したことない
Endpointsとは
-
-
Selector
ありラベルセレクターを定義したHeadless Serviceにおいて、EndpointsコントローラーはAPIにおいてEndpointsレコードを作成し、ServiceのバックエンドにあるPodへのIPを直接指し示すためにDNS設定を修正します。
-
Selector
なしラベルセレクターを定義しないHeadless Serviceにおいては、EndpointsコントローラーはEndpointsレコードを作成しません。
-
-
-
kubeletが正常な終了を開始すると同時に、コントロールプレーンは、終了中のPodをEndpoints(および有効な場合はEndpointSlice)オブジェクトから削除します。
-
コンテナがリクエスト応答する準備ができているかを示します。 readinessProbeに失敗すると、エンドポイントコントローラーにより、ServiceからそのPodのIPアドレスが削除されます
-
Endpointsに関するドキュメントが少ないと感じた。PodやServiceのところに別れて書いてあり、Endpointsに関することがまとまって書いてある部分がないためか。
簡単に自分の現状の理解を書くと、
ServiceをSelectorありで作ると、Serviceの名前と同じEndpointsが作成されて、PodのmatchLabelsに合うPodのIpがEndpointsに格納される。ただし、PodがReadyの状態だと、EndpointsのAddressesにPodIpが追加され、PodがReadyでないとNotReadyAddresses
のリストに入る。これにより、準備の出来ていないPodにServiceからトラフィックが流れないようにしている。(これはEndpointsController外の話)
はっきりさせたいポイント
- 何が変更したらEndpointsが変更されるのか?
- 以下のイベントの際に変更があるかどうかチェックがトリガーされる:
- Serviceの作成、更新、削除
- Podの作成、更新、削除
- Endpointsの削除
- 以下のイベントの際に変更があるかどうかチェックがトリガーされる:
- Endpointsの
Addresses
にIPを入れる条件は?- 以下のいずれかを満たせば:
-
service.Spec.PublishNotReadyAddresses
がtrue - Annotation
"service.alpha.kubernetes.io/tolerate-unready-endpoints"
がtrue (deprecated) -
pod
がReady
である
-
- 以下のいずれかを満たせば:
- Endpointsの
NotReadyAddresses
にIPを入れる条件は?- 以下の条件を両方満たした場合:
-
Addresses
に追加する条件に満たなかった -
pod.Spec.RestartPolicy
により-
RestartPolicyNever
->PodFailed
でもPodSucceeded
でもない場合 -
RestartPolicyOnFailure
->PodSucceeded
でない場合 - その他
-
-
- 以下の条件を両方満たした場合:
コードを読む
基本はこれ: endpoints_controller.go
Overview
- ServiceのKeyをWorkQueueに入れる
-
Pod
が追加/変更/削除されたときに、そのPodがメンバーとなるServiceのkeyをWorkQueueに入れる -
Endpoint
が削除されたらobjからkeyを取り出して、WorkQueueに入れる -
Service
が追加、更新、削除されたときも同様にkeyを取り出して、WorkQueueに入れる
-
- ReconcileLoopは、Serviceごとに処理をする
- ServiceのSelectorにより対応するPodを取得
- PodからPodIPを取得
- PodのStatusによりPodのIpをEndpointsの中の
Addresses
にいれるか,NotReadyAddresses
に入れる - 現状のEndpointsを取得
- 必要があればEndpointsの作成または更新
詳細
エントリーポイント
EndpointsControllerは、kube-controller-manager にいて、NewEndpointController + Run が呼ばれてスタート するので、この2つをエントリーポイントとして理解すれば良い
NewEndpointController
EndpointsController
を作成して返す。
-
NewEndpointController
では3つのinformerがある-
serviceInformer
-
AddFunc
: onServiceUpdate = objからkeyを取り出して (namespace/name
)、serviceSelectorCacheのSelectorを更新して、WorkQueueにkeyを入れる -
UpdateFunc
: onServiceUpdate 上と同じ -
DeleteFunc
: onServiceDelete = serviceSelectorCacheからkeyを取り除いてqueueに入れる
-
-
podInformer
-
AddFunc
: addPod = Podが追加されたら、属するサービスをqueueに追加する -
UpdateFunc
: updatePod = podが更新されたら、前まで属していたServiceとこれから属するServiceを探し、それらをqueueに流す。 -
DeleteFunc
: deletePod = podが消されたらv1.PodかDeletionFinalStateUnknownをobjとして受け取り endpointutliからpodを取り出し、nilじゃなかったら addPodに追加
-
-
endpointInformer
-
DeleteFunc
: onEndpointsDelete = endpointのkey取り出してqueueに追加
-
-
EndpointsController
Fieldは14個
- client clientset.Interface
- eventBroadcaster record.EventBroadcaster
- eventRecorder record.EventRecorder
- serviceLister corelisters.ServiceLister: ServiceをList/Getするため
- servicesSynced cache.InformerSynced: 初めてsyncされたらTrueを返す
- podLister corelisters.PodLister: PodをList/Getするため
- podsSynced cache.InformerSynced: 同じ
- endpointsLister corelisters.EndpointsLister: EndpointsをList/Getするため
- endpointsSynced cache.InformerSynced: 同じ
- queue workqueue.RateLimitingInterface: 更新が必要なServiceを格納する
- workerLoopPeriod time.Duration: 複数のworkerを走らせるのでその間隔
- triggerTimeTracker *endpointutil.TriggerTimeTracker: 最終更新時間を計算して、EndpointsLastChangeTriggerTimeというannotationを更新する
- endpointUpdatesBatchPeriod time.Duration
- serviceSelectorCache *endpointutil.ServiceSelectorCache: service selectorsのキャッシュ
Run(workers int, stopCh <-chan struct{})
この関数が、kube-controller-manager
で呼ばれている。
workersの数が指定されているので、workerLoopPeriod
間隔を開けてスタートする
worker()
Runの中で複数回呼ばれるメインの関数
processNextWorkItem()
をfor分で呼ぶ
processNextWorkItem()
-
queue
からメッセージを取り出しekey
に格納する。 取得できなければfalse
を返し終了。 - ekeyが存在すれば、
syncService(eKey)
を実行 - エラーがあれば、
handleErr()
- 最後に queueのDoneメソッドを読んで完了し
true
を返す
syncService(key string)
これがいわゆるReconcilation Loopをやっている関数。200行くらいの関数でメインロジックがあるので、いくつかにわけて行く。
1. 対象となるserviceを取得
-
namespace
とname
←key
からセット -
service
←serviceLister
から、 namespaceとnameを指定しservice
を取得。 (今後はこのservice
に対して処理を行う)
2. 対象serviceの状況から直接処理できるケース
-
service
が見つからなかった場合は、削除されているのでname
と同じ名前のEndpoints
を削除する -
service.Spec.Selector
がなければ、nilを返して終了 (Service#without selector)
3. Serviceに対応するPodsを取得
-
pods
←podLister
からNamespaceとserviceのSelectorにより対応するpodを取得
4. Podsに対して処理をする前の準備
-
tolerateUnreadyEndpoints
←service.Spec.PublishNotReadyAddresses
とAnnotation (Deprecated)からセット -
endpointsLastChangeTriggerTime
を計算しセット -
subsets
というv1.EndpointSubset
の空リストを作成。 (ここにEndpointsを入れていき最後にこれにより更新する。)type EndpointSubset struct { Addresses []EndpointAddress NotReadyAddresses []EndpointAddress Ports []EndpointPort }
4. 対象となるPodsに対してEndpointSubsetsを埋めていく
pods
のそれぞれのpodに対して以下の処理を行う:
-
pod.Status.PodIPがない場合→Continue
-
podが削除中の場合→Continue
-
ep, err := podToEndpointAddressForService(service, pod)
← EndpointAddressを取得返されるEndpointAddress&v1.EndpointAddress{ IP: endpointIP, //`IPv6DualStack `がEnableされてない場合は、`endpointIP = pod.Status.PodIP` NodeName: &pod.Spec.NodeName, TargetRef: &v1.ObjectReference{ Kind: "Pod", Namespace: pod.ObjectMeta.Namespace, Name: pod.ObjectMeta.Name, UID: pod.ObjectMeta.UID, ResourceVersion: pod.ObjectMeta.ResourceVersion, }, }
-
epa := *ep
ポインタ変数を変換 -
epa
(EndpointAddress
) をsubsets
(EndpointSubset
)に追加していく
6.service
にportがない場合
1. headless serviceの場合ならendpointAddressをsubsetsに追加するaddEndpointSubset(subsets, pod, epa, nil, tolerateUnreadyEndpoints)
7.service
に portがある場合:
1.service.Spec.Ports
に対してそれぞれ
1.endpointPortFromServicePort(servicePort, portNum)
でEndpointPortインスタンスを生成
2.addEndpointSubset(subsets, pod, epa, epp, tolerateUnreadyEndpoints)
により追加addEndpointSubset()if tolerateUnreadyEndpoints || podutil.IsPodReady(pod) { // ServiceにPublishNotReadyAddressesがTrueとセットされてるorPodがReadyの場合 subsets = append(subsets, v1.EndpointSubset{ Addresses: []v1.EndpointAddress{epa}, Ports: ports, }) else if shouldPodBeInEndpoints(pod) { // それ以外で詳細は`RestartPolicy`依存だが、基本はReadyになってるべきだけど一時的にReadyではないような状態の場合 subsets = append(subsets, v1.EndpointSubset{ NotReadyAddresses: []v1.EndpointAddress{epa}, Ports: ports, }) }
#### 5. 集めたEndpointSubsetsを加工
-
subsets = endpoints.RepackSubsets(subsets)
でRepackする。 (複数のPodごとにバラバラにappendされてる、集約する)
[]v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}},
Ports: []v1.EndpointPort{{Port: 111}},
}, {
NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5"}},
Ports: []v1.EndpointPort{{Port: 222}},
}, {
Addresses: []v1.EndpointAddress{{IP: "1.2.3.6"}},
Ports: []v1.EndpointPort{{Port: 111}},
}, {
NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5"}},
Ports: []v1.EndpointPort{{Port: 333}},
}},
[]v1.EndpointSubset{{
Addresses: []v1.EndpointAddress{{IP: "1.2.3.4"}, {IP: "1.2.3.6"}},
Ports: []v1.EndpointPort{{Port: 111}},
}, {
NotReadyAddresses: []v1.EndpointAddress{{IP: "1.2.3.5"}},
Ports: []v1.EndpointPort{{Port: 222}, {Port: 333}},
}},
6. 現在のEndpointsを取得
-
currentEndpoints
←endpointsLister
を使って現状のEndpoints
を取得し格納、存在していなければ、空のものを作成し格納
currentEndpoints, err := e.endpointsLister.Endpoints(service.Namespace).Get(service.Name)
if err != nil {
if errors.IsNotFound(err) {
currentEndpoints = &v1.Endpoints{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Labels: service.Labels,
},
}
} else {
return err
}
}
7. Endpointsの変更・作成が必要かどうかを判断
-
createEndpoints
←currentEndpoints.resourceVersion
の長さが0の場合は作成する (Resource Verionがないということは、上の行で初期化しただけで、実際には対応するリソースは存在していないことを示すため作成フラグをTrueにする) -
compareLabels
← currentEndpoints.Labelsをセットして、v1.IsHeadlessService
ラベルは除く. (なぜならサービス自体にはセットされないので、diffチェックでfalse negativeになる Endpointだけについてるラベルで、サービスにはセットされてないものなので、無視する) -
createEndpoints
がFalse & subsetsも同じ &compareLabels
が同じ & Capacityも問題ない & → 更新する必要がないのでreturn nil
8. 更新のための新しいEndpointsを準備
-
newEndpoints
← この後のロジックでは作成・更新が必要になるものだけが残っているので、currentEndpoints
からDeepCopy -
newEndpoints.Subsets = subsets
subsetsを更新 -
newEndpoints.Labels = service.Labels
serviceのLabelsをendpointsのlabelにする -
newEndpoints.Annotations
- endpointLastChangeTriggerTimeの更新 (ない場合はAnnotationを消す)
- EndpointsOverCapacityの更新 ← maxCapacity=1000 を超えたsubsetsがあったら
warning
をセット。そうでない場合はKeyを消す
-
newEndpoints.Labels
- nilだったら空マップをセット
-
IsHeadlessService
← Headless Serviceの場合は、service.kubernetes.io/headless=
というラベルをつける. value=""でそれ以外はKeyを消す
9. Endpointsの作成・更新
-
createEndpointsならCreate()でそうでないならUpdate()する
if createEndpoints { // No previous endpoints, create them _, err = e.client.CoreV1().Endpoints(service.Namespace).Create(context.TODO(), newEndpoints, metav1.CreateOptions{}) } else { // Pre-existing _, err = e.client.CoreV1().Endpoints(service.Namespace).Update(context.TODO(), newEndpoints, metav1.UpdateOptions{}) }
-
errorがあればログを書いたりeventRecorderに書いたりして return errする
-
errorがなければnilを返す
その他細かいもの
-
maxCapacity
: 1000個以上行くとAnnotationにWarningがつく -
PublishNotReadyAddresses
: PodがReadyでなくても、そのIPをEndpointsに含めるかのBoolean -
podToEndpointAddressForService
: podからEndpointAddressを生成する関数。IPv6DualStack
がEnableされてない場合は、endpointIP = pod.Status.PodIP
を使ってEndpointAddress
を初期化したものを返す。 -
ShouldSetHostname
: podにHostnameがあって、PodのSubdomainとServiceのNameが同じで、ServiceとPodが同じNamespaceであればTrue -
addEndpointSubset(subsets []v1.EndpointSubset, pod *v1.Pod, epa v1.EndpointAddress, epp *v1.EndpointPort, tolerateUnreadyEndpoints bool) ([]v1.EndpointSubset, int, int)
:-
epp
があればEndpointsPort
に追加 -
tolerateUnreadyEndpoints
がtrueかpodがReadyであれば、subsetsにEndpointSubset{ Address: EndpointAddress, Port: ports}
を追加 - そうでない場合には、
shouldPodBeInEndpoints(pod)
のときには、NotReadyAddresses
にEndpointAddressを入れて追加する-
shouldPodBeInEndpoints(pod *v1.Pod)
は、RestartPolicyによってことなり、-
Never
:pod.Status.Phase
がv1.PodFailed
でもv1.PodSucceeded
でもない場合にTrue -
OnFailure
:pod.Status.Phase
がv1.PodFailed
でない場合にTrue - default: true
-
-
-
-
endpointsLastChangeTriggerTime
を計算- triggerTimeTrackerのServiceStatesをServiceのNamespaceとNameから取得し
state
に入れる - minChangedTriggerTimeをPodの
LastTransitionTime
から取得 -
state
内のそのPodのlastPodTriggerTimes[pod.Name]
よりもPodがあとに更新されていれば、 minChangedTirggerTimeをpodTriggerTimeと比較して小さい方をminChangedTirggerTime
にセットする - serviceTriggerTimeのほうがminChangedTriggerTimeより小さければ、
minChangedTirggerTime
を更新する - stateのlastPodTriggerTimesとlastServiceTriggerTimeを更新
- ServiceStatesを更新してFuncを抜ける
- 返り値は、PodとServiceで一番小さい
lastPodTriggerTime
かCreationTimestamp
- triggerTimeTrackerのServiceStatesをServiceのNamespaceとNameから取得し
まとめ
Endpointsの管理のされ方を、コードベースで詳細まで確認できた。コントローラのコード読んだことない人の助けになれればと。
今後は、Endpointsに依存している部分 (例. Serviceの名前解決kube-proxy?やPrometheusOperatorでの使われ方) をちゃんと見てみたい。