5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

3-shakeAdvent Calendar 2023

Day 23

Kubernetesのソースコードを読む Kubelet編

Last updated at Posted at 2023-12-25

起動処理

Kubeletの起動処理についてソースコードを追っていき、どんな処理をしているのかみていきたいと思います。

読むソースコード: バージョン: v1.27.2

まず、Kubeletの起動処理について追っていきたいと思います。

appパッケージのKubeletコマンド

Kubeletのエントリーポイントは、cmdディレクトリのappパッケージ内で定義されたmain関数になります。

main

エントリーポイント

  • Kubeletのエントリーポイントは以下
func main() {
	command := app.NewKubeletCommand()
	code := cli.Run(command)
	os.Exit(code)
}
  • 最初にappパッケージ内で定義されたNewKubeletCommandを実行しています。

  • NewKubeletCommandを実行することでcobra.Commandを生成しています。

  • cli.Run(command)することで、NewKubeletCommandで定義したRunE関数を実行しています。

    • この辺もちゃんと説明するとちょっと長くなります。

      • cliというのはk8s.io/component-base/cliライブラリのこと
      • そのRun関数を実行するということはcobra.CommandのExecuteを実行するということ
        • cmd.Execute()
      • Execute→ExecuteC→executeと実行されていく
      • executeに色々書かれているが、今回関係ある部分は以下で、
        • cobraのRunEが定義されていればRunEを実行する
        • そうでなければ、Runを実行
      if c.RunE != nil {
      		if err := c.RunE(c, argWoFlags); err != nil {
      			return err
      		}
      	} else {
      		c.Run(c, argWoFlags)
      	}
      
  • cobra

NewKubeletCommand

  • main関数の部分で見た通り、(フラグなどの細かい部分を省略すると)KubeletはNewKubeletCommandを実行して生成されたcobra.CommandのRunE関数を実行していることがわかります。
  • RunE関数を実行するということは同じappパッケージ内で定義されたRun関数を実行することになります。
  • そのため、Run関数の実装を見にいきます。
// NewKubeletCommand creates a *cobra.Command object with default parameters
func NewKubeletCommand() *cobra.Command {
  .....
	cmd := &cobra.Command{
			Use: componentKubelet,
			Long: `The kubelet is the primary "node agent" that runs on each
  .....`
      RunE: func(cmd *cobra.Command, args []string) error {
  .....
        // run the kubelet
	    return Run(ctx, kubeletServer, kubeletDeps, utilfeature.DefaultFeatureGate)
      }

Run

  • Run関数は結局run関数を実行していることになるので、run関数を見にいきます
// Run runs the specified KubeletServer with the given Dependencies. This should never exit.
// The kubeDeps argument may be nil - if so, it is initialized from the settings on KubeletServer.
// Otherwise, the caller is assumed to have set up the Dependencies object and a default one will
// not be generated.
func Run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) error {
	// To help debugging, immediately log version
	klog.InfoS("Kubelet version", "kubeletVersion", version.Get())

	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	if err := initForOS(s.KubeletFlags.WindowsService, s.KubeletFlags.WindowsPriorityClass); err != nil {
		return fmt.Errorf("failed OS init: %w", err)
	}
	if err := run(ctx, s, kubeDeps, featureGate); err != nil {
		return fmt.Errorf("failed to run Kubelet: %w", err)
	}
	return nil
}

run

func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
   ... 
   // 784行目
   if err := RunKubelet(s, kubeDeps, s.RunOnce); err != nil {
		return err
	}
}

RunKubelet

  • メインループに関わる部分では以下の処理が実施されています。
    • createAndInitKubeletでpkg/kubeletで定義されているKubelet構造体を初期化する
    • (runOnceがfalseの時)startKubeletを実行する
// RunKubelet is responsible for setting up and running a kubelet.  It is used in three different applications:
//
//	1 Integration tests
//	2 Kubelet binary
//	3 Standalone 'kubernetes' binary
//
// Eventually, #2 will be replaced with instances of #3
func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {
   ...
   // 1139行目
   k, err := createAndInitKubelet(kubeServer,
		kubeDeps,
		hostname,
		hostnameOverridden,
		nodeName,
		nodeIPs)
   ... 
   
   // 1160行目
   // process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %w", err)
		}
		klog.InfoS("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
		klog.InfoS("Started kubelet")
	}
  return nil
}

createAndInitKubelet

  • RunKubeletの部分でも記載したが、NewMainKubeletを実行してKubelet構造体を初期化する
  • NewMainKubeletはkubeletパッケージで定義された関数

func createAndInitKubelet(kubeServer *options.KubeletServer,
  kubeDeps *kubelet.Dependencies,
	hostname string,
	hostnameOverridden bool,
	nodeName types.NodeName,
	nodeIPs []net.IP) (k kubelet.Bootstrap, err error) {
	// TODO: block until all sources have delivered at least one update to the channel, or break the sync loop
	// up into "per source" synchronizations

	k, err = kubelet.NewMainKubelet(&kubeServer.KubeletConfiguration,
		kubeDeps,
    .... // 引数たくさん
    kubeServer.KubeletFlags.SeccompDefault || kubeServer.KubeletConfiguration.SeccompDefault)
  
  ...

  return k, nil
}

startKubelet

  • startKubeletはkubelet.BootstrapのRun関数のゴルーチンを実行しています
  • kubeletパッケージのBootstrapはinterfaceとなっています。
  • RunKubeletで第一引数にKubelet構造体を入れているので、結局Kubelet構造体のRun関数を実行していることになります。
    • →Bootstrap interfaceはKubelet 構造体(struct)にて実装されている
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

}

ここまで振り返り

  • ここまでで分かったことを一旦振り返ると、大体以下のことになります
    • Kubeletを実行するということはappパッケージ内のmain関数を実行するということ
    • それはつまり、kubeletパッケージ内のKubelet構造体のRun関数の後ルーチンを実行するということ

KubeletパッケージのKubelet構造体

  • ここからはkubeletパッケージの処理を追っていきます
  • ここまででわかった通り、「Kubeletを実行する→Kubelet構造体のRun関数を実行する」ことがわかったので、Run関数を見ていきます

Run

  • Run関数の中で、cloudResourceSyncManager実行、volumeManager実行などさまざまな処理を実行しています
  • メインループの部分はsyncLoop関数を実行している部分になります
  • syncLoopの3番目の引数にkl(Kubelet構造体)が入れられています
    • 後続のsyncLoopやsyncLoopIterationの中のSyncHandler interfaceの実装がKubelet構造体だということがわかります。
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
   ...
   // Start the cloud provider sync manager
	if kl.cloudResourceSyncManager != nil {
		go kl.cloudResourceSyncManager.Run(wait.NeverStop)
	}
  ...

  // Start volume manager
	go kl.volumeManager.Run(kl.sourcesReady, wait.NeverStop)
  ...

  // Start component sync loops.
	kl.statusManager.Start()
  ...

  // 1604行目
  kl.syncLoop(ctx, updates, kl)
}

syncLoop

  • コメントに変更を処理するためのメインループと記載されています。
  • この中で実際の処理をしている部分が2279行目から始まる部分で、無限ループでsyncLoopIteration関数を実行し続けています。
// syncLoop is the main loop for processing changes. It watches for changes from
// three channels (file, apiserver, and http) and creates a union of them. For
// any new change seen, will run a sync against desired state and running state. If
// no changes are seen to the configuration, will synchronize the last known desired
// state every sync-frequency seconds. Never returns.
func (kl *Kubelet) syncLoop(ctx context.Context, updates <-chan kubetypes.PodUpdate, handler SyncHandler) {
    ...		

    // 2279行目
    for {
				if err := kl.runtimeState.runtimeErrors(); err != nil {
					klog.ErrorS(err, "Skipping pod synchronization")
					// exponential backoff
					time.Sleep(duration)
					duration = time.Duration(math.Min(float64(max), factor*float64(duration)))
					continue
				}
				// reset backoff if we have a success
				duration = base
		
				kl.syncLoopMonitor.Store(kl.clock.Now())
				if !kl.syncLoopIteration(ctx, updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) {
					break
				}
				kl.syncLoopMonitor.Store(kl.clock.Now())
			}
}

syncLoopIteration

  • syncLoopIteration以下の様々なチャネルから受け取ったイベントに応じて処理を実行することになります。
    • configCh
    • syncCh
    • housekeepingCh
    • plegCh
  • それぞれのイベントを受け取った処理の中で、handler.HandlePodAdditions(u.Pods)など、handlerで定義された処理が実行されています。
  • Run関数の中で説明した通り、SyncHandlerの実装はKubelet構造体なので、HandlePodAdditionsなどの処理はKubelet構造体で定義された関数を見れば良いことになります。
  • Handlerの説明は後述します。
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
  
  case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
    case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodUpdates(u.Pods)
      ....
}

configCh <-chan kubetypes.PodUpdate

syncLoopIterationの中で受け取っているチャネルのconfigChについて理解していきます。

configCh

  • 遡ると、これは元々は(kubeletパッケージの)Run関数で引数として渡されている「configCh <-chan kubetypes.PodUpdate」のことでした。
// Run starts the kubelet reacting to config updates
func (kl *Kubelet) Run(updates <-chan kubetypes.PodUpdate) {
   ...
}
  • Runはappパッケージ内のstartKubeletで定義された、PodConfigのUpdates()関数だということがわかります。
func startKubelet(k kubelet.Bootstrap, podCfg *config.PodConfig, kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *kubelet.Dependencies, enableServer bool) {
	// start the kubelet
	go k.Run(podCfg.Updates())

}

RunKubelet

  • startKubeletで渡されているpodCfgはkubeDeps.PodConfigであることがわかります
  • コメントからもわかる通り、kubeDeps.PodConfigはNewMainKubelet内で初期化されています

func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencies, runOnce bool) error {

   
  // 1149行目
  // NewMainKubelet should have set up a pod source config if one didn't exist
	// when the builder was run. This is just a precaution.
	if kubeDeps.PodConfig == nil {
		return fmt.Errorf("failed to create kubelet, pod source config was nil")
	}
	podCfg := kubeDeps.PodConfig

   // 1160行目
   // process pods and exit.
	if runOnce {
		if _, err := k.RunOnce(podCfg.Updates()); err != nil {
			return fmt.Errorf("runonce failed: %w", err)
		}
		klog.InfoS("Started kubelet as runonce")
	} else {
		startKubelet(k, podCfg, &kubeServer.KubeletConfiguration, kubeDeps, kubeServer.EnableServer)
		klog.InfoS("Started kubelet")
	}
  return nil
}

NewMainKubelet

  • RunKubeletの中で、createAndInitKubeletを実行し、その中でNewMainKubeletを実行しています。
  • kubeDeps.PodConfigは414行目からはじまるmakePodSourceConfigの中で初期化されています。
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration,
	kubeDeps *Dependencies,
  ...
  seccompDefault bool,
) (*Kubelet, error) {
  ...

  // 414行目
	if kubeDeps.PodConfig == nil {
			var err error
			kubeDeps.PodConfig, err = makePodSourceConfig(kubeCfg, kubeDeps, nodeName, nodeHasSynced)
			if err != nil {
				return nil, err
			}
		}

}

makePodSourceConfig

  • makePodSourceConfigで以下の更新を受け取っていることがわかります。
    • URLから指定されたStaticPod
    • ファイルパスで指定されたStaticPod
    • API Serverから取得したPod
  • チャネルはNewPodConfigで初期化されています
  • NewSourceFileとNewSourceURLの説明は省略
  • NewSourceApiserverの説明をします
  • NewSourceApiserverの中では以下の処理を行っています。
    • API Serverにリクエストを送る
    • 送った後チャネルにデータを送信する。
    • この時送信するチャネルはcfg.Channel(ctx, kubetypes.ApiserverSource)で定義したチャネルで、そのあとのMerge関数を経てNewPodConfigで定義した共通(URL,ファイルパス,API)のチャネルに送信されます。
// makePodSourceConfig creates a config.PodConfig from the given
// KubeletConfiguration or returns an error.
func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies, nodeName types.NodeName, nodeHasSynced func() bool) (*config.PodConfig, error) {
	manifestURLHeader := make(http.Header)
	if len(kubeCfg.StaticPodURLHeader) > 0 {
		for k, v := range kubeCfg.StaticPodURLHeader {
			for i := range v {
				manifestURLHeader.Add(k, v[i])
			}
		}
	}

	// source of all configuration
	cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder, kubeDeps.PodStartupLatencyTracker)

	// TODO:  it needs to be replaced by a proper context in the future
	ctx := context.TODO()

	// define file config source
	if kubeCfg.StaticPodPath != "" {
		klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
		config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
	}

	// define url config source
	if kubeCfg.StaticPodURL != "" {
		klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
		config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
	}

	if kubeDeps.KubeClient != nil {
		klog.InfoS("Adding apiserver pod source")
		config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
	}
	return cfg, nil
}

NewPodConfig

  • NewPodConfigでkubetypes.PodUpdate型のバッファ付きチャネルを定義しています
    • 50個までは待たされることなく、送信されチャネルがいっぱいになったら、待機
  • PodConfig構造体を返す
// NewPodConfig creates an object that can merge many configuration sources into a stream
// of normalized updates to a pod configuration.
func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder, startupSLIObserver podStartupSLIObserver) *PodConfig {
	updates := make(chan kubetypes.PodUpdate, 50)
	storage := newPodStorage(updates, mode, recorder, startupSLIObserver)
	podConfig := &PodConfig{
		pods:    storage,
		mux:     config.NewMux(storage),
		updates: updates,
		sources: sets.String{},
	}
	return podConfig
}

NewSourceApiserver

  • cache.NewListWatchFromClient関数でAPI Serverから情報を取得するための関数を定義しています
    • 結局、NewSourceApiserverはNewListWatchFromClientで定義したリソースをwatchする関数を実行し続けるという処理になっています。
  • その後、goルーチンの即時関数を定義して実行しています。主な処理は
    • 無限ループの中で、nodeがAPIServerとsyncしている状態になるまでnodeHasSyncedを実行し続けます
    • syncされたら、無限ループを抜けて、newSourceApiserverFromLWを実行します
    • nodeHasSynced関数はNewMainKubelet関数内で定義された
// NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(c clientset.Interface, nodeName types.NodeName, nodeHasSynced func() bool, updates chan<- interface{}) {
	lw := cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))

	// The Reflector responsible for watching pods at the apiserver should be run only after
	// the node sync with the apiserver has completed.
	klog.InfoS("Waiting for node sync before watching apiserver pods")
	go func() {
		for {
			if nodeHasSynced() {
				klog.V(4).InfoS("node sync completed")
				break
			}
			time.Sleep(WaitForAPIServerSyncPeriod)
			klog.V(4).InfoS("node sync has not completed yet")
		}
		klog.InfoS("Watching apiserver")
		newSourceApiserverFromLW(lw, updates)
	}()
}

NewListWatchFromClient

  • client-goの中の話
  • 今回は↓なので
  • cache.NewListWatchFromClient(c.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", string(nodeName)))
    • namespace: 全て
    • resource: pods
    • options:
      • spec.nodeNameがnodeNameと等しいもの
      • nodeNameはRunKubelet(1115行目)関数の中で取得していて、Kubeletが存在しているノードを指している
      • VersionedParamsでクエリするpodのspecを制限している
  • List関数とWatch関数が定義されたListWatch構造体を返す
// NewFilteredListWatchFromClient creates a new ListWatch from the specified client, resource, namespace, and option modifier.
// Option modifier is a function takes a ListOptions and modifies the consumed ListOptions. Provide customized modifier function
// to apply modification to ListOptions with a field selector, a label selector, or any other desired options.
func NewFilteredListWatchFromClient(c Getter, resource string, namespace string, optionsModifier func(options *metav1.ListOptions)) *ListWatch {
	listFunc := func(options metav1.ListOptions) (runtime.Object, error) {
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Do(context.TODO()).
			Get()
	}
	watchFunc := func(options metav1.ListOptions) (watch.Interface, error) {
		options.Watch = true
		optionsModifier(&options)
		return c.Get().
			Namespace(namespace).
			Resource(resource).
			VersionedParams(&options, metav1.ParameterCodec).
			Watch(context.TODO())
	}
	return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}

newSourceApiserverFromLW

  • send関数を定義して、その中では
    • objを受け取ってpods配列に追加し,
    • updatesチャネルにPodUpdateのイベントを送信している
    • ここでupdatesチャネルはmakePodSourceConfigの中でcfg.Channel(ctx, kubetypes.ApiserverSource)で定義したチャネルでKubeletに送信しているチャネルではない。
  • Run関数の詳細は省略するが、大体の処理は以下のようになっています。
    • 上記で定義したList関数とWatch関数を実行し続ける
    • List-Watch関数を実行して取得したリソースの情報をsend関数を利用してupdatesチャネルに送信する
      • NewReflectorの3番目の引数でわたされている「cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)」がUndeltaStore構造体でその中のPushFuncがsend関数として初期化されている。
      • List-Watch関数を実行した結果に対して、send関数を実行してupdatesに送信している感じ
// newSourceApiserverFromLW holds creates a config source that watches and pulls from the apiserver.
func newSourceApiserverFromLW(lw cache.ListerWatcher, updates chan<- interface{}) {
	send := func(objs []interface{}) {
		var pods []*v1.Pod
		for _, o := range objs {
			pods = append(pods, o.(*v1.Pod))
		}
		updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.ApiserverSource}
	}
	r := cache.NewReflector(lw, &v1.Pod{}, cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc), 0)
	go r.Run(wait.NeverStop)
}

Run関数の中で下記のReplace関数を実行している

PushFuncがsend関数

// Replace will delete the contents of current store, using instead the given list.
// 'u' takes ownership of the list, you should not reference the list again
// after calling this function.
// The new contents complete state will be sent by calling PushFunc after replacement.
func (u *UndeltaStore) Replace(list []interface{}, resourceVersion string) error {
	if err := u.Store.Replace(list, resourceVersion); err != nil {
		return err
	}
	u.PushFunc(u.Store.List())
	return nil
}

ここまで

  • newSourceApiserverFromLW関数で、API Serverからデータを取得して、イベントをチャネルに送信していることがわかった。
  • 以下の部分を説明して、kubetypes.ADDのイベントを送信していることを確認しないといけない
    • NewSourceApiserverの最後の引数の部分で、cfg.Channel(ctx, kubetypes.ApiserverSource)となっている部分
    • 上記の関数の中で、Merge関数を利用してkubetypesのイベントを設定し直して、Kubeletが受信しているチャネルにデータを送信している
  • 上記のsend関数の中ではkubetypes.SETでupdatesイベントを送信している。

Channel

  • ChannelでChannelWithContextを実行している
// Channel creates or returns a config source channel.  The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} {
	c.sourcesLock.Lock()
	defer c.sourcesLock.Unlock()
	c.sources.Insert(source)
	return c.mux.ChannelWithContext(ctx, source)
}

ChannelWithContext Listen

  • ChannelWithContextとlisten関数を以下に載せる
  • ChannelWithContextの中では以下の処理を行っている
    • 新しいチャンネルを定義
    • listen関数をnewChannelに対して実行し続ける
      • wait.Untilで指定した関数(listen)をずっと実行し続ける
// ChannelWithContext returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
	if len(source) == 0 {
		panic("Channel given an empty name")
	}
	m.sourceLock.Lock()
	defer m.sourceLock.Unlock()
	channel, exists := m.sources[source]
	if exists {
		return channel
	}
	newChannel := make(chan interface{})
	m.sources[source] = newChannel

	go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done())
	return newChannel
}

func (m *Mux) listen(source string, listenChannel <-chan interface{}) {
	for update := range listenChannel {
		m.merger.Merge(source, update)
	}
}

Merge

  • Merge関数は
    • merge関数を実行して、それぞれイベントをadds,updates,deletes,removesに振り分ける
    • 振り分けたイベントをチャネルに送信する
  • URLから取得したStaticPod、API Serverから取得したPおdなどの情報を一つにまとめる役目がある。
// Merge normalizes a set of incoming changes from different sources into a map of all Pods
// and ensures that redundant changes are filtered out, and then pushes zero or more minimal
// updates onto the update channel.  Ensures that updates are delivered in order.
func (s *podStorage) Merge(source string, change interface{}) error {
	s.updateLock.Lock()
	defer s.updateLock.Unlock()

	seenBefore := s.sourcesSeen.Has(source)
	adds, updates, deletes, removes, reconciles := s.merge(source, change)
	firstSet := !seenBefore && s.sourcesSeen.Has(source)

	// deliver update notifications
	switch s.mode {
	case PodConfigNotificationIncremental:
		if len(removes.Pods) > 0 {
			s.updates <- *removes
		}
		if len(adds.Pods) > 0 {
			s.updates <- *adds
		}
   .....
}

merge

  • localで定義されたupdatePodsFuncを実行して、podsをaddPods,updatePods,deletePods,removePods,reconcilePodsに振り分ける。
  • 振り分けたPodsをチャネルに送信するイベントとして返す
    • ここでkubetypes.ADDなどを設定している
func (s *podStorage) merge(source string, change interface{}) (adds, updates, deletes, removes, reconciles *kubetypes.PodUpdate) {
  // updatePodFunc is the local function which updates the pod cache *oldPods* with new pods *newPods*.
	// After updated, new pod will be stored in the pod cache *pods*.
	// Notice that *pods* and *oldPods* could be the same cache.
	updatePodsFunc := func(newPods []*v1.Pod, oldPods, pods map[types.UID]*v1.Pod) {
		filtered := filterInvalidPods(newPods, source, s.recorder)
    ...
  }
  ...

  update := change.(kubetypes.PodUpdate)
	switch update.Op {
	case kubetypes.ADD, kubetypes.UPDATE, kubetypes.DELETE:
		if update.Op == kubetypes.ADD {
			klog.V(4).InfoS("Adding new pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
		} else if update.Op == kubetypes.DELETE {
			klog.V(4).InfoS("Gracefully deleting pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
		} else {
			klog.V(4).InfoS("Updating pods from source", "source", source, "pods", klog.KObjSlice(update.Pods))
		}
		updatePodsFunc(update.Pods, pods, pods)
   
  .......
  // API Serverなどからリソースの情報を取得した後はkubetypes.SETなのでこちらが実行される
	case kubetypes.SET:
		klog.V(4).InfoS("Setting pods for source", "source", source)
		s.markSourceSet(source)
		// Clear the old map entries by just creating a new map
		oldPods := pods
		pods = make(map[types.UID]*v1.Pod)
		updatePodsFunc(update.Pods, oldPods, pods)
		for uid, existing := range oldPods {
			if _, found := pods[uid]; !found {
				// this is a delete
				removePods = append(removePods, existing)
			}
		}

	default:
		klog.InfoS("Received invalid update type", "type", update)

	}
  
  s.pods[source] = pods

	adds = &kubetypes.PodUpdate{Op: kubetypes.ADD, Pods: copyPods(addPods), Source: source}
	updates = &kubetypes.PodUpdate{Op: kubetypes.UPDATE, Pods: copyPods(updatePods), Source: source}
	deletes = &kubetypes.PodUpdate{Op: kubetypes.DELETE, Pods: copyPods(deletePods), Source: source}
	removes = &kubetypes.PodUpdate{Op: kubetypes.REMOVE, Pods: copyPods(removePods), Source: source}
	reconciles = &kubetypes.PodUpdate{Op: kubetypes.RECONCILE, Pods: copyPods(reconcilePods), Source: source}

	return adds, updates, deletes, removes, reconciles
}

API Serverからリソース情報を取得、イベントを適当に加工してチャネルに送信しているところまで追うことが確認できました。

Podの作成されるまで

ある程度詳細は省略しつつ、Kubeletの起動処理について追うことができました。
次はPodが作成される流れについてソースコードを追っていきたいと思います。

Kubeletでは以下の関数が定義されています。

  • HandlePodAdditions
  • HandlePodUpdates
  • HandlePodRemoves
  • HandlePodReconcile
  • HandlePodSyncs
  • HandlePodCleanups
  • 上記の関数はKubeletのメインループの中でチャネルから受け取ったイベントを元に、Pod作成などを実施します。
  • イベントを受け取ってから上記の関数が実行されて、Podが作成されるまでの処理を追っていきます。

syncLoopIteration

  • メインループでチャネルからイベントを受け取って処理を行う部分はsyncLoopIterationで行われています。
  • 特にPodの作成更新処理に関するイベントはconfigCh <-chan kubetypes.PodUpdateチャネルから受け取って処理を行っています。
  • チャネルから受け取ったkubetypesによってそれぞれ処理が分岐します。
  • Podが作成される時はkubetypes.ADDのようです。
  • kubetypes.ADDのイベントを受け取ると、HandlePodAdditionsが実行されます。
func (kl *Kubelet) syncLoopIteration(ctx context.Context, configCh <-chan kubetypes.PodUpdate, handler SyncHandler,
	syncCh <-chan time.Time, housekeepingCh <-chan time.Time, plegCh <-chan *pleg.PodLifecycleEvent) bool {
	select {
	case u, open := <-configCh:
		// Update from a config source; dispatch it to the right handler
		// callback.
		if !open {
			klog.ErrorS(nil, "Update channel is closed, exiting the sync loop")
			return false
		}

		switch u.Op {
		case kubetypes.ADD:
			klog.V(2).InfoS("SyncLoop ADD", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			// After restarting, kubelet will get all existing pods through
			// ADD as if they are new pods. These pods will then go through the
			// admission process and *may* be rejected. This can be resolved
			// once we have checkpointing.
			handler.HandlePodAdditions(u.Pods)
		case kubetypes.UPDATE:
			klog.V(2).InfoS("SyncLoop UPDATE", "source", u.Source, "pods", klog.KObjSlice(u.Pods))
			handler.HandlePodUpdates(u.Pods)
...
}

ハンドラー

  • メインループで実行されるハンドラーはSyncHandlerというinterfaceで定義されています。
  • syncLoopIterationを実行するときにSyncHandlerとして、Kubelet(構造体)を渡しているため、KubeletのHandlePodAdditionsが実行されることになります。
// SyncHandler is an interface implemented by Kubelet, for testability
type SyncHandler interface {
	HandlePodAdditions(pods []*v1.Pod)
	HandlePodUpdates(pods []*v1.Pod)
	HandlePodRemoves(pods []*v1.Pod)
	HandlePodReconcile(pods []*v1.Pod)
	HandlePodSyncs(pods []*v1.Pod)
	HandlePodCleanups(ctx context.Context) error
}

HandlePodAdditions

  • Kubeletがkubetypes.ADDのイベントを受け取った時はHandlePodAdditionsが実行されます。
  • HandlePodAdditionsでは主に以下の処理を実施しています
  • podManagerの処理
    • podManagerはNewMainKubeletで初期化(NewBasicPodManager)され、実装はbasicManagerです。
    • KubeletがPodの状態を管理するのに利用しているようです。
    • (NewMainKubeletのコメントにはconfigMapとsecretの状態も管理しているよう)
  • podWorkersの処理
    • !kl.podWorkers.IsPodTerminationRequested(pod.UID)
  • statusManagerの処理
    • kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name)
    • statusManagerからResources.Requestsを取得して、コンテナに設定する
  • canAdmitPod
    • falseならば、rejectPodする
  • 最終的にdispatchWorkが実行されます。
// HandlePodAdditions is the callback in SyncHandler for pods being added from
// a config source.
func (kl *Kubelet) HandlePodAdditions(pods []*v1.Pod) {
	start := kl.clock.Now()
	sort.Sort(sliceutils.PodsByCreationTime(pods))
	if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
		kl.podResizeMutex.Lock()
		defer kl.podResizeMutex.Unlock()
	}
	for _, pod := range pods {
		existingPods := kl.podManager.GetPods()
		// Always add the pod to the pod manager. Kubelet relies on the pod
		// manager as the source of truth for the desired state. If a pod does
		// not exist in the pod manager, it means that it has been deleted in
		// the apiserver and no action (other than cleanup) is required.
		kl.podManager.AddPod(pod)

		if kubetypes.IsMirrorPod(pod) {
			kl.handleMirrorPod(pod, start)
			continue
		}

		// Only go through the admission process if the pod is not requested
		// for termination by another part of the kubelet. If the pod is already
		// using resources (previously admitted), the pod worker is going to be
		// shutting it down. If the pod hasn't started yet, we know that when
		// the pod worker is invoked it will also avoid setting up the pod, so
		// we simply avoid doing any work.
		if !kl.podWorkers.IsPodTerminationRequested(pod.UID) {
			// We failed pods that we rejected, so activePods include all admitted
			// pods that are alive.
			activePods := kl.filterOutInactivePods(existingPods)

			if utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) {
				// To handle kubelet restarts, test pod admissibility using AllocatedResources values
				// (for cpu & memory) from checkpoint store. If found, that is the source of truth.
				podCopy := pod.DeepCopy()
				for _, c := range podCopy.Spec.Containers {
					allocatedResources, found := kl.statusManager.GetContainerResourceAllocation(string(pod.UID), c.Name)
					if c.Resources.Requests != nil && found {
						c.Resources.Requests[v1.ResourceCPU] = allocatedResources[v1.ResourceCPU]
						c.Resources.Requests[v1.ResourceMemory] = allocatedResources[v1.ResourceMemory]
					}
				}
				// Check if we can admit the pod; if not, reject it.
				if ok, reason, message := kl.canAdmitPod(activePods, podCopy); !ok {
					kl.rejectPod(pod, reason, message)
					continue
				}
				// For new pod, checkpoint the resource values at which the Pod has been admitted
				if err := kl.statusManager.SetPodAllocation(podCopy); err != nil {
					//TODO(vinaykul,InPlacePodVerticalScaling): Can we recover from this in some way? Investigate
					klog.ErrorS(err, "SetPodAllocation failed", "pod", klog.KObj(pod))
				}
			} else {
				// Check if we can admit the pod; if not, reject it.
				if ok, reason, message := kl.canAdmitPod(activePods, pod); !ok {
					kl.rejectPod(pod, reason, message)
					continue
				}
			}
		}
		mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod)
		kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start)
	}
}

dispatchWork

  • dispatchWorkではpodWorkersのUpdatePodを実行します。
    • kl.podWorkersはNewMainKubeletの中で初期化(newPodWorkers)されています。
    • (interfaceがPodWorkersで実装がpodWorkers)
  • syncTypeがkubetypes.SyncPodCreateの時はObserveも実行されます。
// dispatchWork starts the asynchronous sync of the pod in a pod worker.
// If the pod has completed termination, dispatchWork will perform no action.
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) {
	// Run the sync in an async worker.
	kl.podWorkers.UpdatePod(UpdatePodOptions{
		Pod:        pod,
		MirrorPod:  mirrorPod,
		UpdateType: syncType,
		StartTime:  start,
	})
	// Note the number of containers for new pods.
	if syncType == kubetypes.SyncPodCreate {
		metrics.ContainersPerPodCount.Observe(float64(len(pod.Spec.Containers)))
	}
}

UpdatePod

  • 受け取ったUpdatePodOptionsによって様々な処理をしている
    • Podがrunnableなのか、terminatingやevictedの状態なのかに応じて処理を行います。
  • 最終的にp.podWorkerLoop(uid, outCh)のゴルーチンを実行しています。
// UpdatePod carries a configuration change or termination state to a pod. A pod is either runnable,
// terminating, or terminated, and will transition to terminating if: deleted on the apiserver,
// discovered to have a terminal phase (Succeeded or Failed), or evicted by the kubelet.
func (p *podWorkers) UpdatePod(options UpdatePodOptions) {
	// Handle when the pod is an orphan (no config) and we only have runtime status by running only
	// the terminating part of the lifecycle. A running pod contains only a minimal set of information
	// about the pod
  ...

    // spawn a pod worker
		go func() {
			// TODO: this should be a wait.Until with backoff to handle panics, and
			// accept a context for shutdown
			defer runtime.HandleCrash()
			defer klog.V(3).InfoS("Pod worker has stopped", "podUID", uid)
			p.podWorkerLoop(uid, outCh)
		}()

...
	  select {
		case podUpdates <- struct{}{}:
		default:
		}
	  }
}

podWorkerLoop

  • Podの更新の状態(update.WorkType)によって処理を行う即時関数の実行します
  • Podが作成される場合はp.podSyncer.SyncPodが実行されます。
  • p.podSyncer.SyncPodのpはpodWorkersでnewPodWorkersで初期化した際に、podSyncerとしてKubelet(構造体)を渡しているので、KubeletのSyncPodが実行されます。
// podWorkerLoop manages sequential state updates to a pod in a goroutine, exiting once the final
// state is reached. The loop is responsible for driving the pod through four main phases:
//
// 1. Wait to start, guaranteeing no two pods with the same UID or same fullname are running at the same time
// 2. Sync, orchestrating pod setup by reconciling the desired pod spec with the runtime state of the pod
// 3. Terminating, ensuring all running containers in the pod are stopped
// 4. Terminated, cleaning up any resources that must be released before the pod can be deleted
//
// The podWorkerLoop is driven by updates delivered to UpdatePod and by SyncKnownPods. If a particular
// sync method fails, p.workerQueue is updated with backoff but it is the responsibility of the kubelet
// to trigger new UpdatePod calls. SyncKnownPods will only retry pods that are no longer known to the
// caller. When a pod transitions working->terminating or terminating->terminated, the next update is
// queued immediately and no kubelet action is required.
func (p *podWorkers) podWorkerLoop(podUID types.UID, podUpdates <-chan struct{}) {
	var lastSyncTime time.Time
  ...

  err := func() error {
      // Take the appropriate action (illegal phases are prevented by UpdatePod)
			switch {
			case update.WorkType == TerminatedPod:
				err = p.podSyncer.SyncTerminatedPod(ctx, update.Options.Pod, status)

			case update.WorkType == TerminatingPod:
				var gracePeriod *int64
				if opt := update.Options.KillPodOptions; opt != nil {
					gracePeriod = opt.PodTerminationGracePeriodSecondsOverride
				}
				podStatusFn := p.acknowledgeTerminating(podUID)

				// if we only have a running pod, terminate it directly
				if update.Options.RunningPod != nil {
					err = p.podSyncer.SyncTerminatingRuntimePod(ctx, update.Options.RunningPod)
				} else {
					err = p.podSyncer.SyncTerminatingPod(ctx, update.Options.Pod, status, gracePeriod, podStatusFn)
				}

			default:
				isTerminal, err = p.podSyncer.SyncPod(ctx, update.Options.UpdateType, update.Options.Pod, update.Options.MirrorPod, status)
			}
  }()
}

SyncPod

  • SyncPod関数で、そのPodを実行して良いか判定します。
  • 判定後、kl.containerRuntime.SyncPodを実行して、コンテナランタイムのSyncPodを実行します。
    • containerRuntimeはNewMainKubeletの中で初期化(NewKubeGenericRuntimeManager)されています。
    • 実装はkubeGenericRuntimeManagerなので、kubeGenericRuntimeManagerのSyncPodが実行されます。
func (kl *Kubelet) SyncPod(_ context.Context, updateType kubetypes.SyncPodType, pod, mirrorPod *v1.Pod, podStatus *kubecontainer.PodStatus) (isTerminal bool, err error) {
	// TODO(#113606): connect this with the incoming context parameter, which comes from the pod worker.
	// Currently, using that context causes test failures.
	ctx, otelSpan := kl.tracer.Start(context.TODO(), "syncPod", trace.WithAttributes(
		attribute.String("k8s.pod.uid", string(pod.UID)),
		attribute.String("k8s.pod", klog.KObj(pod).String()),
		attribute.String("k8s.pod.name", pod.Name),
		attribute.String("k8s.pod.update_type", updateType.String()),
		attribute.String("k8s.namespace.name", pod.Namespace),
	))
  ...
  // If the pod should not be running, we request the pod's containers be stopped. This is not the same
	// as termination (we want to stop the pod, but potentially restart it later if soft admission allows
	// it later). Set the status and phase appropriately
	runnable := kl.canRunPod(pod)
  ...

  // Call the container runtime's SyncPod callback
	result := kl.containerRuntime.SyncPod(todoCtx, pod, podStatus, pullSecrets, kl.backOff)
}

SyncPod(kubeGenericRuntimeManager)

  • kubeGenericRuntimeManagerのSyncPodでは以下の処理を行います。
    • sandboxの変更を確認
    • 必要に応じてpod sandboxを削除
    • 実行する必要のないコンテナを削除
    • sandboxの作成
    • ephemeralコンテナの作成
    • initコンテナの作成
    • コンテナのリサイズ処理
      • これは1.27から入ったPodを再起動せずにPodのresourceを更新する処理です。
    • 通常のコンテナを作成する
  • コンテナ(ephemeral, init含む)の作成ではhelper関数としてstart関数を作成して、その関数を実行しています。
  • start関数の中で m.startContainerの部分で実際にコンテナを作成しています。
// SyncPod syncs the running pod into the desired pod by executing following steps:
//
//  1. Compute sandbox and container changes.
//  2. Kill pod sandbox if necessary.
//  3. Kill any containers that should not be running.
//  4. Create sandbox if necessary.
//  5. Create ephemeral containers.
//  6. Create init containers.
//  7. Resize running containers (if InPlacePodVerticalScaling==true)
//  8. Create normal containers.
func (m *kubeGenericRuntimeManager) SyncPod(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, backOff *flowcontrol.Backoff) (result kubecontainer.PodSyncResult) {
	// Step 1: Compute sandbox and container changes.
  podContainerChanges := m.computePodActions(ctx, pod, podStatus)
  ...

  // Helper containing boilerplate common to starting all types of containers.
	// typeName is a description used to describe this type of container in log messages,
	// currently: "container", "init container" or "ephemeral container"
	// metricLabel is the label used to describe this type of container in monitoring metrics.
	// currently: "container", "init_container" or "ephemeral_container"
	start := func(ctx context.Context, typeName, metricLabel string, spec *startSpec) error {
		startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, spec.container.Name)
		result.AddSyncResult(startContainerResult)

		isInBackOff, msg, err := m.doBackOff(pod, spec.container, podStatus, backOff)
		if isInBackOff {
			startContainerResult.Fail(err, msg)
			klog.V(4).InfoS("Backing Off restarting container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
			return err
		}

		metrics.StartedContainersTotal.WithLabelValues(metricLabel).Inc()
		if sc.HasWindowsHostProcessRequest(pod, spec.container) {
			metrics.StartedHostProcessContainersTotal.WithLabelValues(metricLabel).Inc()
		}
		klog.V(4).InfoS("Creating container in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod))
		// NOTE (aramase) podIPs are populated for single stack and dual stack clusters. Send only podIPs.
		if msg, err := m.startContainer(ctx, podSandboxID, podSandboxConfig, spec, pod, podStatus, pullSecrets, podIP, podIPs); err != nil {
			// startContainer() returns well-defined error codes that have reasonable cardinality for metrics and are
			// useful to cluster administrators to distinguish "server errors" from "user errors".
			metrics.StartedContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			if sc.HasWindowsHostProcessRequest(pod, spec.container) {
				metrics.StartedHostProcessContainersErrorsTotal.WithLabelValues(metricLabel, err.Error()).Inc()
			}
			startContainerResult.Fail(err, msg)
			// known errors that are logged in other places are logged at higher levels here to avoid
			// repetitive log spam
			switch {
			case err == images.ErrImagePullBackOff:
				klog.V(3).InfoS("Container start failed in pod", "containerType", typeName, "container", spec.container, "pod", klog.KObj(pod), "containerMessage", msg, "err", err)
			default:
				utilruntime.HandleError(fmt.Errorf("%v %+v start failed in pod %v: %v: %s", typeName, spec.container, format.Pod(pod), err, msg))
			}
			return err
		}

		return nil
	}

  // Step 8: start containers in podContainerChanges.ContainersToStart.
	for _, idx := range podContainerChanges.ContainersToStart {
		start(ctx, "container", metrics.Container, containerStartSpec(&pod.Spec.Containers[idx]))
	}
}

startContainer

  • startContainerではコメントの通り、以下の処理を行っています
    • イメージをプルする
    • コンテナを作成する
    • コンテナを起動する
    • 必要であれば、起動後の処理を行う
  • コンテナに関する処理はkubeGenericRuntimeManagerのruntimeServiceのCreateContainerやStartContainerで行っています。
// startContainer starts a container and returns a message indicates why it is failed on error.
// It starts the container through the following steps:
// * pull the image
// * create the container
// * start the container
// * run the post start lifecycle hooks (if applicable)
func (m *kubeGenericRuntimeManager) startContainer(ctx context.Context, podSandboxID string, podSandboxConfig *runtimeapi.PodSandboxConfig, spec *startSpec, pod *v1.Pod, podStatus *kubecontainer.PodStatus, pullSecrets []v1.Secret, podIP string, podIPs []string) (string, error) {
	container := spec.container
  // Step 1: pull the image.
	imageRef, msg, err := m.imagePuller.EnsureImageExists(ctx, pod, container, pullSecrets, podSandboxConfig)
  
  ....
  containerID, err := m.runtimeService.CreateContainer(ctx, podSandboxID, containerConfig, podSandboxConfig)

  // Step 3: start the container.
	err = m.runtimeService.StartContainer(ctx, containerID)
}

newInstrumentedRuntimeService

  • runtimeServiceはkubeGenericRuntimeManagerを初期化するNewKubeGenericRuntimeManager関数の中で、newInstrumentedRuntimeService関数で初期化されています。
// NewKubeGenericRuntimeManager creates a new kubeGenericRuntimeManager
func NewKubeGenericRuntimeManager(
	recorder record.EventRecorder,
  ...
  runtimeService internalapi.RuntimeService,
  ...
	tracerProvider trace.TracerProvider,
) (KubeGenericRuntime, error) {
	ctx := context.Background()
	runtimeService = newInstrumentedRuntimeService(runtimeService)
	imageService = newInstrumentedImageManagerService(imageService)
	tracer := tracerProvider.Tracer(instrumentationScope)
	kubeRuntimeManager := &kubeGenericRuntimeManager{
  }

}
  • newInstrumentedRuntimeServiceは実際にはinstrumentedRuntimeServiceを返しているだけです。
// Creates an instrumented RuntimeInterface from an existing RuntimeService.
func newInstrumentedRuntimeService(service internalapi.RuntimeService) internalapi.RuntimeService {
	return &instrumentedRuntimeService{service: service}
}
  • startContainerやcreateContainerはinstrumentedRuntimeServiceでは以下のように定義されています。
  • in.service.CreateContainerの実行、つまりinstrumentedRuntimeServiceのserviceのCreateContainerを実行しています。
func (in instrumentedRuntimeService) CreateContainer(ctx context.Context, podSandboxID string, config *runtimeapi.ContainerConfig, sandboxConfig *runtimeapi.PodSandboxConfig) (string, error) {
	const operation = "create_container"
	defer recordOperation(operation, time.Now())

	out, err := in.service.CreateContainer(ctx, podSandboxID, config, sandboxConfig)
	recordError(operation, err)
	return out, err
}

func (in instrumentedRuntimeService) StartContainer(ctx context.Context, containerID string) error {
	const operation = "start_container"
	defer recordOperation(operation, time.Now())

	err := in.service.StartContainer(ctx, containerID)
	recordError(operation, err)
	return err
}
  • NewKubeGenericRuntimeManagerの引数で渡されているruntimeServiceは何か、確認すると、kubeDeps.RemoteRuntimeServiceであることがわかります。
// NewMainKubelet instantiates a new Kubelet object along with all the required internal modules.
// No initialization of Kubelet and its modules should happen here.
func NewMainKubelet(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ... )
    ...

    runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
    kubecontainer.FilterEventRecorder(kubeDeps.Recorder),
    ... 
    kubeDeps.RemoteRuntimeService,
    ...
    )
    
    
}
  • RemoteRuntimeServiceはappパッケージ(cmdディレクトリ)で定義されたrun関数の中でPreInitRuntimeService関数で初期化されています。
func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Dependencies, featureGate featuregate.FeatureGate) (err error) {
	// Set global feature gates based on the value on the initial KubeletServer

  // 779行目あたり
  err = kubelet.PreInitRuntimeService(&s.KubeletConfiguration, kubeDeps)

PreInitRuntimeService

  • PreInitRuntimeServiceの中でNewRemoteRuntimeServiceを呼び出し、kubeDeps.RemoteRuntimeServiceを設定しています。
// PreInitRuntimeService will init runtime service before RunKubelet.
func PreInitRuntimeService(kubeCfg *kubeletconfiginternal.KubeletConfiguration, kubeDeps *Dependencies) error {
	remoteImageEndpoint := kubeCfg.ImageServiceEndpoint
	if remoteImageEndpoint == "" && kubeCfg.ContainerRuntimeEndpoint != "" {
		remoteImageEndpoint = kubeCfg.ContainerRuntimeEndpoint
	}
	var err error
	if kubeDeps.RemoteRuntimeService, err = remote.NewRemoteRuntimeService(kubeCfg.ContainerRuntimeEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
		return err
	}
	if kubeDeps.RemoteImageService, err = remote.NewRemoteImageService(remoteImageEndpoint, kubeCfg.RuntimeRequestTimeout.Duration, kubeDeps.TracerProvider); err != nil {
		return err
	}

	kubeDeps.useLegacyCadvisorStats = cadvisor.UsingLegacyCadvisorStats(kubeCfg.ContainerRuntimeEndpoint)

	return nil
}

NewRemoteRuntimeService

  • NewRemoteRuntimeServiceの中で、endpointとの接続確認をします。
  • そして、kubeDeps.RemoteRuntimeServiceをremoteRuntimeServiceで初期化します。
  • また、validateServiceConnectionの中でruntimeClientを初期化しています。
// NewRemoteRuntimeService creates a new internalapi.RuntimeService.
func NewRemoteRuntimeService(endpoint string, connectionTimeout time.Duration, tp trace.TracerProvider) (internalapi.RuntimeService, error) {
	klog.V(3).InfoS("Connecting to runtime service", "endpoint", endpoint)
	addr, dialer, err := util.GetAddressAndDialer(endpoint)
	if err != nil {
		return nil, err
	}
	ctx, cancel := context.WithTimeout(context.Background(), connectionTimeout)
	defer cancel()

	dialOpts := []grpc.DialOption{}
	dialOpts = append(dialOpts,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
		grpc.WithContextDialer(dialer),
		grpc.WithDefaultCallOptions(grpc.MaxCallRecvMsgSize(maxMsgSize)))
	if utilfeature.DefaultFeatureGate.Enabled(features.KubeletTracing) {
		tracingOpts := []otelgrpc.Option{
			otelgrpc.WithPropagators(tracing.Propagators()),
			otelgrpc.WithTracerProvider(tp),
		}
		// Even if there is no TracerProvider, the otelgrpc still handles context propagation.
		// See https://github.com/open-telemetry/opentelemetry-go/tree/main/example/passthrough
		dialOpts = append(dialOpts,
			grpc.WithUnaryInterceptor(otelgrpc.UnaryClientInterceptor(tracingOpts...)),
			grpc.WithStreamInterceptor(otelgrpc.StreamClientInterceptor(tracingOpts...)))
	}
	conn, err := grpc.DialContext(ctx, addr, dialOpts...)
	if err != nil {
		klog.ErrorS(err, "Connect remote runtime failed", "address", addr)
		return nil, err
	}

	service := &remoteRuntimeService{
		timeout:      connectionTimeout,
		logReduction: logreduction.NewLogReduction(identicalErrorDelay),
	}

	if err := service.validateServiceConnection(ctx, conn, endpoint); err != nil {
		return nil, fmt.Errorf("validate service connection: %w", err)
	}

	return service, nil
}

さかのぼると

  • 知りたかったのはin.service.StartContainer(ctx, containerID)で「instrumentedRuntimeServiceのserviceは何か」でした。
  • それはNewKubeGenericRuntimeManagerの引数で渡されたkubeDeps.RemoteRuntimeServiceであり、実装はremoteRuntimeServiceであることがわかります。
  • つまり、in.service.StartContainerはremoteRuntimeServiceのStartContainerを実行しています。

StartContainer

  • remoteRuntimeServiceのStartContainerではruntimeClientのStartContainerを呼び出しています。

// StartContainer starts the container.
func (r *remoteRuntimeService) StartContainer(ctx context.Context, containerID string) (err error) {
	klog.V(10).InfoS("[RemoteRuntimeService] StartContainer", "containerID", containerID, "timeout", r.timeout)
	ctx, cancel := context.WithTimeout(ctx, r.timeout)
	defer cancel()

	if _, err := r.runtimeClient.StartContainer(ctx, &runtimeapi.StartContainerRequest{
		ContainerId: containerID,
	}); err != nil {
		klog.ErrorS(err, "StartContainer from runtime service failed", "containerID", containerID)
		return err
	}
	klog.V(10).InfoS("[RemoteRuntimeService] StartContainer Response", "containerID", containerID)

	return nil
}

validateServiceConnection

  • runtimeClientはNewRemoteRuntimeServiceの中でvalidateServiceConnectionを利用して初期化されています
// validateServiceConnection tries to connect to the remote runtime service by
// using the CRI v1 API version and fails if that's not possible.
func (r *remoteRuntimeService) validateServiceConnection(ctx context.Context, conn *grpc.ClientConn, endpoint string) error {
	klog.V(4).InfoS("Validating the CRI v1 API runtime version")
	r.runtimeClient = runtimeapi.NewRuntimeServiceClient(conn)

	if _, err := r.runtimeClient.Version(ctx, &runtimeapi.VersionRequest{}); err != nil {
		return fmt.Errorf("validate CRI v1 runtime API for endpoint %q: %w", endpoint, err)
	}

	klog.V(2).InfoS("Validated CRI v1 runtime API")
	return nil
}
  • NewRuntimeServiceClientはgrpcのクライアントを持つruntimeServiceClientを返します
  • r.runtimeClient.StartContainerを実行するということはruntimeServiceClientのstartContainerを実行することになります。
func NewRuntimeServiceClient(cc *grpc.ClientConn) RuntimeServiceClient {
	return &runtimeServiceClient{cc}
}

StartContainer

  • runtimeServiceClientのStartContainerはari-apiでgrpcを利用して/runtime.v1.RuntimeService/StartContainerにリクエストを送っています。
  • c.ccのccはgrpcライブラリのClientConnです。
  • この後はcontainerdなどがコンテナを起動する処理を行うことになります。

func (c *runtimeServiceClient) StartContainer(ctx context.Context, in *StartContainerRequest, opts ...grpc.CallOption) (*StartContainerResponse, error) {
	out := new(StartContainerResponse)
	err := c.cc.Invoke(ctx, "/runtime.v1.RuntimeService/StartContainer", in, out, opts...)
	if err != nil {
		return nil, err
	}
	return out, nil
}

最後に

ここまででKubeletの処理を追っていき、Podが作成される流れをざっくりと追うことができました。
自分でソースコードを読んで実際の処理の流れを追ってみると、Kubernetesのコンポーネントの理解学深まると思います。

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?