LoginSignup
7
4

More than 3 years have passed since last update.

envoy xDS により動的な設定変更を実現する(入門編)

Posted at

概要

  • envoy の設定を動的に変更する機構である xDS を試してみた
  • ちょっとつっこんだ内容になると日本語の情報がほとんどなくて辛かったのでここに記しておきたい

対象読者

xDS とは?

  • 掻い摘んで言うと複数の envoy proxy サーバーの設定を gRPC などにより動的に制御するために用意された仕組み
  • envoy では設定ファイルの構造が Protocol Buffers により定義されている
    • この定義を元にした gRPC インタフェイスも用意されている
    • 定義に従い gRPC サーバーを自分で実装して配置することにより、動的設定の変更を envoy に通知するような仕組みを実現することができる

サンプル

下記の前提条件をもとに、実際の設定・コードを追いながら解説していく。

前提条件

  • バックエンドに2つのサービス群がある
    • 片方を blue もう片方を green とする
  • そのサービス群のうち片方はインターネットに公開しても良いが、もう片方はIP制限をかけたい
  • どのサービス群が公開されているか取得するAPIと、それを入れ替えるAPIを REST API として実装したい

envoy本体の設定

  • まず、クラスタ名とIDを設定する必要があり、これを後述のコントロールプレーンにおいて識別子として利用する
  • static_resources には通常の envoy の設定と同じように、動的に設定する必要がない設定を入れる
    • ここでは実際にロードバランスしたいクラスタ設定(grpc_blue, grpc_green)と、動的設定を通知するためのコントロールプレーン(xds_cluster)を設定している
  • dynamic_resources に動的に設定したい項目を入れる
    • 今回はリスナー設定 listeners を変更したいので lds_config を設定する
      • それ以外のを変えたい場合はそれに応じたキーの設定を入れる必要がある
      • 例えば clusters であれば cds_config とか
    • 今回は gRPC でコントロールプレーンを実装するので api_type: GRPC とする
    • grpc_services.envoy_grpc.cluster_name に上で設定した xds_cluster を設定する
  • admin はあってもなくてもよいが、動作確認に便利なので一応入れておくと良い
  • ちなみに envoy が yaml をパースするのに利用している yaml-cppalias merge に対応していないらしいので、実際に読み込ませるファイルは何らかの方法で展開しておく必要がある
    • こういうやつ→ <<: *alias
node:
  cluster: front-envoy
  id: node1
static_resources:
  clusters:
    - &cluster
      name: grpc_blue
      connect_timeout: 1.0s
      type: strict_dns
      lb_policy: round_robin
      http2_protocol_options: {}
      health_checks:
        - timeout: 1s
          interval: 5s
          unhealthy_threshold: 3
          healthy_threshold: 3
          tcp_health_check: {}
      hosts:
        - socket_address:
            address: ap-server
            port_value: 30001
    - <<: *cluster
      name: grpc_green
      hosts:
        - socket_address:
            address: ap-server
            port_value: 30002
    - name: xds_cluster
      connect_timeout: 0.5s
      type: strict_dns
      lb_policy: round_robin
      http2_protocol_options: {}
      load_assignment:
        cluster_name: xds_cluster
        endpoints:
          - lb_endpoints:
            - endpoint:
                address:
                  socket_address: {address: xds-server, port_value: 30000}
dynamic_resources:
  lds_config:
    api_config_source:
      api_type: GRPC
      grpc_services:
        envoy_grpc:
          cluster_name: xds_cluster
admin:
  access_log_path: /dev/null
  address:
    socket_address:
      address: 0.0.0.0
      port_value: 9000

コントロールプレーンの実装

コードの全体像

コントロールプレーンとは直接関係ない処理

  • 現在公開中のサービスタイプを保持する処理とSnapshotCache を作成する際に必要になる構造体を定義しておく
  • 実際にはコントロールプレーンが突然落ちた場合にもサービスタイプが保持されていてほしいはずなので、何らかの外部キャッシュに入れておくのが良いと思われる

const (
    ClusterName = "front-envoy"
    NodeID      = "node1"
)

// define blue/green service
type serviceType string

const (
    BlueService  serviceType = "blue"
    GreenService serviceType = "green"
)

type currentService struct {
    mu *sync.RWMutex
    st serviceType
}

func (s *currentService) Get() serviceType {
    s.mu.RLock()
    defer s.mu.RUnlock()
    return s.st
}
func (s *currentService) GetString() string {
    return string(s.Get())
}
func (s *currentService) Swap() serviceType {
    s.mu.Lock()
    defer s.mu.Unlock()
    if s.st == BlueService {
        s.st = GreenService
    } else {
        s.st = BlueService
    }
    return s.st
}

var cs = &currentService{
    mu: &sync.RWMutex{},
    st: BlueService,
}

cache.NodeHash インターフェイスに適合した構造体を定義

  • SnapshotCache を作成する際に必要になるので用意する
type hash struct{}

func (h hash) ID(node *core.Node) string {
    if node == nil {
        return "unknown"
    }
    return node.Cluster + "/" + node.Id
}

SnapshotCache を返す関数を定義

  • cache.NewSnapshot() 関数に対して動的に設定したい項目を渡す
    • 動的に設定する必要がないものに関しては nil を渡して良い
  • ここで肝となるのは cache.NewSnapshot() の第一引数に渡すバージョン文字列
    • 設定のキャッシュの動的な変更を検知するために、このバージョン文字列を使用している
    • そのため、設定に変更が掛かったときには、この文字列も一緒に変更される必要がある
    • 本来なら設定値からハッシュ値みたいなのを生成したりするのが良さそうではあるが、ここではただ単に snapshot を生成する度に新しい UUID を割り振るようにしてある
func newSnapshot(current serviceType) cache.Snapshot {
    return cache.NewSnapshot(uuid.New().String(), nil, nil, nil, getListeners(current), nil)
}

リスナー設定を返す関数を定義

  • blue / green への接続を待ち受けるリスナー設定をここで定義する
  • 渡されたサービスタイプに応じて、そうでないリスナーにはIP制限をかけるフィルターを追加する
  • 設定の定義するためのドキュメントがあまり存在しないので、公式の設定ファイルに関するドキュメントを見つつ、適宜 envoyproxy/go-control-plane を見て定義すると良いと思われる
    • 構造としては、設定ファイルで yaml を書く場合と大体は同じであるが、若干書き方に癖があるものもある
    • .protoファイルやそれを元に生成された xxxx.pb.go のようなファイルに構造が定義されているのでその辺を中心に見ながら書くと良さそう
    • IDEを使っている場合には設定ファイルでのキーを入力し、コード補完されるものに定義ジャンプしたりすると効率的である
func getListeners(current serviceType) []cache.Resource {
    var listeners []cache.Resource

    ipRestrictFilter := &listener.Filter{
        Name: "envoy.filters.network.rbac",
        ConfigType: &listener.Filter_TypedConfig{
            TypedConfig: rbacConfig(),
        },
    }
    ports := map[serviceType]uint32{
        BlueService:  8001,
        GreenService: 8002,
    }
    for st, port := range ports {
        l := &api.Listener{
            Name: "listener_" + string(st),
            Address: &core.Address{
                Address: &core.Address_SocketAddress{
                    SocketAddress: &core.SocketAddress{
                        Protocol: core.SocketAddress_TCP,
                        Address:  "0.0.0.0",
                        PortSpecifier: &core.SocketAddress_PortValue{
                            PortValue: port,
                        },
                    },
                },
            },
            FilterChains: []*listener.FilterChain{{
                Name:    "envoy.http_connection_manager",
                Filters: []*listener.Filter{},
            }},
        }
        httpConnectionManager := &listener.Filter{
            Name: "envoy.http_connection_manager",
            ConfigType: &listener.Filter_TypedConfig{
                TypedConfig: httpConnectionManagerConfig(st),
            },
        }
        filters := &l.FilterChains[0].Filters
        if current != st {
            *filters = append(*filters, ipRestrictFilter)
        }
        *filters = append(*filters, httpConnectionManager)
        listeners = append(listeners, l)
    }

    return listeners
}

// ruled base access control
func rbacConfig() *any.Any {
    conf := &filterNetworkRbac.RBAC{
        StatPrefix: "ip_restrictions.",
        Rules: &rbac.RBAC{
            Action: rbac.RBAC_ALLOW,
            Policies: map[string]*rbac.Policy{
                "allow_host": {
                    Permissions: []*rbac.Permission{{
                        Rule: &rbac.Permission_Any{Any: true},
                    }},
                    Principals: []*rbac.Principal{{
                        Identifier: &rbac.Principal_SourceIp{SourceIp: &core.CidrRange{
                            AddressPrefix: "1.1.1.1",
                            PrefixLen:     &wrappers.UInt32Value{Value: 16},
                        }},
                    }},
                },
            },
        },
    }
    rbacConf, err := ptypes.MarshalAny(conf)
    if err != nil {
        panic(err)
    }
    return rbacConf
}

// http listener config
func httpConnectionManagerConfig(t serviceType) *any.Any {
    accessLogFields := map[string]string{
        "protocol":                          "",
        "duration":                          "",
        "start_time":                        "",
        "bytes_received":                    "",
        "response_code":                     "",
        "bytes_sent":                        "",
        "response_flags":                    "",
        "route_name":                        "",
        "upstream_host":                     "",
        "upstream_transport_failure_reason": "",
        "downstream_remote_address":         "",
        "method":                            "%REQ(:METHOD)%",
        "path":                              "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%",
        "x-forwarded-for":                   "%REQ(X-FORWARDED-FOR)%",
        "user-agent":                        "%REQ(USER-AGENT)%",
    }
    jsonFormat := &spb.Struct{
        Fields: map[string]*spb.Value{},
    }
    for k, v := range accessLogFields {
        format := v
        if format == "" {
            format = "%" + strcase.ToScreamingSnake(k) + "%"
        }
        jsonFormat.Fields[k] = &spb.Value{
            Kind: &spb.Value_StringValue{StringValue: format},
        }
    }
    accessLogConf := &accessLog.FileAccessLog{
        Path:            "/dev/stdout",
        AccessLogFormat: &accessLog.FileAccessLog_JsonFormat{JsonFormat: jsonFormat},
    }
    accessLogTypedConf, err := ptypes.MarshalAny(accessLogConf)
    if err != nil {
        panic(err)
    }
    httpConnMgrConfig := &manager.HttpConnectionManager{
        CodecType:  manager.HttpConnectionManager_AUTO,
        StatPrefix: "ingress_http_" + string(t),
        AccessLog: []*filterAccessLog.AccessLog{{
            Name:       "envoy.file_access_log",
            ConfigType: &filterAccessLog.AccessLog_TypedConfig{TypedConfig: accessLogTypedConf},
        }},
        UseRemoteAddress: &wrappers.BoolValue{Value: true},
        HttpFilters: []*manager.HttpFilter{{
            Name: "envoy.router",
        }},
        RouteSpecifier: &manager.HttpConnectionManager_RouteConfig{RouteConfig: &api.RouteConfiguration{
            Name: "route_to_grpc",
            VirtualHosts: []*route.VirtualHost{{
                Name:    "backend",
                Domains: []string{"*"},
                Routes: []*route.Route{{
                    Match: &route.RouteMatch{
                        PathSpecifier: &route.RouteMatch_Prefix{
                            Prefix: "/",
                        },
                    },
                    Action: &route.Route_Route{Route: &route.RouteAction{
                        ClusterSpecifier: &route.RouteAction_Cluster{
                            Cluster: "grpc_" + string(t),
                        },
                        Timeout: &duration.Duration{Seconds: 0, Nanos: 0},
                    }},
                }},
            }},
        }},
    }
    httpConnMgrTypedConf, err := ptypes.MarshalAny(httpConnMgrConfig)
    if err != nil {
        panic(err)
    }
    return httpConnMgrTypedConf
}

main処理

  • SnapShotCache を初期化し、上記の本体の設定ファイルで記述した cluster.idcluster.node に対する設定を cache に入れておく
  • NewServer() を呼び出し、生成した cache を入れる
  • RegisterListenerDiscoveryServiceServer() を呼び出し、gRPC サーバーとしての処理を登録する
    • 動的に設定したいものに応じて呼び出す関数は異なるので注意
  • あとは普通の gRPC サーバーや REST API を書くケースと大体同じように実装すれば良い
    • 下記では現在のサービス名を取得する GET /service と、サービスを入れ替える POST /service/swap を httprouter に登録している
    • 設定の動的な変更は SetSnapshot() を呼び出すことにより実現できる
func main() {
    var (
        xdsPort   string
        adminPort string
    )
    flag.StringVar(&xdsPort, "port", "30000", "listen xds server port")
    flag.StringVar(&adminPort, "admin-port", "40000", "listen admin api port")
    flag.Parse()

    ctx := context.Background()

    c := cache.NewSnapshotCache(false, hash{}, &stdLogger{})
    nodeName := ClusterName + "/" + NodeID
    if err := c.SetSnapshot(nodeName, newSnapshot(cs.Get())); err != nil {
        panic(err)
    }

    server := xds.NewServer(ctx, c, &callbacks{})
    grpcServer := grpc.NewServer()
    api.RegisterListenerDiscoveryServiceServer(grpcServer, server)

    router := httprouter.New()
    router.GET("/service", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
        w.WriteHeader(200)
        if _, err := w.Write([]byte(cs.GetString())); err != nil {
            log.Printf("err: %+v", err)
        }
    })
    router.POST("/service/swap", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
        current := cs.Swap()
        if err := c.SetSnapshot(nodeName, newSnapshot(current)); err != nil {
            w.WriteHeader(500)
            if _, err := w.Write([]byte("cannot set snapshot")); err != nil {
                log.Printf("err: %+v", err)
            }
            return
        }
        w.WriteHeader(200)
        if _, err := w.Write([]byte(current)); err != nil {
            log.Printf("err: %+v", err)
        }
    })

    xdsHost := "0.0.0.0:" + xdsPort
    xdsSock, err := net.Listen("tcp", xdsHost)
    if err != nil {
        panic(err)
    }
    go func() {
        log.Printf("listen xds sever on %s", xdsHost)
        if err := grpcServer.Serve(xdsSock); err != nil {
            log.Printf("closed: %+v", err)
        }
    }()

    adminHost := "0.0.0.0:" + adminPort
    adminSock, err := net.Listen("tcp", adminHost)
    if err != nil {
        panic(err)
    }
    defer adminSock.Close()
    go func() {
        log.Printf("listen admin api server on %s", adminHost)
        if err := http.Serve(adminSock, router); err != nil {
            log.Printf("closed: %+v", err)
        }
    }()

    sig := make(chan os.Signal, 1)
    signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
    <-sig

    grpcServer.GracefulStop()
}
7
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
4