0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

OpenTelemetryAdvent Calendar 2023

Day 14

k8s上のgRPCサーバー間の分散トレーシングの話

Last updated at Posted at 2023-12-14

こんにちは、都内の会社でバックエンドやインフラのエンジニアをやっているくぼたです.
この記事は OpenTelemetry Advent Calendar 2023 14日目の記事になります。

Google Kubernetes Engine上に複数のgRPCサーバーからなるマイクロサービスを構築した際のOpenTelemetryを用いた分散トレーシングの方法に関して、今回は簡単に解説させていただければと思います.
なお、言語はGoを用いています.

記事の流れは以下のとおりです.

  1. 今回のデモアプリに関する簡単な解説
  2. localでgRPC通信のトレース情報をzipkinにを流す
  3. GKEにアプリケーションをデプロイし、そのtrace情報をcloud traceに流す

また、この記事の中では詳しく説明しませんが、skaffoldという開発ツールを用いてlocal、Google Cloudの双方に対するk8sのリソースのデプロイを行っています.GCPとの相性がよく、試験的にGKE上にリソースをデプロイする場合には非常に使えるツールなので、是非調べてみてください.

デモアプリについて

今回のデモアプリは、uidサービスと、taskサービスからなります.
OpenTelemetryの実装が入る前のコードは以下のレポジトリのqiita/no-otelブランチにあります.
起動方法に関しては、レポジトリのREADMEを参照してください.

taskサービス、uidサービスはともにgRPCサーバーとして起動し、taskサービスの中からuidサービスに対してgRPCリクエストを送る形になっています.
また、localでの分散トレーシングの検証用のzipkinサービスも立ち上がります.
zipkinはOSSの分散トレーシングシステムです.

image.png

localで分散トレーシングする

OpenTelemetryを用いてトレースを計装する方法は大きく二つあります.

  1. 各サービスの中にExporterを実装し、各サービスからzipkin等のビジュアライザに直接情報を送る
  2. Collectorを用意し、各サービスからCollectorにトレース情報を流し、Collectorがzipkin等のツールに情報を送る

今回は、一旦1の方法で、Collectorを用いないパターンでの実装を行います.
まず、zipkinにトレースを流すためのexporterを作成します.
なお実装においてはGitHubのopentelemetry-goのexapmleにあるzipkinの実装が参考になります.

まず、tracer.goとして

package tracer

import (
	"context"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
	"log"
	"os"
)

func TraceSetting(service string) (trace.Tracer, func(context.Context) error) {
	tr := otel.GetTracerProvider().Tracer(service)
	otel.SetTextMapPropagator(
		propagation.NewCompositeTextMapPropagator(
			propagation.TraceContext{},
			propagation.Baggage{},
		),
	)

	var shutdown func(context.Context) error
	var err error
    shutdown, err = zipkinTracer(service)
	if err != nil {
		log.Fatal(err)
	}
	return tr, shutdown
}

zipkin.goとして

package tracer

import (
	"context"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/propagation"
	"go.opentelemetry.io/otel/trace"
	"log"
)

func TraceSetting(service string) (trace.Tracer, func(context.Context) error) {
	tr := otel.GetTracerProvider().Tracer(service)
	otel.SetTextMapPropagator(
		propagation.NewCompositeTextMapPropagator(
			propagation.TraceContext{},
			propagation.Baggage{},
		),
	)

	var shutdown func(context.Context) error
	var err error
	shutdown, err = zipkinTracer(service)
	if err != nil {
		log.Fatal(err)
	}
	return tr, shutdown
}

のようにして、Tracerを手にいれる関数を宣言します.
そして、そのtracerをgRPCサーバーの中で呼び出します.

func main() {
	port := 5051
	listener, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
	if err != nil {
		panic(err)
	}

+	ctx := context.Background()
+
+	//tracerを呼び出す
+	tr, shutdown := tracer.TraceSetting("uid")
+	defer func() {
+		if err = shutdown(ctx); err != nil {
+			log.Fatal("failed to shutdown TracerProvider: %w", err)
+		}
+	}()
+	_, span := tr.Start(ctx, "run", trace.WithSpanKind(trace.SpanKindServer))
+	defer span.End()

	s := grpc.NewServer()
	h, err := handler.NewService()
	api.RegisterUIDServiceServer(s, h)

	healthSrv := health.NewServer()
	healthpb.RegisterHealthServer(s, healthSrv)
	healthSrv.SetServingStatus("uid", healthpb.HealthCheckResponse_SERVING)

	reflection.Register(s)

	go func() {
		log.Printf("start gRPC server port: %v", port)
		err := s.Serve(listener)
		if err != nil {
			return
		}
	}()

	quit := make(chan os.Signal, 1)
	signal.Notify(quit, os.Interrupt)
	<-quit
	log.Println("stopping gRPC server...")
	s.GracefulStop()
}

次に、gRPC serverにoptionの設定を行います.

~~~~~~~~
- s := grpc.NewServer()
+ s := grpc.NewServer(
+		grpc.StatsHandler(otelgrpc.NewServerHandler()),
+	)
~~~~~~~~

まず、この設定をtaskサービス, uidサービスの双方に対して行います.
次に、taskサービスの中のuidサービスのgRPC Clientを呼び出してる部分を以下のように書き換えます.

	conn, err := grpc.Dial(
		address,
		grpc.WithTransportCredentials(insecure.NewCredentials()),
+		grpc.WithStatsHandler(otelgrpc.NewClientHandler()),
	)

これで設定は終わりです.
この状態のブランチをqiita/otel-zipkin-with-healthcheckとして切っているので、細かい部分はこちらを参照してください.

この状態で各サービスを動かし、localで起動しているzipkinを見に行くと以下のようになっています.
spanが無事取れているのが確認できますね.

image.png

taskサービスからuidサービスが呼び出されている様子もこのように確認できます.

image.png

GKEで分散トレーシングする

gkeでの分散トレーシングにおいて必要なことは次の2つです.

  1. cloud trace用のexporterを作成する
  2. Workload Identityを用いてk8sのサービスアカウントからcloud traceに接続できるようにする

cloud trace用のexporterを用意する

以下のようにcloud trace用のexporterを用意します.

package tracer

import (
	"context"
	texporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/trace"
	"go.opentelemetry.io/contrib/detectors/gcp"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/sdk/resource"
	"log"
	"os"

	sdktrace "go.opentelemetry.io/otel/sdk/trace"
	semconv "go.opentelemetry.io/otel/semconv/v1.12.0"
)

func googleTracer(service string) (func(context.Context) error, error) {
	ctx := context.Background()
	projectID := os.Getenv("GOOGLE_CLOUD_PROJECT")
	exporter, err := texporter.New(texporter.WithProjectID(projectID))
	if err != nil {
		log.Fatalf("texporter.New: %v", err)
	}

	// Identify your application using resource detection
	res, err := resource.New(ctx,
		// Use the GCP resource detector to detect information about the GCP platform
		resource.WithDetectors(gcp.NewDetector()),
		// Keep the default detectors
		resource.WithTelemetrySDK(),
		// Add your own custom attributes to identify your application
		resource.WithAttributes(
			semconv.ServiceNameKey.String(service),
		),
	)
	if err != nil {
		return nil, err
	}
	traceRatio := 1.0
	tp := sdktrace.NewTracerProvider(
		sdktrace.WithSampler(sdktrace.TraceIDRatioBased(traceRatio)),
		sdktrace.WithBatcher(exporter),
		sdktrace.WithResource(res),
	)

	//defer tp.ForceFlush(ctx) // flushes any pending spans
	otel.SetTracerProvider(tp)
	return tp.Shutdown, nil
}

workload identityの設定

workload identityを用いることで、k8sのサービスアカウントに、GCPの各種リソースへのアクセス権を付与でき、GKEにデプロイしたpodからGCP上の各種リソースにアクセスできるようになります.
付与する際には次の手順で行います.

  1. GCPのサービスアカウントを作成する
    gcloud iam service-accounts create $GSA --project $PROJECT_ID
    
  2. サービスアカウントにロールを紐づける(今回はroles/cloudtrace.agentを付与します)
    gcloud projects add-iam-policy-binding $PROJECT_ID --member "serviceAccount:$GSA@${PROJECT_ID}.iam.gserviceaccount.com" --role roles/cloudtrace.agent
    
  3. workload identityの設定を行う
    gcloud iam service-accounts add-iam-policy-binding --role roles/iam.workloadIdentityUser --member "serviceAccount:${PROJECT_ID}.svc.id.goog[${NAMESPACE}/${K8S_SERVICE_ACCOUNT}]" $GSA@$PROJECT_ID.iam.gserviceaccount.com
    
  4. k8sのサービスアカウントをデプロイする
    kubectl create serviceaccount $K8S_SERVICE_ACCOUNT -n $NAMESPACE
    

です.これをuidサービス、taskサービスの双方に関して行います.

実際に見てみる

以上を行った上で、cloud code拡張機能を用いて、GKEにuidサービス、taskサービスをデプロイし、traceを見た様子が以下になります.
いい感じですね.

image.png

実装コードに関してはレポジトリのqiita/GCP-Traceブランチを参考にしてください.

質問など

また、現状の実装には一点微妙な点があります.
スクリーンショットの赤枠部分に注目してもらうと、healthcheckという名前のtraceが大量に存在することがわかります.これはreadinessProb, livenessProbeの際に行っている、gRPCのhealthcheckリクエストもzipkinやcloud traceに分散トレースの情報として送られてしまっていることを意味します.
スクリーンショット 2023-12-14 21.55.40.png

1年前くらいに触った際には、healthcheckをtraceから除外する方法などを用いて、healthcheckを除外できるようなのですが、otelgrpc.UnaryServerInterceptorが12月14日現在deprecatedになっているため、現在推奨される方法が現状よく分かってません.ちゃんと調べないといけないなぁと思っています.
ご存知の方いたら教えてくださいm(_ _)m

最後に

今回は投稿ギリギリになってしまいましたが、k8sでgRPCサーバーを動かしている際の分散トレーシングに関して、実装の流れを記事にしてみました.
実装においては、GitHubの既存実装などをみながらやれば、0から自分で書けるというレベル感に感じました.分散トレーシングは見てるだけでなんか楽しい気持ちになれますね.
是非皆さんもやってみてください.

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?