gRPCでは関数実行時に指定した処理をインターセプトすることができます。
golangでgRPCアプリを作成する際にこのインターセプトを利用するときに便利なミドルウェアが揃ってます。
それがgo-grpc-middlewareです。
今回はこのgo-grpc-middlewareの機能を一通り試してみたいと思います。
今回作ったサンプルアプリは以下になります。
https://github.com/morix1500/sample-go-grpc-middleware
準備
まずはgRPCサーバーを作りましょう。
syntax = "proto3";
package hello;
message HelloRequest {
string name = 1;
}
message HelloResponse {
string message = 1;
}
service HelloService {
rpc Hello(HelloRequest) returns (HelloResponse) {}
}
以下のコマンドでgoファイルを生成します。
docker run --rm -v $(pwd):$(pwd) \
-w $(pwd) znly/protoc:0.4.0 \
-I ./proto \
--go_out=plugins=grpc:./proto/ \
proto/hello.proto
サーバーコードをgolangで書きましょう。
package main
import (
"context"
pb "github.com/morix1500/sample-go-grpc-middleware/proto"
"google.golang.org/grpc"
"net"
)
type HelloService struct{}
func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
return &pb.HelloResponse{
Message: "Hello, " + in.Name,
}, nil
}
func main() {
s := grpc.NewServer()
pb.RegisterHelloServiceServer(s, HelloService{})
lis, err := net.Listen("tcp", ":5000")
if err != nil {
panic(err)
}
if err := s.Serve(lis); err != nil {
panic(err)
}
}
最後に起動してgrpcurlで動作確認します。
$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
動作確認できました。
試してみる
この記事の執筆時点でgo-grpc-middlewareでは以下のようなものが提供されています。
- grpc_auth - a customizable (via AuthFunc) piece of auth middleware
- grpc_ctxtags - a library that adds a Tag map to context, with data populated from request body
- grpc_zap - integration of zap logging library into gRPC handlers.
- grpc_logrus - integration of logrus logging library into gRPC handlers.
- grpc_prometheus - Prometheus client-side and server-side monitoring middleware
- otgrpc - OpenTracing client-side and server-side interceptors
- grpc_opentracing - OpenTracing client-side and server-side interceptors with support for streaming and handler-returned tags
- grpc_validator - codegen inbound message validation from .proto options
- grpc_recovery - turn panics into gRPC errors
grpc_auth
処理を実行する前に認証処理を行いたい場合使うミドルウェアです。
クライアントから認証用のトークンを受け取り、意図したトークンでない場合認証エラーを出す処理を書いてみます。
// NewServerのところでInterceptorを設定します
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_auth.UnaryServerInterceptor(authFunc)),
)
---
// 認証用の関数
func authFunc(ctx context.Context) (context.Context, error) {
token, err := grpc_auth.AuthFromMD(ctx, "bearer")
if err != nil {
return nil, err
}
fmt.Printf("receive token: %s\n", token)
if token != "hoge" {
return nil, grpc.Errorf(codes.Unauthenticated, "invalid token")
}
newCtx := context.WithValue(ctx, "result", "ok")
return newCtx, nil
}
上記の処理は、Authorizationヘッダー内にBearer Tokenを埋め込み、Tokenが「hoge」であればリクエストが成功するようにしてます。
試してみます。
# ヘッダーなし
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
ERROR:
Code: Unauthenticated
Message: Request unauthenticated with bearer
# ヘッダーあり、token間違い
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "Authorization: bearer fuga" localhost:5000 hello.HelloService/Hello
ERROR:
Code: Unauthenticated
Message: invalid token
# ヘッダーあり(正常)
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "Authorization: bearer hoge" localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
というふうに、お手軽に認証の共通処理を書けました。
grpc_ctxtags
リクエストの情報をContext内のMetadataに埋め込んでくれるものです。他のミドルウェアと併用して使います。
grpc_zap
ZapというLoggerをgRPCのロガーとして設定し、ログ出力をするためのミドルウェアです。
opts := []grpc_zap.Option{
grpc_zap.WithDurationField(func(duration time.Duration) zapcore.Field {
return zap.Int64("grpc.time_ns", duration.Nanoseconds())
}),
}
zapLogger, _ := zap.NewProduction()
grpc_zap.ReplaceGrpcLogger(zapLogger)
s := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_zap.UnaryServerInterceptor(zapLogger, opts...),
),
)
実行した結果が以下になります。
$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "te
st: hoge" localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
# gRPCサーバーのログに以下が出力される
{"level":"info","ts":1547104771.884754,"caller":"zap/server_interceptors.go:40","msg":"finished unary call with code OK","peer.address":"[::1]:50033","grpc.start_time":"2019-01-10T16:19:31+09:00","system":"grpc","span.kind":"server","grpc.service":"hello.HelloService","grpc.method":"Hello","peer.address":"[::1]:50033","grpc.code":"OK","grpc.time_ns":162981}
gRPCのアクセスログがいい感じに出力されました。
grpc_logrus
grpc_zapと同様にlogrus用のミドルウェアが提供されています。
logrus.SetLevel(logrus.DebugLevel)
logrus.SetOutput(os.Stdout)
logrus.SetFormatter(&logrus.JSONFormatter{})
logger := logrus.WithFields(logrus.Fields{})
opts := []grpc_logrus.Option{
grpc_logrus.WithDurationField(func(duration time.Duration) (key string, value interface{}) {
return "grpc.time_ns", duration.Nanoseconds()
}),
}
grpc_logrus.ReplaceGrpcLogger(logger)
s := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(
grpc_ctxtags.UnaryServerInterceptor(grpc_ctxtags.WithFieldExtractor(grpc_ctxtags.CodeGenRequestFieldExtractor)),
grpc_logrus.UnaryServerInterceptor(logger, opts...),
),
)
実行してみます。
$ go run main.go
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' -H "te
st: hoge" localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
# gRPCサーバーのログに以下が出力される
{"grpc.code":"OK","grpc.method":"Hello","grpc.service":"hello.HelloService","grpc.start_time":"2019-01-10T16:34:13+09:00","grpc.time_ns":16512,"level":"info","msg":"finished unary call with code OK","peer.address":"[::1]:53643","span.kind":"server","system":"grpc","time":"2019-01-10T16:34:13+09:00"}
zapと同じですね。
grpc_prometheus
Prometheusで監視する際のメトリクスを出力するミドルウェアです。
変更箇所が多いので全部のせます。
package main
import (
"context"
"fmt"
grpc_prome "github.com/grpc-ecosystem/go-grpc-prometheus"
pb "github.com/morix1500/sample-go-grpc-middleware/proto"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"net"
"net/http"
)
var (
grpcMetrics = grpc_prome.NewServerMetrics()
reg = prometheus.NewRegistry()
customizedCounterMetric = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "demo_server_say_hello_method_handle_count",
Help: "Total number of RPCs handled on the server.",
}, []string{"name"})
)
type HelloService struct{}
func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
// Custom Metrics Count up
customizedCounterMetric.WithLabelValues("Test").Inc()
return &pb.HelloResponse{
Message: "Hello, " + in.Name,
}, nil
}
func main() {
lis, err := net.Listen("tcp", ":5000")
if err != nil {
panic(err)
}
// カスタムメトリクス登録
reg.MustRegister(grpcMetrics, customizedCounterMetric)
customizedCounterMetric.WithLabelValues("Test")
// create http server
httpServer := &http.Server{
Handler: promhttp.HandlerFor(reg, promhttp.HandlerOpts{}),
Addr: fmt.Sprintf("0.0.0.0:%d", 5001),
}
s := grpc.NewServer(
grpc.UnaryInterceptor(grpcMetrics.UnaryServerInterceptor()),
)
pb.RegisterHelloServiceServer(s, HelloService{})
// メトリクス初期化
grpcMetrics.InitializeMetrics(s)
go func() {
if err := httpServer.ListenAndServe(); err != nil {
panic(err)
}
}()
if err := s.Serve(lis); err != nil {
panic(err)
}
}
実行します
$ go run main.go
# helloを3回呼び出したあとに
$ curl localhost:5001
# HELP demo_server_say_hello_method_handle_count Total number of RPCs handled on the server.
# TYPE demo_server_say_hello_method_handle_count counter
demo_server_say_hello_method_handle_count{name="Test"} 3
# HELP grpc_server_handled_total Total number of RPCs completed on the server, regardless of success or failure.
# TYPE grpc_server_handled_total counter
grpc_server_handled_total{grpc_code="Aborted",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="AlreadyExists",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Canceled",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="DataLoss",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="DeadlineExceeded",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="FailedPrecondition",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Internal",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="InvalidArgument",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="NotFound",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="OK",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
grpc_server_handled_total{grpc_code="OutOfRange",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="PermissionDenied",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="ResourceExhausted",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unauthenticated",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unavailable",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unimplemented",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
grpc_server_handled_total{grpc_code="Unknown",grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 0
# HELP grpc_server_msg_received_total Total number of RPC stream messages received on the server.
# TYPE grpc_server_msg_received_total counter
grpc_server_msg_received_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
# HELP grpc_server_msg_sent_total Total number of gRPC stream messages sent by the server.
# TYPE grpc_server_msg_sent_total counter
grpc_server_msg_sent_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
# HELP grpc_server_started_total Total number of RPCs started on the server.
# TYPE grpc_server_started_total counter
grpc_server_started_total{grpc_method="Hello",grpc_service="hello.HelloService",grpc_type="unary"} 3
このようにPrometheusが拾える形式でメトリクスを出力しています。
もちろんカスタムメトリクスも設定できます。
otgrpc / grpc_opentracing
OpenTracingを利用するミドルウェア。
そもそもOpenTracingの使い方を知らないので割愛。
grpc_validator
Requestのバリデートを行なうミドルウェアです。
protoにバリデートの方法を記載します。
syntax = "proto3";
import "github.com/mwitkow/go-proto-validators/validator.proto";
package hello;
message HelloRequest {
string name = 1 [(validator.field) = {string_not_empty: true}];
}
goファイルを出すコマンドも少し変更になります。
docker run --rm -v $(pwd):$(pwd) \
-w $(pwd) znly/protoc:0.4.0 \
-I ./proto \
--go_out=plugins=grpc:./proto/ \
--govalidators_out=./proto/ \
proto/hello.proto
これでprotoディレクトリ配下に hello.validator.pb.go
というファイルが出力されます。
あとはgRPCのIntercepterの設定です。
s := grpc.NewServer(
grpc.UnaryInterceptor(grpc_middleware.ChainUnaryServer(
grpc_validator.UnaryServerInterceptor(),
)),
)
これで実行してみます。
$ go run main.go
# 正常
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localhost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
# リクエストなし
$ grpcurl -plaintext -import-path . -proto proto/hello.proto localhost:5000 hello.HelloSer
vice/Hello
ERROR:
Code: InvalidArgument
Message: invalid field Name: value '' must not be an empty string
protoファイルに定義してバリデートができるので、インターフェイス定義書の役割をprotoファイルに寄せることが可能です。
grpc_recovery
panicをgRPCエラーに変えてくれるミドルウェアです。
// 意図的にpanicが起こるようにします
func (h HelloService) Hello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloResponse, error) {
if in.Name == "panic" {
panic("failed")
}
---
func main() {
opts := []grpc_recovery.Option{
grpc_recovery.WithRecoveryHandler(recoveryFunc),
}
s := grpc.NewServer(
grpc_middleware.WithUnaryServerChain(
grpc_recovery.UnaryServerInterceptor(opts...),
),
)
---
// gRPCのerrorを返します
func recoveryFunc(p interface{}) error {
fmt.Printf("p: %+v\n", p)
return grpc.Errorf(codes.Internal, "Unexpected error")
}
実行します
$ go run main.go
# 正常
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "Morix"}' localh
ost:5000 hello.HelloService/Hello
{
"message": "Hello, Morix"
}
# 以上
$ grpcurl -plaintext -import-path . -proto proto/hello.proto -d '{"name": "panic"}' localhost:5000 hello.HelloService/Hello
ERROR:
Code: Internal
Message: Unexpected error
panicが起きるとサーバーが停止してしまいますが、gPRCのエラーとして返却できるようになりました。
TIPS
gRPCのエラーコード
gRPCのエラーを発生させる際は以下の中からコードを選びます。
https://github.com/grpc/grpc-go/blob/master/codes/codes.go
contextのMetadataを操作する便利関数
contextに付随するMetadataを編集するのはちょっとめんどくさいんですが、
go-grpc-middlewareでそういう機能を提供してくれてました。
https://github.com/grpc-ecosystem/go-grpc-middleware/tree/master/util/metautils
// これでMetadataのKeyからValueが取れる
val := metautils.ExtractIncoming(ctx).Get("Hoge")