始めに
前回の延長でちょっと分かりにくいgrpc-goのInterceptorを使ってみようと思います。
Interceptorとは
WebFramework系で言うとこのMiddlewareという認識で良いのではないかと思います。RPCメソッドの呼出に割り込んで事前・事後処理を実行することができます。
サーバーサイド・クライアントサイドどちらでも使えるようですが、今回はサーバーサイドのみに使っています。
※grpc-goのソースにまだ実験的なものとあったので今後に注意必要かも
実装
InterceptorにはUnaryInterceptorとStreamInterceptorの2種類が用意されており、単純なリクエストとリプライを返すRPCメソッドにはUnaryInterceptorを、Streamを使うRPCメソッドにはStreamInterceptorを使います。
前回のサーバーサイドにInterceptorを用いて処理後のロギングを実装してみようと思います(Logrusを使います)。
server/main.goより抜粋
// 単項用Interceptor
func unaryServerInterceptor(logger *logrus.Logger) grpc.UnaryServerInterceptor {
return func(ctx netCtx.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
// deferを使って処理後にログを出力
var err error
defer func(begin time.Time) {
// infoからメソッド名を取得
method := path.Base(info.FullMethod)
took := time.Since(begin)
fields := logrus.Fields{
"method": method,
"took": took,
}
if err != nil {
fields["error"] = err
logger.WithFields(fields).Error("Failed")
} else {
logger.WithFields(fields).Info("Successed")
}
}(time.Now())
// handler = RPCメソッド
reply, hErr := handler(ctx, req)
if hErr != nil {
err = hErr
}
return reply, err
}
}
// Stream用Iterceptor
func streamServerInterceptor(logger *logrus.Logger) grpc.StreamServerInterceptor {
return func(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
// deferを使って処理後にログを出力
var err error
defer func(begin time.Time) {
// infoからメソッド名を取得
method := path.Base(info.FullMethod)
took := time.Since(begin)
fields := logrus.Fields{
"method": method,
"took": took,
}
if err != nil {
fields["error"] = err
logger.WithFields(fields).Error("Failed")
} else {
logger.WithFields(fields).Info("Successed")
}
}(time.Now())
// handler = RPCメソッド
if hErr := handler(srv, stream); err != nil {
err = hErr
}
return err
}
}
UnaryServerInterceptorでは引数にあるコンテキストをいじってhandlerに渡すことも可能です。StreamServerInterceptorではコンテキストは引数stream内に内包されており、取得は可能ですが、戻すことはできない(=編集できない)模様です。
今回、参考にしたgo-grpc-middlewareではstreamをラッパーしてコンテキストを編集していたりします。
これらをサーバーのオプションとして渡します。
server/main.goより抜粋
logger := logrus.New()
ops := make([]grpc.ServerOption, 0)
ops = append(ops, grpc.UnaryInterceptor(unaryServerInterceptor(logger)))
ops = append(ops, grpc.StreamInterceptor(streamServerInterceptor(logger)))
g := grpc.NewServer(ops...)
これでInterceptorの実装は完了です。
最後に
Streamの方で素直にコンテキストをいじれたらなぁって思いましたとさ。今のところ、ロギングか認証周りくらいしか使い道を思いつかなかったりします。
前回作ったものをInterceptorを追加して更新しています。
https://github.com/lightstaff/grpc_test