44
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

go-grpc-middlewareを一通り試してみる

Posted at

gRPCでは関数実行時に指定した処理をインターセプトすることができます。
golangでgRPCアプリを作成する際にこのインターセプトを利用するときに便利なミドルウェアが揃ってます。
それがgo-grpc-middlewareです。

今回はこのgo-grpc-middlewareの機能を一通り試してみたいと思います。

今回作ったサンプルアプリは以下になります。
https://github.com/morix1500/sample-go-grpc-middleware

準備

まずはgRPCサーバーを作りましょう。

proto/hello.proto
syntax = "proto3";

package hello;

message HelloRequest {
  string name = 1;
}

message HelloResponse {
  string message = 1;
}

service HelloService {
  rpc Hello(HelloRequest) returns (HelloResponse) {}
}

以下のコマンドでgoファイルを生成します。

docker run --rm -v $(pwd):$(pwd) \
-w $(pwd) znly/protoc:0.4.0 \
-I ./proto \
--go_out=plugins=grpc:./proto/ \
proto/hello.proto

サーバーコードをgolangで書きましょう。

main.go
package main

import (
        "context"
        pb "github.com/morix1500/sample-go-grpc-middleware/proto"
        "google.golang.org/grpc"
        "net"
)

type HelloService struct{}

func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
        return &pb.HelloResponse{
                Message: "Hello, " + in.Name,
        }, nil
}

func main() {
        s := grpc.NewServer()
        pb.RegisterHelloServiceServer(s, HelloService{})

        lis, err := net.Listen("tcp", ":5000")
        if err != nil {
                panic(err)
        }
        if err := s.Serve(lis); err != nil {
                panic(err)
        }
}

最後に起動してgrpcurlで動作確認します。

$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}

動作確認できました。

試してみる

この記事の執筆時点でgo-grpc-middlewareでは以下のようなものが提供されています。

  • grpc_auth - a customizable (via AuthFunc) piece of auth middleware
  • grpc_ctxtags - a library that adds a Tag map to context, with data populated from request body
  • grpc_zap - integration of zap logging library into gRPC handlers.
  • grpc_logrus - integration of logrus logging library into gRPC handlers.
  • grpc_prometheus - Prometheus client-side and server-side monitoring middleware
  • otgrpc - OpenTracing client-side and server-side interceptors
  • grpc_opentracing - OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags
  • grpc_validator - codegen inbound message validation from .proto options
  • grpc_recovery - turn panics into gRPC errors

grpc_auth

処理を実行する前に認証処理を行いたい場合使うミドルウェアです。
クライアントから認証用のトークンを受け取り、意図したトークンでない場合認証エラーを出す処理を書いてみます。

main.go
// NewServerのところでInterceptorを設定します
s := grpc.NewServer(
                grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authFunc)),
        )
---

// 認証用の関数
func authFunc(ctx context.Context) (context.Context, error) {
        token, err := grpc_auth.AuthFromMD(ctx, "bearer")
        if err != nil {
                return nil, err
        }
        fmt.Printf("receive token: %s\n", token)
        if token != "hoge" {
                return nil, grpc.Errorf(codes.Unauthenticated, "invalid token")
        }
        newCtx := context.WithValue(ctx, "result", "ok")
        return newCtx, nil
}

上記の処理は、Authorizationヘッダー内にBearer Tokenを埋め込み、Tokenが「hoge」であればリクエストが成功するようにしてます。
試してみます。

# ヘッダーなし
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
ERROR:
  Code: Unauthenticated
  Message: Request unauthenticated with bearer

# ヘッダーあり、token間違い
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "Authorization: bearer fuga" localhost:5000 hello.HelloService/Hello
ERROR:
  Code: Unauthenticated
  Message: invalid token

# ヘッダーあり(正常)
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "Authorization: bearer hoge" localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}

というふうに、お手軽に認証の共通処理を書けました。

grpc_ctxtags

リクエストの情報をContext内のMetadataに埋め込んでくれるものです。他のミドルウェアと併用して使います。

grpc_zap

ZapというLoggerをgRPCのロガーとして設定し、ログ出力をするためのミドルウェアです。

main.go
        opts := []grpc_zap.Option{
                grpc_zap.WithDurationField(func(duration time.Duration) zapcore.Field {
                        return zap.Int64("grpc.time_ns", duration.Nanoseconds())
                }),
        }
        zapLogger, _ := zap.NewProduction()
        grpc_zap.ReplaceGrpcLogger(zapLogger)

        s := grpc.NewServer(
                grpc_middleware.WithUnaryServerChain(
                        grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
                        grpc_zap.UnaryServerInterceptor(zapLogger, opts...),
                ),
        )

実行した結果が以下になります。

$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "te
st: hoge" localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}

# gRPCサーバーのログに以下が出力される
{"level":"info","ts":1547104771.884754,"caller":"zap/server_interceptors.go:40","msg":"finished unary call with code OK","peer.address":"[::1]:50033","grpc.start_time":"2019-01-10T16:19:31+09:00","system":"grpc","span.kind":"server","grpc.service":"hello.HelloService","grpc.method":"Hello","peer.address":"[::1]:50033","grpc.code":"OK","grpc.time_ns":162981}

gRPCのアクセスログがいい感じに出力されました。

grpc_logrus

grpc_zapと同様にlogrus用のミドルウェアが提供されています。

main.go
logrus.SetLevel(logrus.DebugLevel)
        logrus.SetOutput(os.Stdout)
        logrus.SetFormatter(&logrus.JSONFormatter{})
        logger := logrus.WithFields(logrus.Fields{})

        opts := []grpc_logrus.Option{
                grpc_logrus.WithDurationField(func(duration time.Duration) (key string, value interface{}) {
                        return "grpc.time_ns", duration.Nanoseconds()
                }),
        }

        grpc_logrus.ReplaceGrpcLogger(logger)

        s := grpc.NewServer(
                grpc_middleware.WithUnaryServerChain(
                        grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
                        grpc_logrus.UnaryServerInterceptor(logger, opts...),
                ),
        )

実行してみます。

$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "te
st: hoge" localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}

# gRPCサーバーのログに以下が出力される
{"grpc.code":"OK","grpc.method":"Hello","grpc.service":"hello.HelloService","grpc.start_time":"2019-01-10T16:34:13+09:00","grpc.time_ns":16512,"level":"info","msg":"finished unary call with code OK","peer.address":"[::1]:53643","span.kind":"server","system":"grpc","time":"2019-01-10T16:34:13+09:00"}

zapと同じですね。

grpc_prometheus

Prometheusで監視する際のメトリクスを出力するミドルウェアです。

変更箇所が多いので全部のせます。

main.go
package main

import (
        "context"
        "fmt"
        grpc_prome "github.com/grpc-ecosystem/go-grpc-prometheus"
        pb "github.com/morix1500/sample-go-grpc-middleware/proto"
        "github.com/prometheus/client_golang/prometheus"
        "github.com/prometheus/client_golang/prometheus/promhttp"
        "google.golang.org/grpc"
        "net"
        "net/http"
)

var (
        grpcMetrics = grpc_prome.NewServerMetrics()

        reg = prometheus.NewRegistry()

        customizedCounterMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
                Name: "demo_server_say_hello_method_handle_count",
                Help: "Total number of RPCs handled on the server.",
        }, []string{"name"})
)

type HelloService struct{}

func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
        // Custom Metrics Count up
        customizedCounterMetric.WithLabelValues("Test").Inc()
        return &pb.HelloResponse{
                Message: "Hello, " + in.Name,
        }, nil
}

func main() {
        lis, err := net.Listen("tcp", ":5000")
        if err != nil {
                panic(err)
        }

        // カスタムメトリクス登録
        reg.MustRegister(grpcMetrics, customizedCounterMetric)
        customizedCounterMetric.WithLabelValues("Test")

        // create http server
        httpServer := &http.Server{
                Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
                Addr:    fmt.Sprintf("0.0.0.0:%d", 5001),
        }

        s := grpc.NewServer(
                grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
        )
        pb.RegisterHelloServiceServer(s, HelloService{})

        // メトリクス初期化
        grpcMetrics.InitializeMetrics(s)

        go func() {
                if err := httpServer.ListenAndServe(); err != nil {
                        panic(err)
                }
        }()

        if err := s.Serve(lis); err != nil {
                panic(err)
        }
}

実行します

$ go run main.go
# helloを3回呼び出したあとに
$ curl localhost:5001

# HELP demo_server_say_hello_method_handle_count Total number of RPCs handled on the server.
# TYPE demo_server_say_hello_method_handle_count counter
demo_server_say_hello_method_handle_count{name="Test"} 3
# HELP grpc_server_handled_total Total number of RPCs completed on the server, regardless of success or failure.
# TYPE grpc_server_handled_total counter
grpc_server_handled_total{grpc_code="Aborted",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="AlreadyExists",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Canceled",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="DataLoss",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="DeadlineExceeded",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="FailedPrecondition",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Internal",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="InvalidArgument",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="NotFound",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="OK",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
grpc_server_handled_total{grpc_code="OutOfRange",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="PermissionDenied",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="ResourceExhausted",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unauthenticated",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unimplemented",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unknown",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
# HELP grpc_server_msg_received_total Total number of RPC stream messages received on the server.
# TYPE grpc_server_msg_received_total counter
grpc_server_msg_received_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
# HELP grpc_server_msg_sent_total Total number of gRPC stream messages sent by the server.
# TYPE grpc_server_msg_sent_total counter
grpc_server_msg_sent_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
# HELP grpc_server_started_total Total number of RPCs started on the server.
# TYPE grpc_server_started_total counter
grpc_server_started_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3

このようにPrometheusが拾える形式でメトリクスを出力しています。
もちろんカスタムメトリクスも設定できます。

otgrpc / grpc_opentracing

OpenTracingを利用するミドルウェア。
そもそもOpenTracingの使い方を知らないので割愛。

grpc_validator

Requestのバリデートを行なうミドルウェアです。
protoにバリデートの方法を記載します。

hello.proto
syntax = "proto3";

import "github.com/mwitkow/go-proto-validators/validator.proto";
package hello;

message HelloRequest {
  string name = 1 [(validator.field) = {string_not_empty: true}];
}

goファイルを出すコマンドも少し変更になります。

docker run --rm -v $(pwd):$(pwd) \
-w $(pwd) znly/protoc:0.4.0 \
-I ./proto \
--go_out=plugins=grpc:./proto/ \
--govalidators_out=./proto/ \
proto/hello.proto

これでprotoディレクトリ配下に hello.validator.pb.go というファイルが出力されます。

あとはgRPCのIntercepterの設定です。

main.go
        s := grpc.NewServer(
                grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
                        grpc_validator.UnaryServerInterceptor(),
                )),
        )

これで実行してみます。

$ go run main.go
# 正常
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}
# リクエストなし
$ grpcurl -plaintext -import-path . -proto proto/hello.proto localhost:5000 hello.HelloSer
vice/Hello
ERROR:
  Code: InvalidArgument
  Message: invalid field Name: value '' must not be an empty string

protoファイルに定義してバリデートができるので、インターフェイス定義書の役割をprotoファイルに寄せることが可能です。

grpc_recovery

panicをgRPCエラーに変えてくれるミドルウェアです。

main.go
// 意図的にpanicが起こるようにします
func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
        if in.Name == "panic" {
                panic("failed")
        }
---
func main() {
        opts := []grpc_recovery.Option{
                grpc_recovery.WithRecoveryHandler(recoveryFunc),
        }

        s := grpc.NewServer(
                grpc_middleware.WithUnaryServerChain(
                        grpc_recovery.UnaryServerInterceptor(opts...),
                ),
        )

---

// gRPCのerrorを返します
func recoveryFunc(p interface{}) error {
        fmt.Printf("p: %+v\n", p)
        return grpc.Errorf(codes.Internal, "Unexpected error")
}

実行します

$ go run main.go
# 正常
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localh
ost:5000 hello.HelloService/Hello
{
  "message": "Hello, Morix"
}
# 以上
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "panic"}' localhost:5000 hello.HelloService/Hello
ERROR:
  Code: Internal
  Message: Unexpected error

panicが起きるとサーバーが停止してしまいますが、gPRCのエラーとして返却できるようになりました。

TIPS

gRPCのエラーコード

gRPCのエラーを発生させる際は以下の中からコードを選びます。
https://github.com/grpc/grpc-go/blob/master/codes/codes.go

contextのMetadataを操作する便利関数

contextに付随するMetadataを編集するのはちょっとめんどくさいんですが、
go-grpc-middlewareでそういう機能を提供してくれてました。
https://github.com/grpc-ecosystem/go-grpc-middleware/tree/master/util/metautils

// これでMetadataのKeyからValueが取れる
val := metautils.ExtractIncoming(ctx).Get("Hoge")
44
30
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
44
30

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?