LoginSignup
11
0

More than 1 year has passed since last update.

OpenTelemetryからStackdriverにメトリクスを送る方法

Last updated at Posted at 2022-12-15

はじめに

本記事はQualiArts Advent Calendar 2022 15日目の記事です。
GolangでOpenTelemetryを使ってGCPのCompute(今回CloudRunでの検証)からTraceとMetricを転送する方法についての記事になります。
OpenTelemetryは、Trace機能はStableになったもののMetric機能に関してはまだAlpha段階( https://opentelemetry.io/docs/instrumentation/go/ )ですので、今後この記事の内容とは大きく変更される可能性があります。そのあたりはご了承ください。

OpenTelemetryとOpenCensus

これに関してはググればおそらくたくさん出てくるので多くは書きませんが、Google社内で作られたCensusというライブラリが一般公開されたのがOpenCensusで、一方CNCFでもOpenTracingというものがあり、その2つが統合されて出来上がったのがOpenTelemetryという認識です。自分はOpenCensusしか使ってこなかったのでOpenTracingは詳しくないですが、つまりはOpenCensusはOpenTelemetryとして生まれ変わったなーぐらいに思ってます。もうopencensus-goもあまり更新されていません。
簡単に言うと今後はOpenTelemetryを使おうってことです。(まだMetric機能はAlpha段階です)

Stackdriver

TraceやMetricsをモニタリングできるサービスです。ただ現在はStackdriverというのは正式名称ではないです。GoogleがStackdriverを買収してしばらくはサービス名もStackdriverのままだったのですが、現在はGoogle Cloud のオペレーション スイート(旧称 Stackdriver)という表現をしています。とはいえライブラリ内で使われている変数名だったり検索のしやすさだったりで未だにStackdriverと言われているので、この記事でもStackdriverという表記をしています。(というか正式名称がよくわかってない。OperationSuite?)

TraceとMetric

OpenTelemetryで現段階で扱うことができる機能はTraceとMetricの2つです。この2つを分けて考えたほうが理解が早いと思います。自分はDatadogでOpenCensusのTracerを使ってメトリクスを送っていた記憶があってごっちゃに覚えてしまったせいで理解が遅れました。
この辺もググれば出てくると思うので多くは書きません(他力本願)が簡単に書くと、Traceは一連の処理時間を複数のSpanで管理してどこで時間がかかっているのかを特定するのに使うもので、MetricはCPU使用率だったりリクエスト時間だったりの様々なメトリクスを時間と関連して統計したデータです。

まぁ厳密にはTraceデータでも時系列と組み合わせてグラフ表示することあるので覚え方としては微妙かもですが。それはたぶんTraceの経過時間をMetricとして捉えているのでその時点でTraceではなくなっていると思ってます。Traceはあくまで各Spanがどう時間かかっているを見ることができる機能だと思っていますので。

Trace

Traceに関してはCloudTraceのドキュメントでもしばらくβ版扱いになっていたのですが今年になってそれが取れて正式にGAされたようです。したがってドキュメントを見るのが一番手っ取り早いです。このドキュメントに書いてないことをここでは書いていきます。

CloudLoadBalancingのSpanをRootSpanとして引き継ぐ

GCPでサービスを運用するとGlobalLBやデフォルトでLB機能があるサービス(GAEやCloudRunなど)はX-Cloud-Trace-Contextというヘッダ名でリクエストヘッダにSpanコンテキストが添付されます。それをRootSpanとすることでCloudTrace上でLBのSpanとアプリのSpanを一連のTraceとしてみることができます。(たとえばアプリケーション上ではSpanがめちゃくちゃ短かったとしてもLBとアプリケーションの間で何らかの問題があってLBのSpanが長いってことがわかりやすくなります。そんなケース起きたことないんですけども。)これはCloudLoggingでも利用できる小技でLBのアクセスログとアプリケーションのログをTraceIDで紐付けることができますが今回は割愛します(ヘッダ名でググれば出ると思います。)
これをやろうとしたとき、OpenCensusではhttpとgrpcでそれぞれ実装をしなくては行けなかったのですが(特にgrpcは公式サポートがないので完全に自分でコード書くことになります。httpの真似るだけだけど)、OpenTelemetryでは公式サポートされています。ドキュメントはこちらです。

OpenCensus Bridge

spanner等のGoSDKはOpenCensusでTraceが出力されているため自サービスのコード部分だけをOpenTelemetryに切り替えると今まで出ていたSpanner等のTraceが出力されなくなってしまいます。その辺りをうまいことやってくれるのがBridge機能です。ドキュメントはこちらです。一部手を加えているのでサンプルコード載せておきます。(CloudTraceの初期セットアップの続きだと思ってください)

import (
	octrace "go.opencensus.io/trace"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/bridge/opencensus"
)

func main() {
	...

	tracer := otel.GetTracerProvider().Tracer("example.com/trace")

	octrace.DefaultTracer = &wrapTracer{Tracer: opencensus.NewTracer(tracer)}

	...
}

type wrapTracer struct {
	octrace.Tracer
}

func (t *wrapTracer) StartSpan(ctx context.Context, name string, s ...octrace.StartOption) (context.Context, *octrace.Span) {
	var ocOpts octrace.StartOptions
	for _, fn := range s {
		fn(&ocOpts)
	}
	// ocOpts.Samplerをbridgeに渡すとエラーログを吐きながらspan開始する
	// spanner.pingというspanがSampler.Neverで開始しようとするためエラーログは出るしNeverなのにspanも開始されてしまうためここで早期returnする
	if ocOpts.Sampler != nil {
		return ctx, nil
	}
	return t.Tracer.StartSpan(ctx, name, s...)
}

この調査をしたのが2022年10月頃なのでコード上のコメントのようなバグ?はもう直っているかもです。こういうとき簡単にWrapできるGoのInterfaceは便利だなーと思います。GoじゃなくてもInterfaceぐらいありますけども。丁寧に書くのであればOpenCensusのSamplerをどうにかしてBridgeでも認識できるように修正すればいいのかもしれませんが、そのときはNeverぐらいしか飛んできてなかったので横着してます。

Metric

Metricに関してはCloud Monitoring側のドキュメントには一切記述がありません。ライブラリ自体もAlpha段階なので仕方ありません。OpenCensusに関してのドキュメントはこちらです。
ちなみに以下はCloudRun環境用のコードになりますがCloudRunはそもそもカスタムメトリクス自体サポートされていません。Stackdriverには決められたリソースタイプでメトリクスを送らないといけないのですが、対応されてないのでgeneric_nodeというリソースタイプで送ることになります。この辺りはOpenCensusではCloudRunをサポートされていなかったのですがOpenTelemetryではサポートしそうな気配?はあります。

import (
	"context"
	"os"

	"cloud.google.com/go/compute/metadata"
	mexporter "github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/metric"
	"go.opentelemetry.io/contrib/detectors/gcp"
	"go.opentelemetry.io/otel/metric/global"
	"go.opentelemetry.io/otel/sdk/metric"
	"go.opentelemetry.io/otel/sdk/resource"
	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
)

func main() {
	...

	exporter, err := mexporter.New(
		// mexporter.WithContext(ctx) TODO: ctx指定手段が現状無い
		mexporter.WithProjectID(projectID),
	)
	if err != nil { return }

	var hostID string
	if metadata.OnGCE() {
		hostID, _ = metadata.InstanceID()
	}

	res, err := resource.New(ctx,
		// Use the GCP resource detector to detect information about the GCP platform
		resource.WithDetectors(gcp.NewDetector()),
		// Keep the default detectors
		resource.WithTelemetrySDK(),
		// Add your own custom attributes to identify your application
		resource.WithAttributes(
			semconv.ServiceNameKey.String(serviceName),
			semconv.ServiceVersionKey.String(serviceVersion),
			// generic_nodeに必要なリソース
			// @see https://github.com/GoogleCloudPlatform/opentelemetry-operations-go/blob/cf2959b4fd9539e9404aee0a18d506331007b826/internal/resourcemapping/resourcemapping.go#L119-L129
			semconv.ServiceNamespaceKey.String(os.Getenv("K_SERVICE")),
			semconv.HostIDKey.String(hostID),
		),
	)
	if err != nil { return }

	// initialize a MeterProvider with that periodically exports to the GCP exporter.
	mt := metric.NewMeterProvider(
		metric.WithReader(metric.NewPeriodicReader(exporter)),
		metric.WithResource(res),
	)
	defer mt.ShutDown(ctx)

	global.SetMeterProvider(mt)
	// TODO: ライブラリ側のバグで利用できない
	// view.RegisterExporter(opencensus.NewMetricExporter(mt))

	...
}

いろいろ不穏なコメントを残してますがとりあえず現在はこんな感じのコードで動きました。保証はしません。基本的にはtrace側を真似て書いてます。
gcp.NewDetectorの中でGCEで動いているのかGKEで動いているのかCloudRunで動いているのかなどの判定がされています。

grpc対応

上記は初期セットアップに過ぎないので今度は実際にMetricを集計する部分をgrpcで実装してみました。基本的にググってもgrpcのサンプルコードは見当たらなかったのでmeter部分はhttpのものを参考に実装しています。

import (
	"context"
	"fmt"
	"net"
	"strings"
	"time"

	"go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/metric"
	"go.opentelemetry.io/otel/metric/global"
	"go.opentelemetry.io/otel/metric/instrument"
	"go.opentelemetry.io/otel/metric/instrument/syncfloat64"
	"go.opentelemetry.io/otel/metric/instrument/syncint64"
	"go.opentelemetry.io/otel/metric/unit"
	semconv "go.opentelemetry.io/otel/semconv/v1.7.0"
	"google.golang.org/grpc"
	"google.golang.org/grpc/status"
	"google.golang.org/protobuf/proto"
)

const (
	clientRequestCount          = "grpc.client.request_count"
	clientRequestContentLength  = "grpc.client.request_content_length"
	clientResponseContentLength = "grpc.client.response_content_length"
	clientLatency               = "grpc.client.duration"
	serverRequestCount          = "grpc.server.request_count"
	serverRequestContentLength  = "grpc.server.request_content_length"
	serverResponseContentLength = "grpc.server.response_content_length"
	serverLatency               = "grpc.server.duration"
)

var (
	clientMeasures = &measures{
		counterMap: map[string][]instrument.Option{
			clientRequestCount:          {instrument.WithDescription("Outgoing request count total"), instrument.WithUnit(unit.Dimensionless)},
			clientRequestContentLength:  {instrument.WithDescription("Outgoing request bytes total"), instrument.WithUnit(unit.Bytes)},
			clientResponseContentLength: {instrument.WithDescription("Outgoing response bytes total"), instrument.WithUnit(unit.Bytes)},
		},
		histogramMap: map[string][]instrument.Option{
			clientLatency: {instrument.WithDescription("Outgoing end to end duration, microseconds"), instrument.WithUnit(unit.Milliseconds)},
		},
	}
	serverMeasures = &measures{
		counterMap: map[string][]instrument.Option{
			serverRequestCount:          {instrument.WithDescription("Incoming request count total"), instrument.WithUnit(unit.Dimensionless)},
			serverRequestContentLength:  {instrument.WithDescription("Incoming request bytes total"), instrument.WithUnit(unit.Bytes)},
			serverResponseContentLength: {instrument.WithDescription("Incoming response bytes total"), instrument.WithUnit(unit.Bytes)},
		},
		histogramMap: map[string][]instrument.Option{
			serverLatency: {instrument.WithDescription("Incoming end to end duration, microseconds"), instrument.WithUnit(unit.Milliseconds)},
		},
	}
)

type measures struct {
	counterMap   map[string][]instrument.Option
	histogramMap map[string][]instrument.Option
}

func UnaryClientInterceptor() grpc.UnaryClientInterceptor {
	counters, valueRecorders := newMeasures(clientMeasures)
	return func(
		ctx context.Context,
		method string,
		req, reply interface{},
		cc *grpc.ClientConn,
		invoker grpc.UnaryInvoker,
		callOpts ...grpc.CallOption,
	) error {
		requestStartTime := time.Now()
		attrs := []attribute.KeyValue{otelgrpc.RPCSystemGRPC}
		_, mAttrs := parseFullMethod(method)
		attrs = append(attrs, mAttrs...)
		attrs = append(attrs, peerAttr(cc.Target())...)

		err := invoker(ctx, method, req, reply, cc, callOpts...)
		serr, _ := status.FromError(err)
		attrs = append(attrs, otelgrpc.GRPCStatusCodeKey.Int64(int64(serr.Code())))

		counters[clientRequestCount].Add(ctx, 1, attrs...)

		if size, ok := getContentSize(req); ok {
			counters[clientRequestContentLength].Add(ctx, size, attrs...)
		}
		if size, ok := getContentSize(reply); ok {
			counters[clientResponseContentLength].Add(ctx, size, attrs...)
		}

		// Use floating point division here for higher precision (instead of Millisecond method).
		elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)
		valueRecorders[clientLatency].Record(ctx, elapsedTime, attrs...)

		return err
	}
}

func UnaryServerInterceptor() grpc.UnaryServerInterceptor {
	counters, valueRecorders := newMeasures(serverMeasures)
	return func(
		ctx context.Context,
		req interface{},
		info *grpc.UnaryServerInfo,
		handler grpc.UnaryHandler,
	) (interface{}, error) {
		requestStartTime := time.Now()
		attrs := []attribute.KeyValue{otelgrpc.RPCSystemGRPC}
		_, mAttrs := parseFullMethod(info.FullMethod)
		attrs = append(attrs, mAttrs...)

		res, err := handler(ctx, req)

		serr, _ := status.FromError(err)
		attrs = append(attrs, otelgrpc.GRPCStatusCodeKey.Int64(int64(serr.Code())))

		counters[serverRequestCount].Add(ctx, 1, attrs...)

		if size, ok := getContentSize(req); ok {
			counters[serverRequestContentLength].Add(ctx, size, attrs...)
		}
		if size, ok := getContentSize(res); ok {
			counters[serverResponseContentLength].Add(ctx, size, attrs...)
		}

		// Use floating point division here for higher precision (instead of Millisecond method).
		elapsedTime := float64(time.Since(requestStartTime)) / float64(time.Millisecond)
		valueRecorders[serverLatency].Record(ctx, elapsedTime, attrs...)

		return res, err
	}
}

func newMeter() metric.Meter {
	return global.MeterProvider().Meter("example.com/metric")
}

func newMeasures(m *measures) (counters map[string]syncint64.Counter, histograms map[string]syncfloat64.Histogram) {
	ctx := context.Background()
	meter := newMeter()
	counters = make(map[string]syncint64.Counter, len(m.counterMap))
	histograms = make(map[string]syncfloat64.Histogram, len(m.histogramMap))

	for name, opts := range m.counterMap {
		counter, err := meter.SyncInt64().Counter(name, opts...)
		if err != nil {
			fmt.Println(err)
		}
		counters[name] = counter
	}
	for name, opts := range m.histogramMap {
		histogram, err := meter.SyncFloat64().Histogram(name, opts...)
		if err != nil {
			fmt.Println(err)
		}
		histograms[name] = histogram
	}

	return counters, histograms
}

// parseFullMethod returns a span name following the OpenTelemetry semantic
// conventions as well as all applicable span attribute.KeyValue attributes based
// on a gRPC's FullMethod.
// https://github.com/open-telemetry/opentelemetry-go-contrib/blob/v1.11.0/instrumentation/google.golang.org/grpc/otelgrpc/internal/parse.go
func parseFullMethod(fullMethod string) (string, []attribute.KeyValue) {
	name := strings.TrimLeft(fullMethod, "/")
	parts := strings.SplitN(name, "/", 2)
	if len(parts) != 2 {
		// Invalid format, does not follow `/package.service/method`.
		return name, []attribute.KeyValue(nil)
	}

	var attrs []attribute.KeyValue
	if service := parts[0]; service != "" {
		attrs = append(attrs, semconv.RPCServiceKey.String(service))
	}
	if method := parts[1]; method != "" {
		attrs = append(attrs, semconv.RPCMethodKey.String(method))
	}
	return name, attrs
}

// https://github.com/open-telemetry/opentelemetry-go-contrib/blob/v1.11.0/instrumentation/google.golang.org/grpc/otelgrpc/interceptor.go#L456
// peerAttr returns attributes about the peer address.
func peerAttr(addr string) []attribute.KeyValue {
	host, port, err := net.SplitHostPort(addr)
	if err != nil {
		return []attribute.KeyValue(nil)
	}

	if host == "" {
		host = "127.0.0.1"
	}

	return []attribute.KeyValue{
		semconv.NetPeerIPKey.String(host),
		semconv.NetPeerPortKey.String(port),
	}
}

func getContentSize(msg interface{}) (int64, bool) {
	p, ok := msg.(proto.Message)
	if !ok {
		return 0, false
	}
	return int64(proto.Size(p)), true
}

grpc-clinetとgrpc-server用のInterceptorです。参考にしたコードはコメントのURLの箇所です。otelgrpcは現段階だとmeterが入っていないのとwrapがしづらいので大部分をコピってきています。issueはあるので将来的にはこの辺のコードは全部消えてSDK利用するだけになりそうです。(ここのコードは実験的に実装したのでPR投げれるレベルのものではないです。)

まとめ

現段階ではTraceだけの利用に関してはOpenTelemetryに移行して大丈夫です。Bridge機能も用意されているのでおそらく既存機能が多かったとしても困らないと思います(パフォーマンス面は把握していません)Metricも利用する場合は現段階のOpenTelemetryではまだまだ対応されていないものが多く、もう少し様子見たほうがいいと思います。

11
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
11
0