
How does ingress-nginx work? (2)

In the previous article, How does ingress-nginx work? (1), I sketched a very high-level view of the ingress-nginx architecture. In this article we look in more detail at how ingress-nginx processes requests. The ingress-nginx version covered is 0.26.1.

The main flows we will follow are: (1) updating the configuration, (2) routing requests, and (3) canary releases.

First, to make the code easier to follow, let's organize the relationships between the objects that appear in the Go code.

Organizing the objects involved

Object list

| Name | Description | Main fields |
| --- | --- | --- |
| Configuration | Values used to build the nginx configuration | Server[], Backend[] |
| Server | Corresponds to a server block in nginx.conf | Hostname, Location[] |
| Location | Corresponds to a location block in nginx.conf | Path, Service, Port, Backend (string) |
| Backend | A uniquely identified destination Service + Port | Service, Port, Endpoint[] |
| Endpoint | The connection target of a Pod | IP, Port |
| Service (Spec) | Corresponds to a k8s Service | ClusterIP, Ports |
| Ingress | Corresponds to a k8s Ingress | ParsedAnnotations |

Object relationship diagram

(Figure: ingress-nginx_uml.png, a UML-style diagram of the relationships between the objects above)

Process (1): Updating the configuration

Processing flow

  1. Build a Configuration object from the k8s object information
  2. Generate nginx.conf from the Configuration.Servers data
  3. Pass the Configuration.Backends data to the Lua configuration module

Following the code

controller.go

  • syncIngress() is called every time a k8s resource is updated
  • It builds a Configuration from the Ingress[] list
  • It calls OnUpdate()
  • It calls configureDynamically()
internal/ingress/controller/controller.go
```go
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
    ings := n.store.ListIngresses(nil)
    hosts, servers, pcfg := n.getConfiguration(ings)
    n.OnUpdate(*pcfg)
    err := configureDynamically(pcfg, n.cfg.DynamicCertificatesEnabled)
}
```

nginx.go

OnUpdate()

  • Generates nginx.conf from the Configuration and k8s resource information
  • Reloads nginx
internal/ingress/controller/nginx.go
```go
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
    cfg := n.store.GetBackendConfiguration()
    content, err := n.generateTemplate(cfg, ingressCfg)
    ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)
    n.command.ExecCommand("-s", "reload").CombinedOutput()
}
```

configureDynamically()

  • Builds Backends[] from the Configuration it was given
  • Sends a request to localhost with Backends[] as the body
internal/ingress/controller/nginx.go
```go
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func configureDynamically(pcfg *ingress.Configuration, isDynamicCertificatesEnabled bool) error {
    configureBackends(pcfg.Backends)
}

func configureBackends(rawBackends []*ingress.Backend) error {
    backends := make([]*ingress.Backend, len(rawBackends))
    for i, backend := range rawBackends {
        luaBackend := &ingress.Backend{
            :
        }
        backends[i] = luaBackend
    }
    statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
}
```

nginx.tmpl (nginx.conf)

  • Loads the Lua configuration module
  • When a request arrives at localhost:10246/configuration, it is handed to the configuration module's call()
rootfs/etc/nginx/template/nginx.tmpl
```nginx
# Load the Lua module
init_by_lua_block {
    ok, res = pcall(require, "configuration")
    configuration = res
}

server {
    listen 127.0.0.1:10246;

    location /configuration {
        content_by_lua_block {
            configuration.call()
        }
    }
}
```

configuration.lua

call() receives the backend configuration in the request body and stores it in shared memory.

rootfs/etc/nginx/lua/configuration.lua
```lua
-- this is the Lua representation of Configuration struct in internal/ingress/types.go
local configuration_data = ngx.shared.configuration_data

function _M.call()
  local backends = fetch_request_body()
  local success, err = configuration_data:set("backends", backends)
end
```

Process (2): Routing requests

Processing flow

  1. Process the request according to the nginx.conf configuration
  2. Hand the request to the Lua balancer module
  3. The Lua balancer module decides the routing destination

Following the code

nginx.tmpl (nginx.conf)

  • Initialization
    • Load the Lua balancer module
    • Call init_worker()
  • Request routing
    • Match the request against the Hostname + Path configured on each Server/Location
    • Set the name of the matching Backend in an nginx variable
    • Call balance()
rootfs/etc/nginx/template/nginx.tmpl
```nginx
init_by_lua_block {
    ok, res = pcall(require, "balancer")
    balancer = res
}

init_worker_by_lua_block {
    balancer.init_worker()
}

# Every request is received by this upstream, and the Lua balancer module does the balancing
upstream upstream_balancer {
    balancer_by_lua_block {
        balancer.balance()
    }
}

# For each Hostname/Path, set the name of the matching Backend and pass the request on to upstream_balancer
server {
    location /foo {
        set $proxy_upstream_name "ingress-nginx-foo-service-5678";
        proxy_pass http://upstream_balancer;
    }
    location /bar {
        set $proxy_upstream_name "ingress-nginx-bar-service-5678";
        proxy_pass http://upstream_balancer;
    }
}
```

balancer.lua

rootfs/etc/nginx/lua/balancer.lua
```lua
local _M = {}
local balancers = {}
```

Initialization

  • Receives the backends data from the Lua configuration module
  • Initializes each backend according to its balancing algorithm and stores it in balancers[]
rootfs/etc/nginx/lua/balancer.lua
```lua
function _M.init_worker()
  sync_backends()
end

local function sync_backends()
  local backends_data = configuration.get_backends_data()
  local new_backends, err = cjson.decode(backends_data)

  for _, new_backend in ipairs(new_backends) do
    sync_backend(new_backend)
  end
end

local function sync_backend(backend)
  local implementation = backend["load-balance"] or DEFAULT_LB_ALG
  local balancer = balancers[backend.name]

  if not balancer then
    balancers[backend.name] = implementation:new(backend)
    return
  end
end
```

Balancing

  • Determines the backend to use from the proxy_upstream_name value set in nginx.conf
  • Decides the final destination according to the balancing algorithm
rootfs/etc/nginx/lua/balancer.lua
```lua
function _M.balance()
  local balancer = get_balancer()
  -- decide the peer to connect to
  local peer = balancer:balance()
  local ok, err = ngx_balancer.set_current_peer(peer)
end

local function get_balancer()
  local backend_name = ngx.var.proxy_upstream_name
  local balancer = balancers[backend_name]
  return balancer
end

return _M
```

round_robin.lua

  • Registers backend.endpoints (the list of Pod IP/Port pairs) as instance
  • balance() decides the destination using round-robin logic
rootfs/etc/nginx/lua/balancer/round_robin.lua
```lua
local balancer_resty = require("balancer.resty")
local resty_roundrobin = require("resty.roundrobin")
local util = require("util")

local _M = balancer_resty:new({ factory = resty_roundrobin, name = "round_robin" })

function _M.new(self, backend)
  local nodes = util.get_nodes(backend.endpoints)
  local o = {
    instance = self.factory:new(nodes),
  }
  return o
end

function _M.balance(self)
  return self.instance:find()
end
```

Process (3): Canary

Processing flow

  1. When generating the Configuration:
    1. Based on Hostname + Path, set the names of the canary-release Backends in Backend.AlternativeBackends
    2. Set the canary-release conditions in Backend.TrafficShapingPolicy
  2. When routing a request, decide the destination Backend according to traffic_shaping_policy

Following the code

controller.go

  • Based on Hostname + Path, sets the names of the canary-release Backends in Backend.AlternativeBackends
internal/ingress/controller/controller.go
```go
// getBackendServers returns a list of Upstream and Server to be used by the
// backend.  An upstream can be used in multiple servers if the namespace,
// service name and port are the same.
func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*ingress.Backend, []*ingress.Server) {

    upstreams := n.createUpstreams(ingresses, du)

    var canaryIngresses []*ingress.Ingress
    for _, ing := range ingresses {

        // set aside canary ingresses to merge later
        if anns.Canary.Enabled {
            canaryIngresses = append(canaryIngresses, ing)
        }
    }

    if nonCanaryIngressExists(ingresses, canaryIngresses) {
        for _, canaryIng := range canaryIngresses {
            mergeAlternativeBackends(canaryIng, upstreams, servers)
        }
    }
}

func mergeAlternativeBackends(ing *ingress.Ingress, upstreams map[string]*ingress.Backend,
    servers map[string]*ingress.Server) {

    for _, rule := range ing.Spec.Rules {
        for _, path := range rule.HTTP.Paths {
            upsName := upstreamName(ing.Namespace, path.Backend.ServiceName, path.Backend.ServicePort)

            altUps := upstreams[upsName]

            server, ok := servers[rule.Host]

            // find matching paths
            for _, loc := range server.Locations {
                priUps := upstreams[loc.Backend]

                if canMergeBackend(priUps, altUps) && loc.Path == path.Path {
                    klog.V(2).Infof("matching backend %v found for alternative backend %v",
                        priUps.Name, altUps.Name)

                    merged = mergeAlternativeBackend(priUps, altUps)
                }
            }

}

func canMergeBackend(primary *ingress.Backend, alternative *ingress.Backend) bool {
    return alternative != nil && primary.Name != alternative.Name && primary.Name != defUpstreamName && !primary.NoServer
}

// Performs the merge action and checks to ensure that one two alternative backends do not merge into each other
func mergeAlternativeBackend(priUps *ingress.Backend, altUps *ingress.Backend) bool {
    if priUps.NoServer {
        klog.Warningf("unable to merge alternative backend %v into primary backend %v because %v is a primary backend",
            altUps.Name, priUps.Name, priUps.Name)
        return false
    }

    for _, ab := range priUps.AlternativeBackends {
        if ab == altUps.Name {
            klog.V(2).Infof("skip merge alternative backend %v into %v, it's already present", altUps.Name, priUps.Name)
            return true
        }
    }

    priUps.AlternativeBackends =
        append(priUps.AlternativeBackends, altUps.Name)

    return true
}
```
  • Sets the canary-release conditions in Backend.TrafficShapingPolicy
internal/ingress/controller/controller.go
```go
func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.Backend) map[string]*ingress.Backend {
                // configure traffic shaping for canary
                if anns.Canary.Enabled {
                    upstreams[name].NoServer = true
                    upstreams[name].TrafficShapingPolicy = ingress.TrafficShapingPolicy{
                        Weight:      anns.Canary.Weight,
                        Header:      anns.Canary.Header,
                        HeaderValue: anns.Canary.HeaderValue,
                        Cookie:      anns.Canary.Cookie,
                    }
                }
}
```

balancer.lua

When a canary Backend is configured, the destination Backend is chosen according to traffic_shaping_policy.

rootfs/etc/nginx/lua/balancer.lua
```lua
local function get_balancer()
  local backend_name = ngx.var.proxy_upstream_name
  local balancer = balancers[backend_name]

  if route_to_alternative_balancer(balancer) then
    local alternative_balancer = balancers[balancer.alternative_backends[1]]
    return alternative_balancer
  end

  return balancer
end

local function route_to_alternative_balancer(balancer)
  -- TODO: support traffic shaping for n > 1 alternative backends
  local backend_name = balancer.alternative_backends[1]

  local target_cookie = traffic_shaping_policy.cookie
  local cookie = ngx.var["cookie_" .. target_cookie]
  if cookie then
    if cookie == "always" then
      return true
    elseif cookie == "never" then
      return false
    end
  end
end

return _M
```