LoginSignup
14
6

More than 3 years have passed since last update.

本番環境で使えるgRPC-Gateway【後編】

Last updated at Posted at 2019-12-13

VISITS Technologies Advent Calendar 2019 14日目は@istshが担当します。

この記事は本番環境で使えるgRPC-Gateway【前編】の続きで、gRPC Clientの実装についてまとめました。
また、前回と同様に、サンプルコードを使って解説してきます。

goファイルを生成

前編 - goファイルを生成 で、login.pb.gw.goも生成されていると思います。
--grpc-gateway_outオプションを付けると、gRPC-Gateway用のgoファイルが生成されます。

gRPC Clientの実装

一部抜粋しています。コード全体を見たい場合はサンプルコードを見てください。

app/cmd/client/main.go
func run() error {
    ctx := context.Background()
    ctx, cancel := context.WithCancel(ctx)
    defer cancel()

    mux := runtime.NewServeMux(
        runtime.WithMetadata(RequestIDAnnotator),
        runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: false, EmitDefaults: true}),
    )

    if err := registerServiceHandlers(ctx, mux); err != nil {
        return err
    }

    handler := handlers.CORS(
        handlers.AllowedOrigins([]string{"*"}),
        handlers.AllowedMethods([]string{http.MethodPost, http.MethodGet, http.MethodPut, http.MethodDelete}),
        handlers.AllowedHeaders([]string{"Authorization", "Content-Type", "Accept-Encoding", "Accept"}),
    )(mux)

    addr := ":8080"
    fmt.Printf("http server started on %s\n", addr)
    // Start HTTP server (and proxy calls to gRPC server endpoint)
    return http.ListenAndServe(addr, handler)
}

func main() {
    flag.Parse()
    defer glog.Flush()

    if err := run(); err != nil {
        glog.Fatal(err)
    }
}

Annotator, Marshallerの設定

mux := runtime.NewServeMux(
    runtime.WithMetadata(RequestIDAnnotator),
    runtime.WithMarshalerOption(runtime.MIMEWildcard, &runtime.JSONPb{OrigName: false, EmitDefaults: true}),
)

前者は、HTTPリクエストヘッダーのX-Request-IDの値を取得し、gRPCのmetadataに詰めて返す関数です。
RequestIDAnnotatorの実装については後述します。

後者は、JSON形式でレスポンスを返すように定義しています。
ここでは、OrigNameEmitDefaultsに値を設定しています。

  • OrigName
    • false: キャメルケースで返す。(具体的には、各フィールドのjson nameで返す)
    • true: protoで定義した名前で返す。
  • EmitDefaults
    • 0やfalseなどのゼロ値になっているものもJSONで出力することができます。

HTTPとgRPCの連携を追加する

func registerServiceHandlers(ctx context.Context, mux *runtime.ServeMux) error {
    opts := grpcDialOptions()

    if err := pbv1.RegisterLoginServiceHandlerFromEndpoint(ctx, mux, *grpcServerEndpoint, opts); err != nil {
        return err
    }

    if err := pbv1.RegisterUserServiceHandlerFromEndpoint(ctx, mux, *grpcServerEndpoint, opts); err != nil {
        return err
    }

    return nil
}

HTTPリクエストをgRPC Serverに渡すための定義です。
grpcDialOptionsの実装については後述します。

CORSの設定

handler := handlers.CORS(
    handlers.AllowedOrigins([]string{"*"}),
    handlers.AllowedMethods([]string{http.MethodPost, http.MethodGet, http.MethodPut, http.MethodDelete}),
    handlers.AllowedHeaders([]string{"Authorization", "Content-Type", "Accept-Encoding", "Accept"}),
)(mux)

github.com/gorilla/handlersが、簡潔に定義できる関数を用意してくれているので採用しています。

RequestIDAnnotatorの実装

// RequestIDAnnotator takes requestID from http request header and sets it to metadata.
func RequestIDAnnotator(ctx context.Context, req *http.Request) metadata.MD {
    requestID := req.Header.Get(interceptor.XRequestIDKey)
    if requestID == "" {
        requestID = xid.New().String()
    }

    return metadata.New(map[string]string{
        interceptor.XRequestIDKey: requestID,
    })
}

前述の通り、HTTPリクエストヘッダーのX-Request-IDの値を取得し、gRPCのmetadataに詰めて返す関数です。
注意すべきなのは、metadataのキーです。
理由は、metadata.Newの中でmapのキーがstrings.ToLowerで小文字に変換されるからです。
metadataから値を取る時に、md[key]と書く場合は気をつけてください。
ただし、md.Get(key)で取る場合は、Getの中でstrings.ToLowerを実行してくれるので、気にしなくて大丈夫です。

grpcDialOptionsの実装

app/cmd/client/main.go
func grpcDialOptions() []grpc.DialOption {
    l := logrus.New()
    l.SetFormatter(&logrus.JSONFormatter{})
    decider := func(ctx context.Context, fullMethodName string) bool {
        return true
    }
    startTimeFunc := func() time.Time {
        return time.Now()
    }
    durationFunc := func(startTime time.Time) time.Duration {
        return time.Now().Sub(startTime)
    }

    opts := []grpc.DialOption{
        grpc.WithInsecure(), // 証明書のエラーを無視する
        grpc.WithDefaultCallOptions(grpc.UseCompressor(gzip.Name)), // gzip圧縮
        grpc.WithUnaryInterceptor(interceptor.PayloadUnaryClientInterceptor(logrus.NewEntry(l), decider, startTimeFunc, durationFunc)),
    }

    return opts
}

PayloadUnaryClientInterceptor

app/infrastructure/interceptor/payload_interceptor.go
// PayloadUnaryClientInterceptor returns a new unary client interceptor that logs the paylods of requests and responses.
func PayloadUnaryClientInterceptor(entry *logrus.Entry, decider grpc_logging.ClientPayloadLoggingDecider, startTimeFunc func() time.Time, durationFunc func(startTime time.Time) time.Duration) grpc.UnaryClientInterceptor {
    return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
        if !decider(ctx, method) {
            return invoker(ctx, method, req, reply, cc, opts...)
        }

        startTime := startTimeFunc()
        err := invoker(ctx, method, req, reply, cc, opts...)
        fields, level := newLogFields(ctx, method, req, reply, durationFunc(startTime), err)

        levelLogf(
            entry.WithFields(fields),
            level,
            "client request/response payload logged as grpc.response.content")
        return err
    }
}

func newLogFields(ctx context.Context, method string, reqPbMsg, resPbMsg interface{}, duration time.Duration, err error) (logrus.Fields, logrus.Level) {
    code := grpc_logging.DefaultErrorToCode(err)
    level := grpc_logrus.DefaultCodeToLevel(code)
    durField, durVal := grpc_logrus.DefaultDurationToField(duration)

    fields := logrus.Fields{
        "system":       "grpc",
        "span.kind":    "client",
        "grpc.service": path.Dir(method)[1:],
        "grpc.method":  path.Base(method),
        "grpc.code":    code.String(),
        durField:       durVal,
    }

    if md, ok := metadata.FromOutgoingContext(ctx); ok {
        if values := md.Get("grpcgateway-user-agent"); len(values) > 0 {
            fields["user-agent"] = values[0]
        }
        if values := md.Get("x-forwarded-for"); len(values) > 0 {
            fields["remote_ip"] = values[0]
        }
        if values := md.Get(XRequestIDKey); len(values) > 0 {
            fields["request_id"] = values[0]
        }
    }
    if p, ok := reqPbMsg.(proto.Message); ok {
        fields["grpc.request.content"] = &jsonpbRequestMarshalleble{p}
    }
    if p, ok := resPbMsg.(proto.Message); ok {
        fields["grpc.response.content"] = &jsonpbResponseMarshalleble{p}
    }
    if err != nil {
        fields[logrus.ErrorKey] = err.Error()
    }

    return fields, level
}

(一部抜粋)

ここでは簡潔に説明するので、詳細はコードを見ていただきたいと思います。
このインターセプターは、各Serviceの実行後に、リクエスト/レスポンスパラメータを標準出力に出力するものです。
実行時間やエラーメッセージ、リクエストID、userAgentなども合わせて出力しています。
また、ログインなどでパスワードが送られてきたときにマスクさせる、といった処理もこのファイル内に実装してあります。

まとめ

【前編】に続き、gRPC Clientの実装を紹介しました。
インターセプターは、github.com/grpc-ecosystem/go-grpc-middlewareを参考に実装したので、場合によってはこのリポジトリの実装で事足りるかもしれません。
自分が探した限りでは、RequestIDやUserAgent、リクエストとレスポンスをまとめて標準出力するなどといったことを実現してくれるライブラリは見つけられなかったので実装することにしました。
同じ課題を抱えている人に少しでも参考になれば嬉しいです。

後日【gRPC-Gatewayでカスタムエラーを返す方法】についても投稿するのでお楽しみに!

14
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
6