LoginSignup
7
5

More than 3 years have passed since last update.

Envoy 初心者が xDS の EDS サーバー実装でハマったこと

Last updated at Posted at 2020-12-31

この記事について

Envoy 初心者が勉強用に xDS サーバーを実装してみてハマったことをまとめてみました(xDS や EDS が何かについてはよくまとまったサイトがたくさんあるので省きます)。

実装したサーバー以下のようなものです。

  • EDS(Endpoint Discovery Service)のみ実装
  • Envoy の v3 API を使用
  • gRPC を使用
  • 言語は Go(1.15.6) を使用

全体像

よくある基本的な構成です。Envoy が、EDS を実装した xDS サーバーからクラスターのエンドポイント情報を取得します。
なお、curl 以外は全てコンテナとして同一 docker ネットワーク上で起動させてます。

overview.png

参考にしたサイト

各コンポの実装

Envoy と xDS サーバーの部分だけ記載します。

Envoy

Envoy の設定ファイルでは、クラスターのエンドポイントを xDS サーバーから取得するようにしてます。該当箇所にはコメントを入れてます。

ハマったところ:

  • 最初、eds_cluster_config 配下に resource_api_version: V3transport_api_version: V3 を付けなかったところ、xDS サーバーからエンドポイント情報が取得できなかった。
    • その場合、Envoy は xDS サーバーに対して v2 API を呼び出すが、xDS サーバーは v3 API を使うよう実装したので gRPC で unknown service envoy.api.v2.EndpointDiscoveryService というエラーが返されていた。
/etc/envoy/envoy.yaml
node:
  cluster: mycluster
  id: node1

admin:
  access_log_path: /tmp/admin_access.log
  address:
    socket_address: { address: 0.0.0.0, port_value: 9901 }

static_resources:
  listeners:
  - name: listener_0
    address:
      socket_address: { address: 0.0.0.0, port_value: 10000 }
    filter_chains:
    - filters:
      - name: envoy.filters.network.http_connection_manager
        typed_config:
          "@type": type.googleapis.com/envoy.extensions.filters.network.http_connection_manager.v3.HttpConnectionManager
          stat_prefix: ingress_http
          codec_type: AUTO
          route_config:
            name: local_route
            virtual_hosts:
            - name: local_service
              domains: ["*"]
              routes:
              - match: { prefix: "/" }
                route: { cluster: target_cluster }
          http_filters:
          - name: envoy.filters.http.router

  clusters:
  - name: target_cluster
    connect_timeout: 10s
    type: EDS
    eds_cluster_config:
      # service_name: target_services
      eds_config:
        resource_api_version: V3     # v3 API の xDS リソースを使う
        api_config_source:           # xDSサーバーからエンドポイントを取得
          api_type: GRPC             # GRPC を使用(他にはRESTも指定可能)
          transport_api_version: V3  # V3 API の xDS トランスポートプロトコルを使う
          grpc_services: 
          - envoy_grpc: 
              cluster_name: xds_cluster # xDS サーバーもクラスターとして指定
          refresh_delay: 5s
  - name: xds_cluster
    connect_timeout: 10s
    type: STRICT_DNS # xDS サーバーを名前で指定してるので STRICT_DNS か LOGICAL_DNS が必要
    http2_protocol_options: {}
    load_assignment:
      cluster_name: xds_cluster
      endpoints:
      - lb_endpoints:
        - endpoint:
            address:
              socket_address:
                address: xds_server # ここでは xDS サーバーをホスト名で指定した(IPでもよい)
                port_value: 20000  

xDS サーバー

大枠の仕組みとやることは以下の2つのようです。

  • キャッシュとスナップショットの生成・登録
    • クラスターのエンドポイント情報は、あらかじめ xDS サーバーのキャッシュにスナップショットとして登録しておく
    • エンドポイント情報に更新があると、スナップショットのバージョンを更新して再度キャッシュに登録 する
  • コールバック関数の用意と登録
    • Envoy からの API 呼び出しがあると、xDS サーバーのフレームワークがスナップショットを自動で返してくれる
    • なお、フレームワークはAPI 呼び出しの受信時やレスポンス送信の直前に開発者が登録したコールバック関数を呼んでくれる

今回はこんなファイル構成にしてます。

└ src
    ├── callbacks.go
    ├── go.mod
    ├── go.sum
    ├── main.go
    └── snapshot.go

main.go の実装

キャッシュの生成とスナップショットの登録、xDS サーバーの開始を行ってます。

ハマったところ:

  • NewSnapshotCache() に渡してる IDHash は Envoy のノードID(この場合はnode1)を返す。この値と SetSnapshot()の第一引数を合わせる必要があるが、当初 Envoy のクラスター名を含めて SetSnapshot("mycluster/node1", ss) としてしまった。その結果、Envoy からの API 呼び出しに対して応答が返らなかった。
    • もちろん IDHash 相当を自前で用意して独自のノードIDを返してもよいが、同じように SetSnapshot()の第一引数と合わせる必要がある。
package main

import (
    "context"
    "log"
    "net"

    "google.golang.org/grpc"

    v3service "github.com/envoyproxy/go-control-plane/envoy/service/endpoint/v3"
    v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
    v3server "github.com/envoyproxy/go-control-plane/pkg/server/v3"
    xdslog "github.com/envoyproxy/go-control-plane/pkg/log"
)

const (
    serverAddr = ":20000"
)

func main() {
    var xdsLogger = xdslog.LoggerFuncs{
        DebugFunc: log.Printf,
        InfoFunc:  log.Printf,
        WarnFunc:  log.Printf,
        ErrorFunc: log.Printf,
    }

    // Envoy へのレスポンスを保存しておくキャッシュを生成する。
    // IDHash は単純にノードのID(この場合は "node1")を返す。
    cache := v3cache.NewSnapshotCache(true, v3cache.IDHash{}, xdsLogger)

    // スナップショットを作成してキャッシュに登録。
    // IDHash が返すノードのID(この場合は "node1")をキーにして登録。
    ss := getSnapshot()
    if err := cache.SetSnapshot("node1", ss); err != nil {
        log.Fatalf("SetSnapshot error : %v", err)
    }

    // 以下、xDS サーバーの生成と gRPC サーバーの開始
    xdsServer := v3server.NewServer(context.Background(), cache, &callbacks{})

    lis, err := net.Listen("tcp", serverAddr)
    if err != nil {
        log.Fatalf("Listen error : %v", err)
    }

    grpcServer := grpc.NewServer()
    v3service.RegisterEndpointDiscoveryServiceServer(grpcServer, xdsServer)

    if err := grpcServer.Serve(lis); err != nil {
        log.Fatalf("Failed to serve GRPC server : %v", err)
    }
}

snapshot.go の実装

テスト用にハードコードしたエンドポイント情報を元にスナップショットを作成。

ハマったところ:

  • ClusterLoadAssignment 構造体のネストが凄く深いので、自分は今どの階層にいるんだっけ?となる
  • ClusterLoadAssignment には ClusterName というメンバーがあるが、これには Envoy が属するクラスター名(例: "mycluster") ではなく、xDS サーバーが属するクラスター名(例: "target_cluster") を指定する必要がある。
    • envoy.yaml で service_name が記載されてる場合は、その名前を指定する必要がある(例: "target_services")
package main

import (
    "log"

    v3core "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
    v3endpoint "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"
    "github.com/envoyproxy/go-control-plane/pkg/cache/types"
    v3cache "github.com/envoyproxy/go-control-plane/pkg/cache/v3"
)

type upstreamAddr struct {
    address string
    port    uint32
}

// エンドポイントの情報(キーは xDS サーバーが属するクラスター名)。
// あくまでテスト用なのでハードコードしてる。
var upstreamClusters = map[string][]*upstreamAddr{
    // クラスター名は envoy 自身が属してるクラスター名(この場合の "mycluster")ではなく、
    // xDS サーバーが属してる名前(この場合は "target_cluster") を指定する。
    "target_cluster": {
        {address: "192.168.158.197", port: 10001},
        {address: "192.168.158.197", port: 10002},
    },
}

// 指定されたクラスターの EDS リソースを取得
func getEdsResourceForCluster(clusterName string) *v3endpoint.ClusterLoadAssignment {

    upstreams, ok := upstreamClusters[clusterName]
    if !ok {
        log.Printf("Cluster not found : %s", clusterName)
        return nil
    }

    lbEndpoints := []*v3endpoint.LbEndpoint{}

    for _, u := range upstreams {
        addr := &v3core.Address{
            Address: &v3core.Address_SocketAddress{
                SocketAddress: &v3core.SocketAddress{
                    Protocol: v3core.SocketAddress_TCP,
                    Address:  u.address,
                    PortSpecifier: &v3core.SocketAddress_PortValue{
                        PortValue: u.port,
                    },
                },
            },
        }

        lep := &v3endpoint.LbEndpoint{
            HostIdentifier: &v3endpoint.LbEndpoint_Endpoint{
                Endpoint: &v3endpoint.Endpoint{
                    Address: addr,
                },
            },
        }

        lbEndpoints = append(lbEndpoints, lep)
    }

    assignment := &v3endpoint.ClusterLoadAssignment{
        ClusterName: clusterName,
        Endpoints: []*v3endpoint.LocalityLbEndpoints{
            {LbEndpoints: lbEndpoints},
        },
    }

    return assignment
}

// EDS のリソースを取得
func getEdsResource() []types.Resource {
    resources := []types.Resource{}

    for clusterName := range upstreamClusters {
        r := getEdsResourceForCluster(clusterName)
        if r != nil {
            resources = append(resources, r)
        }
    }

    return resources
}

// スナップショットを生成
func getSnapshot() v3cache.Snapshot {

    // EDS のリソースを取得
    eds := getEdsResource()
    log.Printf("EDS : %+v", eds)

    // 各リソースからスナップショットを生成
    return v3cache.NewSnapshot(
        "1", // version ※ 1で固定してるが、本当はスナップショットを更新するたびに上げる。
        eds, // endpoints
        nil, // clusters
        nil, // routes
        nil, // listeners
        nil, // runtimes
        nil, // secrets
    )
}

callbacks.go の実装

Envoy が xDS サーバーの API を呼ぶと OnStreamRequest, OnStreamResponse などが呼ばれるが、ログを出してるだけで何もしてない。

package main

import (
    "context"
    discovery "github.com/envoyproxy/go-control-plane/envoy/service/discovery/v3"
    "log"
)

type callbacks struct{}

// --------------------------------------------------------------
// REST-JSON part of XDS server
// --------------------------------------------------------------

// OnFetchRequest is called for each Fetch request. Returning an error will end processing of the
// request and respond with an error.
func (cb *callbacks) OnFetchRequest(ctx context.Context, req *discovery.DiscoveryRequest) error {
    log.Printf("OnFetchRequest")
    return nil
}

// OnFetchResponse is called immediately prior to sending a response.
func (cb *callbacks) OnFetchResponse(req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
    log.Printf("OnFetchResponse")
    return
}

// --------------------------------------------------------------
// GRPC SoTW (State of The World) part of XDS server
// --------------------------------------------------------------

// OnStreamOpen is called once an xDS stream is open with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
func (cb *callbacks) OnStreamOpen(ctx context.Context, streamID int64, typeURL string) error {
    log.Printf("OnStreamOpen: StreamID [%d], Type URL [%s]", streamID, typeURL)
    return nil
}

// OnStreamClosed is called immediately prior to closing an xDS stream with a stream ID.
func (cb *callbacks) OnStreamClosed(streamID int64) {
    log.Printf("OnStreamClosed: StreamID [%d]", streamID)
}

// OnStreamRequest is called once a request is received on a stream.
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
func (cb *callbacks) OnStreamRequest(streamID int64, req *discovery.DiscoveryRequest) error {
    log.Printf("OnStreamRequest: StreamID [%d]", streamID)
    return nil
}

// OnStreamResponse is called immediately prior to sending a response on a stream.
func (cb *callbacks) OnStreamResponse(streamID int64, req *discovery.DiscoveryRequest, resp *discovery.DiscoveryResponse) {
    log.Printf("OnStreamResponse: StreamID [%d]", streamID)
}

以上、すごくテスト的なコードですが、Envoy が xDS サーバーからエンドポイント情報を取得してリクエストを転送してくれるようになりました。

7
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
5