LoginSignup
11
3

More than 3 years have passed since last update.

Amazon VPC CNI plugin for KubernetesのソースコードリーディングでEKSのネットワーキングについて理解を深める #2

Last updated at Posted at 2019-12-17

はじめに

この記事はAmazon EKS #2 Advent Calendar 2019 17日目の記事です。

今回は #2 になります。前回からの続きになりますので、先に以下の記事をご覧ください。

  1. Amazon VPC CNI plugin for KubernetesのソースコードリーディングでEKSのネットワーキングについて理解を深める #1
  2. Amazon VPC CNI plugin for KubernetesのソースコードリーディングでEKSのネットワーキングについて理解を深める #2 (イマココ
  3. Amazon VPC CNI plugin for KubernetesのソースコードリーディングでEKSのネットワーキングについて理解を深める #3

前回までで概要を整理しましたので、今回から実際にリポジトリを見ていきます。
Amazon VPC CNIプラグインは、EKSを起動するとdaemonsetで起動するaws-nodeというコンテナです。

Next Generation AWS VPC CNI Plugin

本題に入る前に、前回書ききれなかった内容となりますが、「各ノード(daemonset)で自分のノード内のIPアドレスの管理を行っている」L-IPAMDという仕組みは、"現在"のAmazon VPC CNIプラグインの大きな特徴の1つです。

しかし、Podごとのセキュリティーグループが設定できないことなどのいくつかの課題があがっており、これらの見直しのためにAmazon VPC CNIプラグインのアーキテクチャごと変える動きがあります。

ドキュメントにも将来のL-IPAMDのアーキテクチャの改善について記載があります。

Future
https://github.com/aws/amazon-vpc-cni-k8s/blob/master/docs/cni-proposal.md#future

In the future, we would like to investigate whether to have a cluster ENI manager which manages allocating and freeing ENIs for all worker nodes in the cluster. Here are a few benefits of having a cluster ENI manager:

Administrators can remove EC2 ENI write permission from worker nodes.
It may be easier to troubleshoot.
It may be easier to allow ENIs of different security groups and subnets get attached to a single worker node.

L-IPAMDのように各ノードが自分のENIとセカンダリIPを管理するのではなく、クラスター全体のノードのENIを管理するようになるとのこと。

containers-roadmapにもissueが上がっています。

[EKS]: Next Generation AWS VPC CNI Plugin #398
https://github.com/aws/containers-roadmap/issues/398

もしかすると、明日にもこの記事が古くなっている可能性も大いにあります。笑
が、現在の仕組みを知ることは無駄にはならないと思いますので、自分の記録としてまとめます。

Github Repository Overview

EKSを利用している方がどの程度このリポジトリを見たことがあるのかわかりませんが、今回触れるものを中心に、主要なファイルをピックアップするとこんな感じでしょうか。

amazon-vpc-cni-k8s
https://github.com/aws/amazon-vpc-cni-k8s

amazon-vpc-cni-k8s
    ├─ client/health-check 
    │    └─ grpc_health_probe.go
    ├─ cni-metrics-helper
    │    ├─ cni-metrics-helper.go
    │    ├─ README.md
    │    └─ metrics
    │         ├─ metrics.go
    │         └─ cni_metrics.go
    ├─ config
    │    └─ vX.X
    │         └─ aws-k8s-cni.yaml
    ├─ docs
    │    ├─ cni-proposal.md
    │    └─ mtroubleshooting.md
    ├─ ipamd
    │    ├─ ipamd.go
    │    ├─ rpc_handler.go
    │    └─ datastore
    │         └─ data_store.go
    ├─ misc
    │    └─ 10-aws.conflist
    ├─ pkg
    │    ├─ awsutils
    │    ├─ eniconfig
    │    │    └─ eniconfig.go
    │    ├─ k8sapi
    │    │    └─ discovery.go
    │    └─ networkutils
    ├─ plugins/routed-eni
    │    ├─ cni.go
    │    └─ driver
    │         └─ driver.go
    ├─ scripts
    │    ├─ dockerfiles
    │    │    └─ Dockerfile.release
    │    ├─ aws-cni-support.sh
    │    └─ entrypoint.sh               
    ├─ CHANGELOG.md        
    ├─ CODE_OF_CONDUCT.md  
    ├─ CONTRIBUTING.md     
    ├─ LICENSE             
    ├─ Makefile            
    ├─ README.md           
    ├─ go.mod              
    ├─ go.sum              
    └── main.go 

早速直下にipamdplugins/routed-eniといった、前回確認したCNIプラグインの主要コンポーネントたちがお出迎えしてくれてます。

ビルド〜起動するまで

まずはMakefileからDockerfileの場所を探して、ビルド〜起動するまでを見てみます。
Dockerfileはこちら -> amazon-vpc-cni-k8s/scripts/dockerfiles/Dockerfile.release
マルチステージビルドでGoをbuildし、AmazonLinux2ベースイメージのコンテナに配置しています。

go buildしているのは、aws-k8s-agent, aws-cni, grpc_health_probeという3つのモジュールです。
前回確認したL-IPAMDがaws-k8s-agent、CNIプラグインがaws-cniです。またgrpc_health_probeは、gRPCクライアントのヘルスチェックモジュールで、L-IPAMDの起動確認に利用します。

続いて、このコンテナが起動するまでの流れとして、DockerfileのENTRYPOINTを見ていきます。
ENTRYPOINTはこちら -> amazon-vpc-cni-k8s/scripts/entrypoint.sh

entrypoint.shの流れは以下の通りです。
1. L-IPAMDを起動します。
2. gRPCヘルスチェッカーでL-IPAMDが問題なく起動できたことを確認する
3. CNIプラグインのバイナリとConfigファイルをkubeletの所定のディレクトリに配置する(※)
4. コンテナの起動処理完了(以降L-IPAMDがずっと起動)

(※) KubernetesではkubeletがこのCNIプラグインのバイナリとconfigファイル配置するディレクトリを監視しており、ファイルが配置されると NodeStatus が Ready となります。

entrypoint.sh
# Checks for IPAM connectivity on localhost port 50051, retrying connectivity
# check with a timeout of 36 seconds
wait_for_ipam() {
    local __sleep_time=0

    until [ $__sleep_time -eq 8 ]; do
        sleep $(( __sleep_time++ ))
        if $(./grpc_health_probe -addr 127.0.0.1:50051 >/dev/null 2>&1); then # <------------- gRPCでL-IPAMDのヘルスチェック
            return 0
        fi
    done
    return 1
}

echo -n "starting IPAM daemon in background ... "
./aws-k8s-agent > $AGENT_LOG_PATH 2>&1 & # <------------- L-IPAMD起動
echo "ok."

echo -n "checking for IPAM connectivity ... "

if ! wait_for_ipam; then # <------------- L-IPAMDが起動するまで待機
    echo " failed."
    echo "timed out waiting for IPAM daemon to start."
    exit 1
fi

echo "ok."

echo -n "copying CNI plugin binaries and config files ... "

# 〜〜〜省略
cp aws-cni $HOST_CNI_BIN_PATH$
cp 10-aws.conflist $HOST_CNI_CONFDIR_PATH
# これでNodeStatusがReadyになる。

まずL-IPAMDであるaws-k8s-agentをバックグラウンドで起動し、
grpc_health_probeで、L-IPAMDが起動するのを監視しています。

grpc_health_probeが問題なく完了すると、CNIプラグインのバイナリであるaws-cniと、CNIプラグインの Configファイルである10-aws.conflistをコピーして配置しています。

そこまで終わると最後にaws-k8s-agentをフォアグラウンドに戻して起動完了です。

entrypoint.sh#L86
# bring the aws-k8s-agent process back into the foreground
echo "foregrounding IPAM daemon ... "
fg %1 >/dev/null 2>&1 || $(echo "failed (process terminated)" && cat "$AGENT_LOG_PATH" && exit 1)

次にAmazon VPC CNIプラグインの主要コンポーネントである、Goのモジュールたちを見ていきます。
今回はL-IPAMDである、aws-k8s-agentから見ていきましょう。

Go1: aws-k8s-agent (L-IPAMD)

IPアドレスの管理を行うAmazon VPC CNIプラグインのコアモジュールです。内部にデータストアをもち、Localと頭につく通り、各ノードで自分のノード内のIPアドレスの管理やENIの管理を行っています。

なお、処理の流れを中心に見ていくため、引用部分は一部省略しています。
またexternalSnat等のオプション設定に関する説明等も省きますので、ご了承ください。

メインルーチンはリポジトリルート直下のmain.goです。流れとしては以下の通りです。
1. discoverControllerを起動する
2. eniConfigControllerを起動する
3. ipamdのNodeIPPoolManagerを起動する
4. ipamdのRPCHandlerを起動する

main.go
func _main() int {
    defer log.Flush()
    logger.SetupLogger(logger.GetLogFileLocation(defaultLogFilePath))

    log.Infof("Starting L-IPAMD %s  ...", version)

    kubeClient, err := k8sapi.CreateKubeClient()
    if err != nil {
        log.Errorf("Failed to create client: %v", err)
        return 1
    }

    discoverController := k8sapi.NewController(kubeClient)
    go discoverController.DiscoverK8SPods() // <--------------- discoverControllerの起動

    eniConfigController := eniconfig.NewENIConfigController()
    if ipamd.UseCustomNetworkCfg() {
        go eniConfigController.Start() // <--------------- eniConfigControllerの起動
    }

    ipamContext, err := ipamd.New(discoverController, eniConfigController)

    if err != nil {
        log.Errorf("Initialization failure: %v", err)
        return 1
    }

    // Pool manager
    go ipamContext.StartNodeIPPoolManager() // <--------------- ipamdのNodeIPPoolManagerの起動

    // Prometheus metrics
    go ipamContext.ServeMetrics()

    // CNI introspection endpoints
    go ipamContext.ServeIntrospection()

    // Start the RPC listener
    err = ipamContext.RunRPCHandler() // <--------------- ipamdのRPCHandlerの起動
    if err != nil {
        log.Errorf("Failed to set up gRPC handler: %v", err)
        return 1
    }
    return 0
}

起動している順に内容を確認していきます。

1. discoverController

ソースはこちら -> amazon-vpc-cni-k8s/pkg/k8sapi/discovery.go

KubernetesからPodの情報を取得し、自分のノードで稼働しているPodの情報を持つコントローラーです。
先ほどのmain.goでの呼び出し部分は以下の通りです。

main.go
kubeClient, err := k8sapi.CreateKubeClient()
discoverController := k8sapi.NewController(kubeClient)
go discoverController.DiscoverK8SPods()

k8sapi.CreateKubeClient()

CreateKubeClient()から見ていきます。
Operator-sdkのkubeclientを取得しています。

discovery.go
// Package k8sapi contains logic to retrieve pods running on local node
package k8sapi

import (
    "github.com/operator-framework/operator-sdk/pkg/k8sclient"
)

// CreateKubeClient creates a k8s client
func CreateKubeClient() (clientset.Interface, error) {
    kubeClient := k8sclient.GetKubeClient()
    // 省略
    return kubeClient, nil
}

ここで調べていて分かったのですが、github.com/operator-framework/operator-sdk/pkg/k8sclientはすでに存在していないライブラリでした。。

go.modを見ると、operator-sdk v0.0.7というかなり古いモジュールを使っておりました。。(最初0.7と見間違えました)
https://github.com/operator-framework/operator-sdk/releases/tag/v0.0.7

v0.0.7は2018年10月にリリースされています。(現在の最新は12月10日にリリースされたv0.13.0)

operator-sdk/pkg/k8sclient が現在どこに行ったのかわかりませんが、kubernetesのクライアントということはわかるので、ひとまず続けます。

discoverController.DiscoverK8SPods()

続いてdiscoverController作成後に実行している、discoverController.DiscoverK8SPods()を見てみます。

ここでは、Podの変更通知を受け取るコントローラーを実装しています。
その前に、まずはKubernertesのコントローラーにおける事前知識としてcache, Informerを整理しましょう。

cacheはKubernetesのコントローラーにおける考え方の一つです。コントローラー側はリソースの変更検知のためにAPI Serverを直接監視するのではなく、非同期で同期されるキャッシュを監視して、API Serverへの負荷を一定にする仕組みです。

package cache
https://godoc.org/k8s.io/client-go/tools/cache

Package cache is a client-side caching mechanism. It is useful for reducing the number of server calls you'd otherwise need to make. Reflector watches a server and updates a Store.

InformerもKubernetesコントローラーにおける、リソースの変更を通知する仕組みです。
今回はIndexserInformaerという参照用のInformerを使って、変更のあったPodのkeyをworkqueueというキューに入れていきます。

discover.go
// DiscoverK8SPods discovers Pods running in the cluster
func (d *Controller) DiscoverK8SPods() {
    // create the pod watcher
    podListWatcher := cache.NewListWatchFromClient(d.kubeClient.CoreV1().RESTClient(), "pods", metav1.NamespaceAll, fields.OneTermEqualSelector("spec.nodeName", d.myNodeName)) // <-------- listWatcherの作成

    // create the workqueue
    queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()) // <-------- workqueueの作成

    // Bind the workqueue to a cache with the help of an informer. This way we make sure that
    // whenever the cache is updated, the pod key is added to the workqueue.
    // Note that when we finally process the item from the workqueue, we might see a newer version
    // of the Pod than the version which was responsible for triggering the update.
    indexer, informer := cache.NewIndexerInformer(podListWatcher, &v1.Pod{}, 0, cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
        UpdateFunc: func(old interface{}, new interface{}) {
            key, err := cache.MetaNamespaceKeyFunc(new)
            if err == nil {
                queue.Add(key)
            }
        },
        DeleteFunc: func(obj interface{}) {
            // IndexerInformer uses a delta queue, therefore for deletes we have to use this
            // key function.
            key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
            if err == nil {
                queue.Add(key)
            }
        },
    }, cache.Indexers{}) // <-------- IndexerInformerの作成。Podの追加・変更・削除時のkeyをworkqueueに追加するよう指定

    d.controller = newController(queue, indexer, informer) // <-------- コントローラーの起動

    // Now let's start the controller
    stop := make(chan struct{})
    defer close(stop)
    go d.run(1, stop)  // <-------- コントローラーの実行

    // Wait forever
    select {}
}

cache.NewListWatchFromClient()でPodのlistWatcherを作成し、このlistWatcherをcache.NewIndexerInformer()に渡しています。そしてIndexerInformerは変更のあったPodのkeyをworkqueueに入れています。

discoverController.handlePodUpdate()

最後にコントローラーの実行部分ですが、run()から辿っていくと、handlePodUpdate()にてworkqueueからPodのkeyを取り出し、自分のノードのPodであれば、メモリに保存するという処理になっています。これはNodeIPPoolManagerが自分のノードで稼働中のPodを参照する際に利用します。

discover.go
func (d *Controller) handlePodUpdate(key string) error {
    obj, exists, err := d.controller.indexer.GetByKey(key)

    if !exists {
        log.Infof("Pods deleted on my node: %v", key)
        if strings.HasPrefix(key, metav1.NamespaceSystem+"/"+cniPodName) {
            d.cniPodsLock.Lock()
            defer d.cniPodsLock.Unlock()
            delete(d.cniPods, key)
        } else {
            d.workerPodsLock.Lock()
            defer d.workerPodsLock.Unlock()
            delete(d.workerPods, key)
        }
        return nil
    }

    pod, ok := obj.(*v1.Pod)
    podName := pod.GetName()

    // Check to see if this is a pod on this node
    if d.myNodeName == pod.Spec.NodeName && !pod.Spec.HostNetwork {
        d.workerPodsLock.Lock()
        defer d.workerPodsLock.Unlock()

        log.Tracef("Update for pod %s: %+v, %+v", podName, pod.Status, pod.Spec)

        // Save pod info
        d.workerPods[key] = &K8SPodInfo{
            Name:      podName,
            Namespace: pod.GetNamespace(),
            IP:        pod.Status.PodIP,
            UID:       string(pod.GetUID()),
        }

        log.Infof("Add/Update for Pod %s on my node, namespace = %s, IP = %s", podName, d.workerPods[key].Namespace, d.workerPods[key].IP)
    }
    return nil
}

2. eniConfigController

こちらは前回少しだけ触れたEKSのデフォルトで作成されるeniconfigsというCRDのコントローラーです。

今回は省きますので詳しい説明は公式ドキュメントをどうぞ。

CNI カスタムネットワーク
https://docs.aws.amazon.com/ja_jp/eks/latest/userguide/cni-custom-network.html

discovercControllerはclient-goの低レイヤーライブラリでコントローラーを実装していましたが、こちらはOperator-sdkのお作法でコントローラーやリコンサイルを行っています。Operator-sdkのバージョンが古いので残念ですが、興味がありましたらdiscoverControllerと比較して見てください。

3. NodeIPPoolManager

こちらがL-IPAMDの本体です。
ソースはこちら -> amazon-vpc-cni-k8s/ipamd/ipamd.go

セカンダリIPやENIの管理を行い、内部にはdata_store.goというIPプールをもちます。
main.goの呼び出し部分を見る以下となっていますので、順番に見ていきます。

main.go
ipamContext, err := ipamd.New(discoverController, eniConfigController)
go ipamContext.StartNodeIPPoolManager()

ipamd.New & nodeInit

ここでIPプールであるデータストア等のオブジェクトの生成、およびノードの初回セットアップを行います。
ノードの初回セットアップはnodeInit()というファンクションで実行しています。
ただこのファンクションが長すぎて、「分割するように」とTODOコメントがついてます。メインのロジックのみ抜粋します。

流れとしては、以下の通りです。
1. ホストネットワークの初期化
2. IPプールデータストアの生成
3. アタッチされたENI分(通常、EC2起動時はeth0のみ)、ENIのセットアップを行う
4. discoverControllerから、自分のノードで起動中のPodを取得(基本的にはaws-nodeのみのはず)
5. ENIに割当可能な限りセカンダリIPをアタッチしてIPプールに保存する

ipamd.go#L266~
//TODO need to break this function down(comments from CR)
func (c *IPAMContext) nodeInit() error {
    enis, err := c.awsClient.GetAttachedENIs()
    _, vpcCIDR, err := net.ParseCIDR(c.awsClient.GetVPCIPv4CIDR())
    primaryIP := net.ParseIP(c.awsClient.GetLocalIPv4())

    // ホストネットワークのセットアップ
    err = c.networkClient.SetupHostNetwork(vpcCIDR, c.awsClient.GetVPCIPv4CIDRs(), c.awsClient.GetPrimaryENImac(), &primaryIP)

    // IPプールデータストアの作成
    c.dataStore = datastore.NewDataStore()

    // アタッチされたENI分ループ
    for _, eni := range enis {
        log.Debugf("Discovered ENI %s, trying to set it up", eni.ENIID)
        // Retry ENI sync
        retry := 0
        for {
            retry++
            // ENIのセットアップ
            err = c.setupENI(eni.ENIID, eni)
            if retry > maxRetryCheckENI {
                log.Errorf("Unable to discover attached IPs for ENI from metadata service")
                ipamdErrInc("waitENIAttachedMaxRetryExceeded")
                break
            }

            log.Infof("ENI %s set up.", eni.ENIID)
            break
        }
    }

    // discoverControllerから、自分のノードで起動中のPodを取得
    localPods, err := c.getLocalPodsWithRetry()
    rules, err := c.networkClient.GetRuleList()
    // 各PodにIPアドレスをアサインし、データストアに保存
    for _, ip := range localPods {
        _, _, err = c.dataStore.AssignPodIPv4Address(ip)
    }
    // For a new node, attach IPs (セカンダリIPを可能限りアタッチしてプールする)
    increasedPool, err := c.tryAssignIPs()
    return err
}

ここまでがNewで実行される内容で、コンテナ起動時の初回やaws-nodeが再起動した際に実行されます。

ipamContext.StartNodeIPPoolManager

NodeIPPoolManagerbはL-IPAMDのメインとなる無限ループプロセスです。StartNodeIPPoolManager()を実行すると以下の2つのファンクションをループで実行します。

  • updateIPPoolIfRequired()
  • nodeIPPoolReconcile()

updateIPPoolIfRequired()はIPプール内のIPがWARM_IP_TARGETより少ないか判定し、少ない場合は新規セカンダリIPおよびENIのアタッチを行います。反対にWARM_IP_TARGETより多く、開放可能なIPがある場合はENIからセカンダリIPを開放します。

CNI 設定変数
https://docs.aws.amazon.com/ja_jp/eks/latest/userguide/cni-env-vars.html

WARM_IP_TARGET
タイプ: 整数
デフォルト: なし
ipamD​ デーモンがノード上のポッド割り当てに使用できるようにしておく必要があるフリーの IP アドレスの数を指定します。たとえば、WARM_IP_TARGET​ が 10 に設定されると、ipamD​ は常に 10 個のフリーの IP アドレスを維持しようとします。ノードの Elastic Network Interface がフリーのアドレスを提供できない場合、ipamD はWARM_IP_TARGET フリー IP アドレスが使用できるようになるまでより多くのインターフェイスの割り当てを試みます。

WARM_IP_TARGETはデフォルトでは設定されていないため、WARM_ENI_TARGET(デフォルト: 1)が有効になります。
WARM_ENI_TARGETは、設定した分のENIをノードに余分にアタッチする方式で、常にENIに設定可能な全セカンダリIPをプールするので、WARM_IP_TARGETを設定するよりはIPアドレスを多く確保してしまう可能性があります。

datastore

ソースはこちら -> amazon-vpc-cni-k8s/ipamd/datastore/data_store.go

L -IPAMのIPプールのデータストア部分は、全てスクラッチで実装されています。
mapでデータを保持し、sync.RWMutexで排他制御するだけの非常にシンプルな実装です。

data_store.go
// DataStore contains node level ENI/IP
type DataStore struct {
    total      int
    assigned   int
    eniIPPools map[string]*ENIIPPool
    podsIP     map[PodKey]PodIPInfo
    lock       sync.RWMutex
}

データストア部分はたったこれだけです!

sync.RWMutexはGoにおける相互排他制御の仕組みです。
詳細は割愛しますので、関連リンクを掲載します。
https://godoc.org/sync#RWMutex
https://qiita.com/y_matsuwitter/items/36565a3a53ac52732cae

例えば、Pod作成時にIPアドレスを割り当てる時は、sync.RWMutexでロックしてmapに追加するだけです。

data_store.go
// AssignPodIPv4Address assigns an IPv4 address to pod
// It returns the assigned IPv4 address, device number, error
func (ds *DataStore) AssignPodIPv4Address(k8sPod *k8sapi.K8SPodInfo) (ip string, deviceNumber int, err error) {
    ds.lock.Lock() // <-------------- ロック
    defer ds.lock.Unlock() // <-------------- 最後にロック解除

    log.Debugf("AssignIPv4Address: IP address pool stats: total: %d, assigned %d", ds.total, ds.assigned)
    podKey := PodKey{
        name:      k8sPod.Name,
        namespace: k8sPod.Namespace,
        sandbox:   k8sPod.Sandbox,
    }
    // Podの情報からkeyを生成し、IPアドレスをアサイン
    return ds.assignPodIPv4AddressUnsafe(podKey, k8sPod)
}

// It returns the assigned IPv4 address, device number, error
func (ds *DataStore) assignPodIPv4AddressUnsafe(podKey PodKey, k8sPod *k8sapi.K8SPodInfo) (ip string, deviceNumber int, err error) {
    // 全eniをループ
    for _, eni := range ds.eniIPPools {
        if (k8sPod.IP == "") && (len(eni.IPv4Addresses) == eni.AssignedIPv4Addresses) {
            // Skip this ENI, since it has no available IP addresses
            log.Debugf("AssignPodIPv4Address: Skip ENI %s that does not have available addresses", eni.ID)
            continue
        }
        // eniの全セカンダリIPをループ
        for _, addr := range eni.IPv4Addresses {
            // 空いているセカンダリIPの場合
            if !addr.Assigned && k8sPod.IP == "" && !addr.inCoolingPeriod() {
                // This is triggered by a pod's Add Network command from CNI plugin
                incrementAssignedCount(ds, eni, addr)
                log.Infof("AssignPodIPv4Address: Assign IP %v to pod (name %s, namespace %s sandbox %s)",
                    addr.Address, k8sPod.Name, k8sPod.Namespace, k8sPod.Sandbox)

                // ds.podsIPにPodのIP情報を付加してmapに保存
                ds.podsIP[podKey] = PodIPInfo{IP: addr.Address, DeviceNumber: eni.DeviceNumber}
                return addr.Address, eni.DeviceNumber, nil
            }
        }
    }
}

排他制御を行う必要がありますが、deferによりロックと解除の実装が非常にシンプルです。

4. RPCHandler

最後にgRPCサーバの起動ですが、CNIプラグインからの「新規Pod用の未割当IPの取得」および「削除されたPodIPの解放」という2つのリクエストに対応する、AddNetworkDelNetworkという2つのサービスだけをもつシンプルなgRPCサーバです。

ソースはこちら -> amazon-vpc-cni-k8s/ipamd/rpc_handler.go

例えば、AddNetworkの場合、リクエストパラメータからPodの情報を受け取り、データストアのAssignPodIPv4Addressを呼ぶだけです。

rpc_handler.go
// AddNetwork processes CNI add network request and return an IP address for container
func (s *server) AddNetwork(ctx context.Context, in *pb.AddNetworkRequest) (*pb.AddNetworkReply, error) {
    log.Infof("Received AddNetwork for NS %s, Pod %s, NameSpace %s, Sandbox %s, ifname %s",
        in.Netns, in.K8S_POD_NAME, in.K8S_POD_NAMESPACE, in.K8S_POD_INFRA_CONTAINER_ID, in.IfName)

    // 新規Pod用のIPアドレス割当を行い、セカンダリIpとそのENI情報を取得
    addr, deviceNumber, err := s.ipamContext.dataStore.AssignPodIPv4Address(&k8sapi.K8SPodInfo{
        Name:      in.K8S_POD_NAME,
        Namespace: in.K8S_POD_NAMESPACE,
        Sandbox:   in.K8S_POD_INFRA_CONTAINER_ID})

    var pbVPCcidrs []string
    for _, cidr := range s.ipamContext.awsClient.GetVPCIPv4CIDRs() {
        log.Debugf("VPC CIDR %s", *cidr)
        pbVPCcidrs = append(pbVPCcidrs, *cidr)
    }

    useExternalSNAT := s.ipamContext.networkClient.UseExternalSNAT()
    if !useExternalSNAT {
        for _, cidr := range s.ipamContext.networkClient.GetExcludeSNATCIDRs() {
            log.Debugf("CIDR SNAT Exclusion %s", cidr)
            pbVPCcidrs = append(pbVPCcidrs, cidr)
        }
    }

    // 割り当てたIPアドレス等の情報を返却
    resp := pb.AddNetworkReply{
        Success:         err == nil,
        IPv4Addr:        addr,
        IPv4Subnet:      "",
        DeviceNumber:    int32(deviceNumber),
        UseExternalSNAT: useExternalSNAT,
        VPCcidrs:        pbVPCcidrs,
    }

    log.Infof("Send AddNetworkReply: IPv4Addr %s, DeviceNumber: %d, err: %v", addr, deviceNumber, err)
    addIPCnt.Inc()
    return &resp, nil
}

終わりに

今回はL-IPAMDを中心とした実装を確認しました。
L-IPAMDだけでもGoを使った以下のような機能や実装方法を確認することができました。

  • client-go
  • Operator-sdk
  • sync.RWMutex
  • gRPC Server

次でやっと最後となります。CNIプラグインを中心にGoによるネットワーク設定周りを実装を確認していきます。

11
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
3