前回の記事 ingress-nginxはどうやって動いているのか(1) で、ingress-nginxのアーキテクチャの超概要を図示した。この記事では、ingress-nginxがどのようにリクエストを処理していくかについて、さらに詳細に見ていく。ingress-nginxのversionは0.26.1
。
見ていく主な処理は以下。
まずはコードの理解をしやすくするために、Golangのコード中に登場するオブジェクトの関連を整理してみる。
登場するオブジェクトの整理
オブジェクト一覧
Name | Description | 主な項目 |
---|---|---|
Configuration | nginx設定に利用する値 |
Server[] 、Backend[]
|
Server | nginx.confのServerに相当 |
Hostname 、Location[]
|
Location | nginx.confのLocationに相当 |
Path 、Service 、Port 、Backend(string)
|
Backend | 一意に特定された接続先Service+Port |
Service 、Port 、Endpoint[]
|
Endpoint | Podの接続先 |
IP 、Port
|
Service(Spec) | k8sのServiceに相当 |
ClusterIP 、Ports
|
Ingress | k8sのIngressに相当 | ParsedAnnotations |
オブジェクトの関連図
処理①:設定情報のアップデート
処理の流れ
- k8sオブジェクトの情報から
Configuration
オブジェクトの作成 -
Configuration.Servers
の情報を元にnginx.confの作成 -
Configuration.Backends
の情報をLuaのconfigurationモジュールに連携
コードを追ってみる
controller.go
-
syncIngress()
はk8sのリソース情報に更新がある毎に呼び出される -
Ingress[]
を元にConfiguration
を取得 -
OnUpdate()
を呼び出す -
configureDynamically()
を呼び出す
internal/ingress/controller/controller.go
// syncIngress collects all the pieces required to assemble the NGINX
// configuration file and passes the resulting data structures to the backend
// (OnUpdate) when a reload is deemed necessary.
func (n *NGINXController) syncIngress(interface{}) error {
ings := n.store.ListIngresses(nil)
hosts, servers, pcfg := n.getConfiguration(ings)
n.OnUpdate(*pcfg)
err := configureDynamically(pcfg, n.cfg.DynamicCertificatesEnabled)
}
nginx.go
OnUpdate()
-
Configuration
とk8sリソース情報を元にnginx.conf作成 - nginxをリロード
internal/ingress/controller/nginx.go
func (n *NGINXController) OnUpdate(ingressCfg ingress.Configuration) error {
cfg := n.store.GetBackendConfiguration()
content, err := n.generateTemplate(cfg, ingressCfg)
ioutil.WriteFile(cfgPath, content, file.ReadWriteByUser)
n.command.ExecCommand("-s", "reload").CombinedOutput()
}
configureDynamically()
- 渡された
Configuration
の情報をもとにBackends[]
を作成 -
Backends[]
をBodyにセットしてlocalhost
にリクエストを送る
internal/ingress/controller/nginx.go
// configureDynamically encodes new Backends in JSON format and POSTs the
// payload to an internal HTTP endpoint handled by Lua.
func configureDynamically(pcfg *ingress.Configuration, isDynamicCertificatesEnabled bool) error {
configureBackends(pcfg.Backends)
}
func configureBackends(rawBackends []*ingress.Backend) error {
backends := make([]*ingress.Backend, len(rawBackends))
for i, backend := range rawBackends {
luaBackend := &ingress.Backend{
:
}
backends[i] = luaBackend
}
statusCode, _, err := nginx.NewPostStatusRequest("/configuration/backends", "application/json", backends)
}
nginx.tmpl (nginx.conf)
- LuaのconfigurationモジュールをLoad
-
localhost:10246/configuration
にリクエストが来ると、configurationモジュールのcall()
に処理を渡す
rootfs/etc/nginx/template/nginx.tmpl
# LuaモジュールのLoad
init_by_lua_block {
ok, res = pcall(require, "configuration")
configuration = res
}
server {
listen 127.0.0.1:10246;
location /configuration {
content_by_lua_block {
configuration.call()
}
}
}
configuration.lua
call()
では、backend設定をリクエストのBodyで受け取り共有メモリ上に保存する
rootfs/etc/nginx/lua/configuration.lua
-- this is the Lua representation of Configuration struct in internal/ingress/types.go
local configuration_data = ngx.shared.configuration_data
function _M.call()
local backends = fetch_request_body()
local success, err = configuration_data:set("backends", backends)
end
処理②:リクエストのルーティング
処理の流れ
- nginx.confの設定に基づきリクエストを処理
- リクエストをLuaのbalancerモジュールに連携
- Luaのbalancerモジュールでルーティング先を決定
コードを追ってみる
nginx.tmpl (nginx.conf)
- 初期処理
- Luaモジュール(balancer)のLoad
-
init_worker()
の呼び出し
- リクエストのルーティング
- Server/Locationで設定されている、Hostname+Pathを確認
- 対応するBackendの名前をnginxの変数にセット
-
balancer()
の呼び出し
rootfs/etc/nginx/template/nginx.tmpl
init_by_lua_block {
ok, res = pcall(require, "balancer")
balancer = res
}
init_worker_by_lua_block {
balancer.init_worker()
}
# すべてのリクエストをこのupstreamで受けて、Luaモジュール(balanacer)でバランシングを行う
upstream upstream_balancer {
balancer_by_lua_block {
balancer.balance()
}
}
# 各Hostname/Pathごとに、対応するBackendの名前をセットして、upstream_balancerにリクエストを流す
server {
location /foo {
set $proxy_upstream_name "ingress-nginx-foo-service-5678";
proxy_pass http://upstream_balancer;
}
location /bar {
set $proxy_upstream_name "ingress-nginx-bar-service-5678";
proxy_pass http://upstream_balancer;
}
}
balancer.lua
rootfs/etc/nginx/lua/balancer.lua
local _M = {}
local balancers = {}
初期化
- Luaモジュール(configuration)からbackendsのデータを受け取る
- バランシングロジックに応じて初期化したbackendをbalancers[]にセット
rootfs/etc/nginx/lua/balancer.lua
function _M.init_worker()
sync_backends()
end
local function sync_backends()
local backends_data = configuration.get_backends_data()
for _, new_backend in ipairs(new_backends) do
sync_backend(new_backend)
end
end
local function sync_backend(backend)
local implementation = backend["load-balance"] or DEFAULT_LB_ALG
local balancer = balancers[backend.name]
if not balancer then
balancers[backend.name] = implementation:new(backend)
return
end
end
バランシング
- nginx.confに設定されているproxy_upstream_nameの値から、利用するbackendを決定
- バランシングロジックに応じて、最終的な接続先を決定
rootfs/etc/nginx/lua/balancer.lua
function _M.balance()
local balancer = get_balancer()
# 接続先を決定する
local peer = balancer:balance()
local ok, err = ngx_balancer.set_current_peer(peer)
end
local function get_balancer()
local backend_name = ngx.var.proxy_upstream_name
local balancer = balancers[backend_name]
return balancer
end
return _M
round_robin.lua
-
backend.endpoints
(PodのIP/Portのリスト)をinstance
として登録 -
.balance()
で、round_robinロジックで接続先を決定
rootfs/etc/nginx/lua/balancer/round_robin.lua
local balancer_resty = require("balancer.resty")
local resty_roundrobin = require("resty.roundrobin")
local util = require("util")
local _M = balancer_resty:new({ factory = resty_roundrobin, name = "round_robin" })
function _M.new(self, backend)
local nodes = util.get_nodes(backend.endpoints)
local o = {
instance = self.factory:new(nodes),
}
return o
end
function _M.balance(self)
return self.instance:find()
end
処理③:Canary
処理フロー
-
Configuration
生成時に、- Hostname + Pathを元に、
Backend.AlternativeBackends
にCanaryリリース用のBackendの名前をセットする -
Backend.TrafficShapingPolicy
にCanaryリリースの条件をセットする
- Hostname + Pathを元に、
- リクエストのルーティング時に、
traffic_shaping_policy
に応じてリクエスト先のBackendを判定する
コードを追ってみる
controller.go
- Hostname + Pathを元に、
Backend.AlternativeBackends
にCanaryリリース用のBackendの名前をセットする
internal/ingress/controller/controller.go
// getBackendServers returns a list of Upstream and Server to be used by the
// backend. An upstream can be used in multiple servers if the namespace,
// service name and port are the same.
func (n *NGINXController) getBackendServers(ingresses []*ingress.Ingress) ([]*ingress.Backend, []*ingress.Server) {
upstreams := n.createUpstreams(ingresses, du)
var canaryIngresses []*ingress.Ingress
for _, ing := range ingresses {
// set aside canary ingresses to merge later
if anns.Canary.Enabled {
canaryIngresses = append(canaryIngresses, ing)
}
}
if nonCanaryIngressExists(ingresses, canaryIngresses) {
for _, canaryIng := range canaryIngresses {
mergeAlternativeBackends(canaryIng, upstreams, servers)
}
}
}
func mergeAlternativeBackends(ing *ingress.Ingress, upstreams map[string]*ingress.Backend,
servers map[string]*ingress.Server) {
for _, rule := range ing.Spec.Rules {
for _, path := range rule.HTTP.Paths {
upsName := upstreamName(ing.Namespace, path.Backend.ServiceName, path.Backend.ServicePort)
altUps := upstreams[upsName]
server, ok := servers[rule.Host]
// find matching paths
for _, loc := range server.Locations {
priUps := upstreams[loc.Backend]
if canMergeBackend(priUps, altUps) && loc.Path == path.Path {
klog.V(2).Infof("matching backend %v found for alternative backend %v",
priUps.Name, altUps.Name)
merged = mergeAlternativeBackend(priUps, altUps)
}
}
}
func canMergeBackend(primary *ingress.Backend, alternative *ingress.Backend) bool {
return alternative != nil && primary.Name != alternative.Name && primary.Name != defUpstreamName && !primary.NoServer
}
// Performs the merge action and checks to ensure that one two alternative backends do not merge into each other
func mergeAlternativeBackend(priUps *ingress.Backend, altUps *ingress.Backend) bool {
if priUps.NoServer {
klog.Warningf("unable to merge alternative backend %v into primary backend %v because %v is a primary backend",
altUps.Name, priUps.Name, priUps.Name)
return false
}
for _, ab := range priUps.AlternativeBackends {
if ab == altUps.Name {
klog.V(2).Infof("skip merge alternative backend %v into %v, it's already present", altUps.Name, priUps.Name)
return true
}
}
priUps.AlternativeBackends =
append(priUps.AlternativeBackends, altUps.Name)
return true
}
-
Backend.TrafficShapingPolicy
にCanaryリリースの条件をセットする
internal/ingress/controller/controller.go
func (n *NGINXController) createUpstreams(data []*ingress.Ingress, du *ingress.Backend) map[string]*ingress.Backend {
// configure traffic shaping for canary
if anns.Canary.Enabled {
upstreams[name].NoServer = true
upstreams[name].TrafficShapingPolicy = ingress.TrafficShapingPolicy{
Weight: anns.Canary.Weight,
Header: anns.Canary.Header,
HeaderValue: anns.Canary.HeaderValue,
Cookie: anns.Canary.Cookie,
}
}
}
balancer.lua
Canary用のBackendが設定されている場合は、traffic_shaping_policy
に応じてリクエスト先のBackendを判定する。
rootfs/etc/nginx/lua/balancer.lua
local function get_balancer()
local backend_name = ngx.var.proxy_upstream_name
local balancer = balancers[backend_name]
if route_to_alternative_balancer(balancer) then
local alternative_balancer = balancers[balancer.alternative_backends[1]]
return alternative_balancer
end
return balancer
end
local function route_to_alternative_balancer(balancer)
-- TODO: support traffic shaping for n > 1 alternative backends
local backend_name = balancer.alternative_backends[1]
local target_cookie = traffic_shaping_policy.cookie
local cookie = ngx.var["cookie_" .. target_cookie]
if cookie then
if cookie == "always" then
return true
elseif cookie == "never" then
return false
end
end
end
return _M