この記事について
Envoy 初心者が勉強用に xDS サーバーを実装してみてハマったことをまとめてみました(xDS や EDS が何かについてはよくまとまったサイトがたくさんあるので省きます)。
実装したサーバー以下のようなものです。
- EDS(Endpoint Discovery Service)のみ実装
- Envoy の v3 API を使用
- gRPC を使用
- 言語は Go(1.15.6) を使用
全体像
よくある基本的な構成です。Envoy が、EDS を実装した xDS サーバーからクラスターのエンドポイント情報を取得します。
なお、curl 以外は全てコンテナとして同一 docker ネットワーク上で起動させてます。
参考にしたサイト
各コンポの実装
Envoy と xDS サーバーの部分だけ記載します。
Envoy
Envoy の設定ファイルでは、クラスターのエンドポイントを xDS サーバーから取得するようにしてます。該当箇所にはコメントを入れてます。
ハマったところ:
- 最初、eds_cluster_config 配下に
resource_api_version: V3
とtransport_api_version: V3
を付けなかったところ、xDS サーバーからエンドポイント情報が取得できなかった。- その場合、Envoy は xDS サーバーに対して v2 API を呼び出すが、xDS サーバーは v3 API を使うよう実装したので gRPC で
unknown service envoy.api.v2.EndpointDiscoveryService
というエラーが返されていた。
- その場合、Envoy は xDS サーバーに対して v2 API を呼び出すが、xDS サーバーは v3 API を使うよう実装したので gRPC で
node:
cluster: mycluster
id: node1
admin:
access_log_path: /tmp/admin_access.log
address:
socket_address: { address: 0.0.0.0, port_value: 9901 }
static_resources:
listeners:
- name: listener_0
address:
socket_address: { address: 0.0.0.0, port_value: 10000 }
filter_chains:
- filters:
- name: envoy.filters.network.http_connection_manager
typed_config:
"@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
stat_prefix: ingress_http
codec_type: AUTO
route_config:
name: local_route
virtual_hosts:
- name: local_service
domains: ["*"]
routes:
- match: { prefix: "/" }
route: { cluster: target_cluster }
http_filters:
- name: envoy.filters.http.router
clusters:
- name: target_cluster
connect_timeout: 10s
type: EDS
eds_cluster_config:
# service_name: target_services
eds_config:
resource_api_version: V3 # v3 API の xDS リソースを使う
api_config_source: # xDSサーバーからエンドポイントを取得
api_type: GRPC # GRPC を使用(他にはRESTも指定可能)
transport_api_version: V3 # V3 API の xDS トランスポートプロトコルを使う
grpc_services:
- envoy_grpc:
cluster_name: xds_cluster # xDS サーバーもクラスターとして指定
refresh_delay: 5s
- name: xds_cluster
connect_timeout: 10s
type: STRICT_DNS # xDS サーバーを名前で指定してるので STRICT_DNS か LOGICAL_DNS が必要
http2_protocol_options: {}
load_assignment:
cluster_name: xds_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: xds_server # ここでは xDS サーバーをホスト名で指定した(IPでもよい)
port_value: 20000
xDS サーバー
大枠の仕組みとやることは以下の2つのようです。
- キャッシュとスナップショットの生成・登録
- クラスターのエンドポイント情報は、あらかじめ xDS サーバーのキャッシュにスナップショットとして登録しておく
- エンドポイント情報に更新があると、スナップショットのバージョンを更新して再度キャッシュに登録 する
- コールバック関数の用意と登録
- Envoy からの API 呼び出しがあると、xDS サーバーのフレームワークがスナップショットを自動で返してくれる
- なお、フレームワークはAPI 呼び出しの受信時やレスポンス送信の直前に開発者が登録したコールバック関数を呼んでくれる
今回はこんなファイル構成にしてます。
└ src
├── callbacks.go
├── go.mod
├── go.sum
├── main.go
└── snapshot.go
main.go の実装
キャッシュの生成とスナップショットの登録、xDS サーバーの開始を行ってます。
ハマったところ:
- NewSnapshotCache() に渡してる
IDHash
は Envoy のノードID(この場合はnode1
)を返す。この値と SetSnapshot()の第一引数を合わせる必要があるが、当初 Envoy のクラスター名を含めてSetSnapshot("mycluster/node1", ss)
としてしまった。その結果、Envoy からの API 呼び出しに対して応答が返らなかった。- もちろん IDHash 相当を自前で用意して独自のノードIDを返してもよいが、同じように SetSnapshot()の第一引数と合わせる必要がある。
package main
import (
"context"
"log"
"net"
"google.golang.org/grpc"
v3service "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
xdslog "github.com/envoyproxy/go-control-plane/pkg/log"
)
const (
serverAddr = ":20000"
)
func main() {
var xdsLogger = xdslog.LoggerFuncs{
DebugFunc: log.Printf,
InfoFunc: log.Printf,
WarnFunc: log.Printf,
ErrorFunc: log.Printf,
}
// Envoy へのレスポンスを保存しておくキャッシュを生成する。
// IDHash は単純にノードのID(この場合は "node1")を返す。
cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, xdsLogger)
// スナップショットを作成してキャッシュに登録。
// IDHash が返すノードのID(この場合は "node1")をキーにして登録。
ss := getSnapshot()
if err := cache.SetSnapshot("node1", ss); err != nil {
log.Fatalf("SetSnapshot error : %v", err)
}
// 以下、xDS サーバーの生成と gRPC サーバーの開始
xdsServer := v3server.NewServer(context.Background(), cache, &callbacks{})
lis, err := net.Listen("tcp", serverAddr)
if err != nil {
log.Fatalf("Listen error : %v", err)
}
grpcServer := grpc.NewServer()
v3service.RegisterEndpointDiscoveryServiceServer(grpcServer, xdsServer)
if err := grpcServer.Serve(lis); err != nil {
log.Fatalf("Failed to serve GRPC server : %v", err)
}
}
snapshot.go の実装
テスト用にハードコードしたエンドポイント情報を元にスナップショットを作成。
ハマったところ:
- ClusterLoadAssignment 構造体のネストが凄く深いので、自分は今どの階層にいるんだっけ?となる
-
ClusterLoadAssignment には
ClusterName
というメンバーがあるが、これには Envoy が属するクラスター名(例: "mycluster") ではなく、xDS サーバーが属するクラスター名(例: "target_cluster") を指定する必要がある。- envoy.yaml で service_name が記載されてる場合は、その名前を指定する必要がある(例: "target_services")
package main
import (
"log"
v3core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
"github.com/envoyproxy/go-control-plane/pkg/cache/types"
v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)
type upstreamAddr struct {
address string
port uint32
}
// エンドポイントの情報(キーは xDS サーバーが属するクラスター名)。
// あくまでテスト用なのでハードコードしてる。
var upstreamClusters = map[string][]*upstreamAddr{
// クラスター名は envoy 自身が属してるクラスター名(この場合の "mycluster")ではなく、
// xDS サーバーが属してる名前(この場合は "target_cluster") を指定する。
"target_cluster": {
{address: "192.168.158.197", port: 10001},
{address: "192.168.158.197", port: 10002},
},
}
// 指定されたクラスターの EDS リソースを取得
func getEdsResourceForCluster(clusterName string) *v3endpoint.ClusterLoadAssignment {
upstreams, ok := upstreamClusters[clusterName]
if !ok {
log.Printf("Cluster not found : %s", clusterName)
return nil
}
lbEndpoints := []*v3endpoint.LbEndpoint{}
for _, u := range upstreams {
addr := &v3core.Address{
Address: &v3core.Address_SocketAddress{
SocketAddress: &v3core.SocketAddress{
Protocol: v3core.SocketAddress_TCP,
Address: u.address,
PortSpecifier: &v3core.SocketAddress_PortValue{
PortValue: u.port,
},
},
},
}
lep := &v3endpoint.LbEndpoint{
HostIdentifier: &v3endpoint.LbEndpoint_Endpoint{
Endpoint: &v3endpoint.Endpoint{
Address: addr,
},
},
}
lbEndpoints = append(lbEndpoints, lep)
}
assignment := &v3endpoint.ClusterLoadAssignment{
ClusterName: clusterName,
Endpoints: []*v3endpoint.LocalityLbEndpoints{
{LbEndpoints: lbEndpoints},
},
}
return assignment
}
// EDS のリソースを取得
func getEdsResource() []types.Resource {
resources := []types.Resource{}
for clusterName := range upstreamClusters {
r := getEdsResourceForCluster(clusterName)
if r != nil {
resources = append(resources, r)
}
}
return resources
}
// スナップショットを生成
func getSnapshot() v3cache.Snapshot {
// EDS のリソースを取得
eds := getEdsResource()
log.Printf("EDS : %+v", eds)
// 各リソースからスナップショットを生成
return v3cache.NewSnapshot(
"1", // version ※ 1で固定してるが、本当はスナップショットを更新するたびに上げる。
eds, // endpoints
nil, // clusters
nil, // routes
nil, // listeners
nil, // runtimes
nil, // secrets
)
}
callbacks.go の実装
Envoy が xDS サーバーの API を呼ぶと OnStreamRequest, OnStreamResponse などが呼ばれるが、ログを出してるだけで何もしてない。
package main
import (
"context"
discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
"log"
)
type callbacks struct{}
// --------------------------------------------------------------
// REST-JSON part of XDS server
// --------------------------------------------------------------
// OnFetchRequest is called for each Fetch request. Returning an error will end processing of the
// request and respond with an error.
func (cb *callbacks) OnFetchRequest(ctx context.Context, req *discovery.DiscoveryRequest) error {
log.Printf("OnFetchRequest")
return nil
}
// OnFetchResponse is called immediately prior to sending a response.
func (cb *callbacks) OnFetchResponse(req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
log.Printf("OnFetchResponse")
return
}
// --------------------------------------------------------------
// GRPC SoTW (State of The World) part of XDS server
// --------------------------------------------------------------
// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
func (cb *callbacks) OnStreamOpen(ctx context.Context, streamID int64, typeURL string) error {
log.Printf("OnStreamOpen: StreamID [%d], Type URL [%s]", streamID, typeURL)
return nil
}
// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
func (cb *callbacks) OnStreamClosed(streamID int64) {
log.Printf("OnStreamClosed: StreamID [%d]", streamID)
}
// OnStreamRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
func (cb *callbacks) OnStreamRequest(streamID int64, req *discovery.DiscoveryRequest) error {
log.Printf("OnStreamRequest: StreamID [%d]", streamID)
return nil
}
// OnStreamResponse is called immediately prior to sending a response on a stream.
func (cb *callbacks) OnStreamResponse(streamID int64, req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
log.Printf("OnStreamResponse: StreamID [%d]", streamID)
}
以上、すごくテスト的なコードですが、Envoy が xDS サーバーからエンドポイント情報を取得してリクエストを転送してくれるようになりました。