概要
-
envoy
の設定を動的に変更する機構であるxDS
を試してみた - ちょっとつっこんだ内容になると日本語の情報がほとんどなくて辛かったのでここに記しておきたい
対象読者
- envoy についてある程度知ってる
- Go言語が分かる
xDS とは?
- 掻い摘んで言うと複数の envoy proxy サーバーの設定を gRPC などにより動的に制御するために用意された仕組み
- envoy では設定ファイルの構造が Protocol Buffers により定義されている
- この定義を元にした gRPC インタフェイスも用意されている
- 定義に従い gRPC サーバーを自分で実装して配置することにより、動的設定の変更を envoy に通知するような仕組みを実現することができる
サンプル
下記の前提条件をもとに、実際の設定・コードを追いながら解説していく。
前提条件
- バックエンドに2つのサービス群がある
- 片方を
blue
もう片方をgreen
とする
- 片方を
- そのサービス群のうち片方はインターネットに公開しても良いが、もう片方はIP制限をかけたい
- どのサービス群が公開されているか取得するAPIと、それを入れ替えるAPIを REST API として実装したい
envoy本体の設定
- まず、クラスタ名とIDを設定する必要があり、これを後述のコントロールプレーンにおいて識別子として利用する
-
static_resources
には通常の envoy の設定と同じように、動的に設定する必要がない設定を入れる- ここでは実際にロードバランスしたいクラスタ設定(grpc_blue, grpc_green)と、動的設定を通知するためのコントロールプレーン(xds_cluster)を設定している
-
dynamic_resources
に動的に設定したい項目を入れる- 今回はリスナー設定
listeners
を変更したいのでlds_config
を設定する- それ以外のを変えたい場合はそれに応じたキーの設定を入れる必要がある
- 例えば
clusters
であればcds_config
とか
- 今回は gRPC でコントロールプレーンを実装するので
api_type: GRPC
とする -
grpc_services.envoy_grpc.cluster_name
に上で設定したxds_cluster
を設定する
- 今回はリスナー設定
-
admin
はあってもなくてもよいが、動作確認に便利なので一応入れておくと良い - ちなみに envoy が yaml をパースするのに利用している
yaml-cpp
はalias merge
に対応していないらしいので、実際に読み込ませるファイルは何らかの方法で展開しておく必要がある- こういうやつ→
<<: *alias
- こういうやつ→
node:
cluster: front-envoy
id: node1
static_resources:
clusters:
- &cluster
name: grpc_blue
connect_timeout: 1.0s
type: strict_dns
lb_policy: round_robin
http2_protocol_options: {}
health_checks:
- timeout: 1s
interval: 5s
unhealthy_threshold: 3
healthy_threshold: 3
tcp_health_check: {}
hosts:
- socket_address:
address: ap-server
port_value: 30001
- <<: *cluster
name: grpc_green
hosts:
- socket_address:
address: ap-server
port_value: 30002
- name: xds_cluster
connect_timeout: 0.5s
type: strict_dns
lb_policy: round_robin
http2_protocol_options: {}
load_assignment:
cluster_name: xds_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address: {address: xds-server, port_value: 30000}
dynamic_resources:
lds_config:
api_config_source:
api_type: GRPC
grpc_services:
envoy_grpc:
cluster_name: xds_cluster
admin:
access_log_path: /dev/null
address:
socket_address:
address: 0.0.0.0
port_value: 9000
コントロールプレーンの実装
コードの全体像
コントロールプレーンとは直接関係ない処理
- 現在公開中のサービスタイプを保持する処理と
SnapshotCache
を作成する際に必要になる構造体を定義しておく - 実際にはコントロールプレーンが突然落ちた場合にもサービスタイプが保持されていてほしいはずなので、何らかの外部キャッシュに入れておくのが良いと思われる
const (
ClusterName = "front-envoy"
NodeID = "node1"
)
// define blue/green service
type serviceType string
const (
BlueService serviceType = "blue"
GreenService serviceType = "green"
)
type currentService struct {
mu *sync.RWMutex
st serviceType
}
func (s *currentService) Get() serviceType {
s.mu.RLock()
defer s.mu.RUnlock()
return s.st
}
func (s *currentService) GetString() string {
return string(s.Get())
}
func (s *currentService) Swap() serviceType {
s.mu.Lock()
defer s.mu.Unlock()
if s.st == BlueService {
s.st = GreenService
} else {
s.st = BlueService
}
return s.st
}
var cs = ¤tService{
mu: &sync.RWMutex{},
st: BlueService,
}
cache.NodeHash
インターフェイスに適合した構造体を定義
- SnapshotCache を作成する際に必要になるので用意する
type hash struct{}
func (h hash) ID(node *core.Node) string {
if node == nil {
return "unknown"
}
return node.Cluster + "/" + node.Id
}
SnapshotCache を返す関数を定義
-
cache.NewSnapshot()
関数に対して動的に設定したい項目を渡す- 動的に設定する必要がないものに関しては nil を渡して良い
- ここで肝となるのは
cache.NewSnapshot()
の第一引数に渡すバージョン文字列- 設定のキャッシュの動的な変更を検知するために、このバージョン文字列を使用している
- そのため、設定に変更が掛かったときには、この文字列も一緒に変更される必要がある
- 本来なら設定値からハッシュ値みたいなのを生成したりするのが良さそうではあるが、ここではただ単に snapshot を生成する度に新しい
UUID
を割り振るようにしてある
func newSnapshot(current serviceType) cache.Snapshot {
return cache.NewSnapshot(uuid.New().String(), nil, nil, nil, getListeners(current), nil)
}
リスナー設定を返す関数を定義
- blue / green への接続を待ち受けるリスナー設定をここで定義する
- 渡されたサービスタイプに応じて、そうでないリスナーにはIP制限をかけるフィルターを追加する
- 設定の定義するためのドキュメントがあまり存在しないので、公式の設定ファイルに関するドキュメントを見つつ、適宜
envoyproxy/go-control-plane
を見て定義すると良いと思われる- 構造としては、設定ファイルで yaml を書く場合と大体は同じであるが、若干書き方に癖があるものもある
- .protoファイルやそれを元に生成された
xxxx.pb.go
のようなファイルに構造が定義されているのでその辺を中心に見ながら書くと良さそう - IDEを使っている場合には設定ファイルでのキーを入力し、コード補完されるものに定義ジャンプしたりすると効率的である
func getListeners(current serviceType) []cache.Resource {
var listeners []cache.Resource
ipRestrictFilter := &listener.Filter{
Name: "envoy.filters.network.rbac",
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: rbacConfig(),
},
}
ports := map[serviceType]uint32{
BlueService: 8001,
GreenService: 8002,
}
for st, port := range ports {
l := &api.Listener{
Name: "listener_" + string(st),
Address: &core.Address{
Address: &core.Address_SocketAddress{
SocketAddress: &core.SocketAddress{
Protocol: core.SocketAddress_TCP,
Address: "0.0.0.0",
PortSpecifier: &core.SocketAddress_PortValue{
PortValue: port,
},
},
},
},
FilterChains: []*listener.FilterChain{{
Name: "envoy.http_connection_manager",
Filters: []*listener.Filter{},
}},
}
httpConnectionManager := &listener.Filter{
Name: "envoy.http_connection_manager",
ConfigType: &listener.Filter_TypedConfig{
TypedConfig: httpConnectionManagerConfig(st),
},
}
filters := &l.FilterChains[0].Filters
if current != st {
*filters = append(*filters, ipRestrictFilter)
}
*filters = append(*filters, httpConnectionManager)
listeners = append(listeners, l)
}
return listeners
}
// ruled base access control
func rbacConfig() *any.Any {
conf := &filterNetworkRbac.RBAC{
StatPrefix: "ip_restrictions.",
Rules: &rbac.RBAC{
Action: rbac.RBAC_ALLOW,
Policies: map[string]*rbac.Policy{
"allow_host": {
Permissions: []*rbac.Permission{{
Rule: &rbac.Permission_Any{Any: true},
}},
Principals: []*rbac.Principal{{
Identifier: &rbac.Principal_SourceIp{SourceIp: &core.CidrRange{
AddressPrefix: "1.1.1.1",
PrefixLen: &wrappers.UInt32Value{Value: 16},
}},
}},
},
},
},
}
rbacConf, err := ptypes.MarshalAny(conf)
if err != nil {
panic(err)
}
return rbacConf
}
// http listener config
func httpConnectionManagerConfig(t serviceType) *any.Any {
accessLogFields := map[string]string{
"protocol": "",
"duration": "",
"start_time": "",
"bytes_received": "",
"response_code": "",
"bytes_sent": "",
"response_flags": "",
"route_name": "",
"upstream_host": "",
"upstream_transport_failure_reason": "",
"downstream_remote_address": "",
"method": "%REQ(:METHOD)%",
"path": "%REQ(X-ENVOY-ORIGINAL-PATH?:PATH)%",
"x-forwarded-for": "%REQ(X-FORWARDED-FOR)%",
"user-agent": "%REQ(USER-AGENT)%",
}
jsonFormat := &spb.Struct{
Fields: map[string]*spb.Value{},
}
for k, v := range accessLogFields {
format := v
if format == "" {
format = "%" + strcase.ToScreamingSnake(k) + "%"
}
jsonFormat.Fields[k] = &spb.Value{
Kind: &spb.Value_StringValue{StringValue: format},
}
}
accessLogConf := &accessLog.FileAccessLog{
Path: "/dev/stdout",
AccessLogFormat: &accessLog.FileAccessLog_JsonFormat{JsonFormat: jsonFormat},
}
accessLogTypedConf, err := ptypes.MarshalAny(accessLogConf)
if err != nil {
panic(err)
}
httpConnMgrConfig := &manager.HttpConnectionManager{
CodecType: manager.HttpConnectionManager_AUTO,
StatPrefix: "ingress_http_" + string(t),
AccessLog: []*filterAccessLog.AccessLog{{
Name: "envoy.file_access_log",
ConfigType: &filterAccessLog.AccessLog_TypedConfig{TypedConfig: accessLogTypedConf},
}},
UseRemoteAddress: &wrappers.BoolValue{Value: true},
HttpFilters: []*manager.HttpFilter{{
Name: "envoy.router",
}},
RouteSpecifier: &manager.HttpConnectionManager_RouteConfig{RouteConfig: &api.RouteConfiguration{
Name: "route_to_grpc",
VirtualHosts: []*route.VirtualHost{{
Name: "backend",
Domains: []string{"*"},
Routes: []*route.Route{{
Match: &route.RouteMatch{
PathSpecifier: &route.RouteMatch_Prefix{
Prefix: "/",
},
},
Action: &route.Route_Route{Route: &route.RouteAction{
ClusterSpecifier: &route.RouteAction_Cluster{
Cluster: "grpc_" + string(t),
},
Timeout: &duration.Duration{Seconds: 0, Nanos: 0},
}},
}},
}},
}},
}
httpConnMgrTypedConf, err := ptypes.MarshalAny(httpConnMgrConfig)
if err != nil {
panic(err)
}
return httpConnMgrTypedConf
}
main処理
- SnapShotCache を初期化し、上記の本体の設定ファイルで記述した
cluster.id
とcluster.node
に対する設定を cache に入れておく -
NewServer()
を呼び出し、生成した cache を入れる -
RegisterListenerDiscoveryServiceServer()
を呼び出し、gRPC サーバーとしての処理を登録する- 動的に設定したいものに応じて呼び出す関数は異なるので注意
- あとは普通の gRPC サーバーや REST API を書くケースと大体同じように実装すれば良い
- 下記では現在のサービス名を取得する
GET /service
と、サービスを入れ替えるPOST /service/swap
を httprouter に登録している - 設定の動的な変更は
SetSnapshot()
を呼び出すことにより実現できる
- 下記では現在のサービス名を取得する
func main() {
var (
xdsPort string
adminPort string
)
flag.StringVar(&xdsPort, "port", "30000", "listen xds server port")
flag.StringVar(&adminPort, "admin-port", "40000", "listen admin api port")
flag.Parse()
ctx := context.Background()
c := cache.NewSnapshotCache(false, hash{}, &stdLogger{})
nodeName := ClusterName + "/" + NodeID
if err := c.SetSnapshot(nodeName, newSnapshot(cs.Get())); err != nil {
panic(err)
}
server := xds.NewServer(ctx, c, &callbacks{})
grpcServer := grpc.NewServer()
api.RegisterListenerDiscoveryServiceServer(grpcServer, server)
router := httprouter.New()
router.GET("/service", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
w.WriteHeader(200)
if _, err := w.Write([]byte(cs.GetString())); err != nil {
log.Printf("err: %+v", err)
}
})
router.POST("/service/swap", func(w http.ResponseWriter, r *http.Request, p httprouter.Params) {
current := cs.Swap()
if err := c.SetSnapshot(nodeName, newSnapshot(current)); err != nil {
w.WriteHeader(500)
if _, err := w.Write([]byte("cannot set snapshot")); err != nil {
log.Printf("err: %+v", err)
}
return
}
w.WriteHeader(200)
if _, err := w.Write([]byte(current)); err != nil {
log.Printf("err: %+v", err)
}
})
xdsHost := "0.0.0.0:" + xdsPort
xdsSock, err := net.Listen("tcp", xdsHost)
if err != nil {
panic(err)
}
go func() {
log.Printf("listen xds sever on %s", xdsHost)
if err := grpcServer.Serve(xdsSock); err != nil {
log.Printf("closed: %+v", err)
}
}()
adminHost := "0.0.0.0:" + adminPort
adminSock, err := net.Listen("tcp", adminHost)
if err != nil {
panic(err)
}
defer adminSock.Close()
go func() {
log.Printf("listen admin api server on %s", adminHost)
if err := http.Serve(adminSock, router); err != nil {
log.Printf("closed: %+v", err)
}
}()
sig := make(chan os.Signal, 1)
signal.Notify(sig, os.Interrupt, syscall.SIGTERM, syscall.SIGINT, syscall.SIGABRT)
<-sig
grpcServer.GracefulStop()
}