6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

サイバーエージェント24卒内定者Advent Calendar 2023

Day 12

kube-apiserver がサーバーを起動するまで

Last updated at Posted at 2023-12-12

はじめに

この記事はサイバーエージェント24卒内定者 Advent Calendarの12日目の記事です。

コンテナオーケストレーションツールである Kubernetes は様々なコンポーネントで構成されています。ユーザにとって馴染み深いコンポーネントの一つに client-go や kubectl などとやりとりする kube-apiserver があります。この kube-apiserver はその名の通り API Server でサーバーを起動してハンドラーを登録して Kubernetes のコアコンポーネントの役割を担っています。今回は kube-apiserver が実際にサーバーを起動して動くに至っているのかコードを追っていきたいと思います。

kube-apiserver とは

kube-apiserver とは Kubernetes Control Plane のコンポーネントの一つで様々な Kuberntes コンポーネントとやりとりをする中核的役割を担います。例えば、kubectl とやりとりする API Server の提供、etcd をデータストアとして使用、Custom Resource Definition(CRD)による独自リソースの作成と利用など様々な役割を担っています。
components-of-kubernetes.png(引用:https://github.com/kubernetes/website/blob/fb6364da0afd19e8a9515aaae2de9bc74a0a6abd/static/images/docs/components-of-kubernetes.png)

kube-apiserver の概要

kube-apiserver の内部では主に API ServerAPI Extensions ServerAggregator Serverの3つになります。API Server は Deployment などの Kubernetes のコアリソースを扱うための API Server で、API Extensions Server は Custom Resource Definition(CRD) の作成に基づき、それぞれのカスタムリソースを処理するための HTTP ハンドラーなどを扱うコンポーネントで、最後に Aggregator Server は API Extension Server や API Server の HTTP ハンドラーの制御を担っています。また、認証認可などのリクエストの前処理を行う ServerChain や Webhook などのバリデーション処理を行う Addmission Controller など様々な機能がありますが今回の記事では実装を読む際にこれらの用語が出るので紹介しましたが、詳細には触れないため紹介のみに留めておきます。詳しくはこちらを確認してみて下さい。
 2023-12-10 15.42.38.png(引用:https://speakerdeck.com/bells17/kube-api-server?slide=23)

kube-apiserver の実装

今回は kubernetes v1.28.2 を参考に追っていきたいと思います。

エントリーポイント

kube-apiserver のエントリーポイントはhttps://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/apiserver.go に存在し app.NewAPIServerCommand() 関数で kube-apiservercobra.Command を作成し実行しています。

func main() {
	command := app.NewAPIServerCommand()
	code := cli.Run(command)
	os.Exit(code)
}

実行時の挙動

cli.Run(command)が呼ばれると https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/server.go#L78 に記述されたcobra.CommandRunEの実装が呼び出されます。RunE 内ではフラグやログなどの処理を行いサーバーを起動するRun関数を実行します。このRun関数が kube-apiserver のメインロジックを含む実装となっています。

func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
	// To help debugging, immediately log version
	klog.Infof("Version: %+v", version.Get())

	klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))

	config, err := NewConfig(opts)
	if err != nil {
		return err
	}
	completed, err := config.Complete()
	if err != nil {
		return err
	}
	server, err := CreateServerChain(completed)
	if err != nil {
		return err
	}

	prepared, err := server.PrepareRun()
	if err != nil {
		return err
	}

	return prepared.Run(stopCh)
}

この関数のNewConfig()でサーバーの設定情報を格納する構造体を作成し、config.Complete()で足りない設定情報を埋めます。CreateServerChain()では前述した認証認可などのリクエストの前処理を行う ServerChain を作成し、PrepareRun()で Server を実行するのに必要な処理をいくつか実行し、Run()で実際にサーバーを起動します。これらの関数について掘り下げていきます。

NewConfig

NewConfig は https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/config.go#L69 に実装があり API Server(Control Plane)、API Extension Server、Aggregator Server の設定用の Config 構造体を作成しています。

func NewConfig(opts options.CompletedOptions) (*Config, error) {
	c := &Config{
		Options: opts,
	}

	controlPlane, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(opts)
	if err != nil {
		return nil, err
	}
	c.ControlPlane = controlPlane

	apiExtensions, err := apiserver.CreateAPIExtensionsConfig(*controlPlane.GenericConfig, controlPlane.ExtraConfig.VersionedInformers, pluginInitializer, opts.CompletedOptions, opts.MasterCount,
		serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(controlPlane.ExtraConfig.ProxyTransport, controlPlane.GenericConfig.EgressSelector, controlPlane.GenericConfig.LoopbackClientConfig, controlPlane.GenericConfig.TracerProvider))
	if err != nil {
		return nil, err
	}
	c.ApiExtensions = apiExtensions

	aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
	if err != nil {
		return nil, err
	}
	c.Aggregator = aggregator

	return c, nil
}

config.Complete

config.Completehttps://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/config.go#L56 に実装があり、各種サーバーの Config 構造体をCompletedConfig に代入して、サーバー起動に必要な設定値を全て埋めます。

func (c *Config) Complete() (CompletedConfig, error) {
	return CompletedConfig{&completedConfig{
		Options: c.Options,

		Aggregator:    c.Aggregator.Complete(),
		ControlPlane:  c.ControlPlane.Complete(),
		ApiExtensions: c.ApiExtensions.Complete(),

		ExtraConfig: c.ExtraConfig,
	}}, nil
}

CreateServerChain

CreateServerChain の実装は https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/server.go#L174 に実装があり、API Extensions Server と API Server を作成し Aggregator Server に統合します。これにより API Extension Server や API Server で作成した HTTP ハンドラーなどを制御することを可能としています。

func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
	notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
	apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
	if err != nil {
		return nil, err
	}
	crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))

	kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
	if err != nil {
		return nil, err
	}

	// aggregator comes last in the chain
	aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
	if err != nil {
		// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
		return nil, err
	}

	return aggregatorServer, nil
}

PreparedRun

PreparedRunhttps://github.com/kubernetes/kubernetes/blob/v1.28.2/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go#L400 で実装されており、Aggregator Server の起動に必要なコンポーネントを実装します。具体的な内容に関しては本記事から逸脱するので省きますが、詳しく知りたい方は、aggregated discovery documentKubernetes API health endpointsを参照下さい。

Run

Run の実装は https://github.com/kubernetes/kubernetes/blob/v1.28.2/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go#L466 に記載しており、この関数が呼び出している s.runnable.Run の実装は https://github.com/kubernetes/kubernetes/blob/v1.28.2/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go#L496 に記載されています。ここではサーバーの起動に関しての実装が書かれておりここは本記事のメインとなるので詳しく説明していきたいと思います。

サーバー起動処理

s.runnable.Runの実装 はとても長く読むのが大変ですがサーバー起動処理は以下の NonBlockingRun 関数にまとまっています。

stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
if err != nil {
    return err
}

NonBlockingRun は non-blocking-scoket を用いた非同期通信を行うサーバーを起動する関数で、このうち以下のような s.SecureServingInfo.Serve 関数にサーバー起動処理がまとまっています。

stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
if err != nil {
    close(internalStopCh)
    return nil, nil, err
}

そして、s.SecureServingInfo.Serve にサーバーに関する構造体や設定などを行う実装があり最終行に当たる RunServer でサーバーを起動します。最後に RunServerserver.Serveが呼び出されることでサーバーが起動します。

func RunServer(
	server *http.Server,
	ln net.Listener,
	shutDownTimeout time.Duration,
	stopCh <-chan struct{},
) (<-chan struct{}, <-chan struct{}, error) {
	if ln == nil {
		return nil, nil, fmt.Errorf("listener must not be nil")
	}

	// Shutdown server gracefully.
	serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{})
	go func() {
		defer close(serverShutdownCh)
		<-stopCh
		ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
		server.Shutdown(ctx)
		cancel()
	}()

	go func() {
		defer utilruntime.HandleCrash()
		defer close(listenerStoppedCh)

		var listener net.Listener
		listener = tcpKeepAliveListener{ln}
		if server.TLSConfig != nil {
			listener = tls.NewListener(listener, server.TLSConfig)
		}

		err := server.Serve(listener)

		msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
		select {
		case <-stopCh:
			klog.Info(msg)
		default:
			panic(fmt.Sprintf("%s due to error: %v", msg, err))
		}
	}()

	return serverShutdownCh, listenerStoppedCh, nil
}

まとめ

今回は kube-apiserver のエントリーポイントから実際にサーバーを起動させる処理まで内部実装を追っていきました。サーバー起動処理まで追うことで kube-apiserverの実装の全体像を掴めたのではないかと思います。今後機会があればハンドラーの処理や今回省略した内容も含めてまとめられたらと思います。最後までお付き合い頂きありがとうございました。

参考文献

6
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
6
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?