はじめに
この記事はサイバーエージェント24卒内定者 Advent Calendarの12日目の記事です。
コンテナオーケストレーションツールである Kubernetes は様々なコンポーネントで構成されています。ユーザにとって馴染み深いコンポーネントの一つに client-go や kubectl などとやりとりする kube-apiserver があります。この kube-apiserver
はその名の通り API Server でサーバーを起動してハンドラーを登録して Kubernetes のコアコンポーネントの役割を担っています。今回は kube-apiserver
が実際にサーバーを起動して動くに至っているのかコードを追っていきたいと思います。
kube-apiserver とは
kube-apiserver
とは Kubernetes Control Plane のコンポーネントの一つで様々な Kuberntes コンポーネントとやりとりをする中核的役割を担います。例えば、kubectl
とやりとりする API Server の提供、etcd をデータストアとして使用、Custom Resource Definition(CRD)による独自リソースの作成と利用など様々な役割を担っています。
(引用:https://github.com/kubernetes/website/blob/fb6364da0afd19e8a9515aaae2de9bc74a0a6abd/static/images/docs/components-of-kubernetes.png)
kube-apiserver の概要
kube-apiserver の内部では主に API Server、API Extensions Server、Aggregator Serverの3つになります。API Server は Deployment などの Kubernetes のコアリソースを扱うための API Server で、API Extensions Server は Custom Resource Definition(CRD) の作成に基づき、それぞれのカスタムリソースを処理するための HTTP ハンドラーなどを扱うコンポーネントで、最後に Aggregator Server は API Extension Server や API Server の HTTP ハンドラーの制御を担っています。また、認証認可などのリクエストの前処理を行う ServerChain や Webhook などのバリデーション処理を行う Addmission Controller など様々な機能がありますが今回の記事では実装を読む際にこれらの用語が出るので紹介しましたが、詳細には触れないため紹介のみに留めておきます。詳しくはこちらを確認してみて下さい。
(引用:https://speakerdeck.com/bells17/kube-api-server?slide=23)
kube-apiserver の実装
今回は kubernetes v1.28.2 を参考に追っていきたいと思います。
エントリーポイント
kube-apiserver
のエントリーポイントはhttps://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/apiserver.go に存在し app.NewAPIServerCommand()
関数で kube-apiserver
の cobra.Command
を作成し実行しています。
func main() {
command := app.NewAPIServerCommand()
code := cli.Run(command)
os.Exit(code)
}
実行時の挙動
cli.Run(command)
が呼ばれると https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/server.go#L78 に記述されたcobra.Command
のRunE
の実装が呼び出されます。RunE 内ではフラグやログなどの処理を行いサーバーを起動するRun
関数を実行します。このRun
関数が kube-apiserver
のメインロジックを含む実装となっています。
func Run(opts options.CompletedOptions, stopCh <-chan struct{}) error {
// To help debugging, immediately log version
klog.Infof("Version: %+v", version.Get())
klog.InfoS("Golang settings", "GOGC", os.Getenv("GOGC"), "GOMAXPROCS", os.Getenv("GOMAXPROCS"), "GOTRACEBACK", os.Getenv("GOTRACEBACK"))
config, err := NewConfig(opts)
if err != nil {
return err
}
completed, err := config.Complete()
if err != nil {
return err
}
server, err := CreateServerChain(completed)
if err != nil {
return err
}
prepared, err := server.PrepareRun()
if err != nil {
return err
}
return prepared.Run(stopCh)
}
この関数のNewConfig()
でサーバーの設定情報を格納する構造体を作成し、config.Complete()
で足りない設定情報を埋めます。CreateServerChain()
では前述した認証認可などのリクエストの前処理を行う ServerChain を作成し、PrepareRun()
で Server を実行するのに必要な処理をいくつか実行し、Run()
で実際にサーバーを起動します。これらの関数について掘り下げていきます。
NewConfig
NewConfig は https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/config.go#L69 に実装があり API Server(Control Plane)、API Extension Server、Aggregator Server の設定用の Config 構造体を作成しています。
func NewConfig(opts options.CompletedOptions) (*Config, error) {
c := &Config{
Options: opts,
}
controlPlane, serviceResolver, pluginInitializer, err := CreateKubeAPIServerConfig(opts)
if err != nil {
return nil, err
}
c.ControlPlane = controlPlane
apiExtensions, err := apiserver.CreateAPIExtensionsConfig(*controlPlane.GenericConfig, controlPlane.ExtraConfig.VersionedInformers, pluginInitializer, opts.CompletedOptions, opts.MasterCount,
serviceResolver, webhook.NewDefaultAuthenticationInfoResolverWrapper(controlPlane.ExtraConfig.ProxyTransport, controlPlane.GenericConfig.EgressSelector, controlPlane.GenericConfig.LoopbackClientConfig, controlPlane.GenericConfig.TracerProvider))
if err != nil {
return nil, err
}
c.ApiExtensions = apiExtensions
aggregator, err := createAggregatorConfig(*controlPlane.GenericConfig, opts.CompletedOptions, controlPlane.ExtraConfig.VersionedInformers, serviceResolver, controlPlane.ExtraConfig.ProxyTransport, controlPlane.ExtraConfig.PeerProxy, pluginInitializer)
if err != nil {
return nil, err
}
c.Aggregator = aggregator
return c, nil
}
config.Complete
config.Complete
は https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/config.go#L56 に実装があり、各種サーバーの Config
構造体をCompletedConfig
に代入して、サーバー起動に必要な設定値を全て埋めます。
func (c *Config) Complete() (CompletedConfig, error) {
return CompletedConfig{&completedConfig{
Options: c.Options,
Aggregator: c.Aggregator.Complete(),
ControlPlane: c.ControlPlane.Complete(),
ApiExtensions: c.ApiExtensions.Complete(),
ExtraConfig: c.ExtraConfig,
}}, nil
}
CreateServerChain
CreateServerChain
の実装は https://github.com/kubernetes/kubernetes/blob/v1.28.2/cmd/kube-apiserver/app/server.go#L174 に実装があり、API Extensions Server と API Server を作成し Aggregator Server に統合します。これにより API Extension Server や API Server で作成した HTTP ハンドラーなどを制御することを可能としています。
func CreateServerChain(config CompletedConfig) (*aggregatorapiserver.APIAggregator, error) {
notFoundHandler := notfoundhandler.New(config.ControlPlane.GenericConfig.Serializer, genericapifilters.NoMuxAndDiscoveryIncompleteKey)
apiExtensionsServer, err := config.ApiExtensions.New(genericapiserver.NewEmptyDelegateWithCustomHandler(notFoundHandler))
if err != nil {
return nil, err
}
crdAPIEnabled := config.ApiExtensions.GenericConfig.MergedResourceConfig.ResourceEnabled(apiextensionsv1.SchemeGroupVersion.WithResource("customresourcedefinitions"))
kubeAPIServer, err := config.ControlPlane.New(apiExtensionsServer.GenericAPIServer)
if err != nil {
return nil, err
}
// aggregator comes last in the chain
aggregatorServer, err := createAggregatorServer(config.Aggregator, kubeAPIServer.GenericAPIServer, apiExtensionsServer.Informers, crdAPIEnabled)
if err != nil {
// we don't need special handling for innerStopCh because the aggregator server doesn't create any go routines
return nil, err
}
return aggregatorServer, nil
}
PreparedRun
PreparedRun
は https://github.com/kubernetes/kubernetes/blob/v1.28.2/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go#L400 で実装されており、Aggregator Server の起動に必要なコンポーネントを実装します。具体的な内容に関しては本記事から逸脱するので省きますが、詳しく知りたい方は、aggregated discovery documentやKubernetes API health endpointsを参照下さい。
Run
Run
の実装は https://github.com/kubernetes/kubernetes/blob/v1.28.2/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go#L466 に記載しており、この関数が呼び出している s.runnable.Run
の実装は https://github.com/kubernetes/kubernetes/blob/v1.28.2/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go#L496 に記載されています。ここではサーバーの起動に関しての実装が書かれておりここは本記事のメインとなるので詳しく説明していきたいと思います。
サーバー起動処理
s.runnable.Run
の実装 はとても長く読むのが大変ですがサーバー起動処理は以下の NonBlockingRun
関数にまとまっています。
stoppedCh, listenerStoppedCh, err := s.NonBlockingRun(stopHttpServerCh, shutdownTimeout)
if err != nil {
return err
}
NonBlockingRun
は non-blocking-scoket を用いた非同期通信を行うサーバーを起動する関数で、このうち以下のような s.SecureServingInfo.Serve
関数にサーバー起動処理がまとまっています。
stoppedCh, listenerStoppedCh, err = s.SecureServingInfo.Serve(s.Handler, shutdownTimeout, internalStopCh)
if err != nil {
close(internalStopCh)
return nil, nil, err
}
そして、s.SecureServingInfo.Serve
にサーバーに関する構造体や設定などを行う実装があり最終行に当たる RunServer
でサーバーを起動します。最後に RunServer
で server.Serve
が呼び出されることでサーバーが起動します。
func RunServer(
server *http.Server,
ln net.Listener,
shutDownTimeout time.Duration,
stopCh <-chan struct{},
) (<-chan struct{}, <-chan struct{}, error) {
if ln == nil {
return nil, nil, fmt.Errorf("listener must not be nil")
}
// Shutdown server gracefully.
serverShutdownCh, listenerStoppedCh := make(chan struct{}), make(chan struct{})
go func() {
defer close(serverShutdownCh)
<-stopCh
ctx, cancel := context.WithTimeout(context.Background(), shutDownTimeout)
server.Shutdown(ctx)
cancel()
}()
go func() {
defer utilruntime.HandleCrash()
defer close(listenerStoppedCh)
var listener net.Listener
listener = tcpKeepAliveListener{ln}
if server.TLSConfig != nil {
listener = tls.NewListener(listener, server.TLSConfig)
}
err := server.Serve(listener)
msg := fmt.Sprintf("Stopped listening on %s", ln.Addr().String())
select {
case <-stopCh:
klog.Info(msg)
default:
panic(fmt.Sprintf("%s due to error: %v", msg, err))
}
}()
return serverShutdownCh, listenerStoppedCh, nil
}
まとめ
今回は kube-apiserver
のエントリーポイントから実際にサーバーを起動させる処理まで内部実装を追っていきました。サーバー起動処理まで追うことで kube-apiserver
の実装の全体像を掴めたのではないかと思います。今後機会があればハンドラーの処理や今回省略した内容も含めてまとめられたらと思います。最後までお付き合い頂きありがとうございました。