LoginSignup
259
186

More than 3 years have passed since last update.

内部実装から理解するgRPC

Last updated at Posted at 2019-05-26

概要

目的

gRPCはDocumentにあるように以下の特徴があるかと思います。

  • protocol buffer のようなインターフェース定義語 (IDL) から生成されたコードを利用してRPCができる
  • HTTP/2で通信することができ、リクエストとレスポンスをそれぞれ分割できる
  • 多言語に対応している

しかし、この記事ではこれらの機能の紹介ではなく、gRPCの仕組みを理解することを意図しています。
なぜそれを意図したかというと普段の開発でgRPCを利用しているものの、どのような仕組みでRPCが実現できているのかイメージが持てていなかったためです。そのために、grpc-goの内部実装(2019/5時点)を読み解きながら、実際の通信の中身を覗いてみました。

そして結果的には以下の効用がありました。

  • protoc-gen-goがprotocol-buffersから生成したコードがどのように利用されているか分かる
  • HTTP/2により多重化されたリクエストをどう扱っているか分かる
  • Webサーバーを実装をする時にどのような設計にすればいいのか参考になる

処理流れ

まずソースコードを読むとサーバー側は以下のようなフローで処理されていることが分かります。

grpc.png

goroutineが三重に実行されています。具体的には以下の順番で処理が進みます。

  1. TCP Connection の受付
  2. TLSハンドシェイクを並行処理
  3. フレームの読み取りを並行処理
  4. gRPC method の実行を並行処理

1は2の完了を監視しており、func (*WaitGroup) Addを2の呼び出し前で実行し、2が完了する前にfunc (*WaitGroup) Doneを実行しています。

この二つの処理でTCPやTLSの通信の確立を担っており、それ以降の処理を別のgoroutineに委ねています。こうすることで、後続の通信の確立を3, 4の処理でブロックしてしまわないようにしています。

そして、次の3, 4が実際のHTTP/2の扱いとgRPC methodの呼び出しを行います。3ではリクエストからフレームを読み取り、gRPC methodに対応したchannelにデータを格納します。

HTTP/2ではリクエスト(RPCでは一つのメソッド呼び出し)は一つの通信に多重化されるので、複数のgRPC methodの呼び出しがされている可能性もあります。そこで、それぞれのgRPC methodごとにgoroutineを起動し、channelからデータを取得し、IDLで定義したリクエストのフォーマットに従った構造体にUnmarshalし、gRPC methodの呼び出しを行います。これらが大きな処理の流れになります。

HTTP/2に関する前提知識

実際に詳しくソースコードを理解するためにはHTTP/2に関する知識が不可欠です。HTTP/2はHTTP/1.1の抱えるパフォーマンス上の課題を解決することを主な目的として開発されました。

そのパフォーマンス強化の中心となる仕組みとして、バイナリフォーマットでHTTPのメソッド、ヘッダ、ボディなどの構成要素がHTTPのレイヤーにおいて表現されること(バイナリフレーミング)があります。HTTP/1.xまでは改行で区切られたプレーンテキストでこれらを表現していました。リクエストは複数の改行されたヘッダフィールドを持ち、レスポンスの最初の行はステータスコードであるといった具合です。

そのため、一つの通信に一つのリクエスト/レスポンスという関係でした。しかしHTTP/2ではリクエスト/レスポンスをHTTPレベルでバイナリとして扱えるので、それらを分割し、一つの通信において多重化し、受信側で再構成をすることができます。gRPCの実装においてもこれらがなされています。

binary_framing.png
こちらを参照

まず、これらのプロセスを理解するためにHTTP/2に関する用語を確認します。

  • ストリーム
    • 確立した接続内の双方向の論理的なフレームの流れのまとまりを表現したもの
  • メッセージ
    • 論理的なリクエストまたはレスポンス、メッセージを表現するためにフレームを順番にまとめたもの
  • フレーム
    • HTTP/2における通信の最小単位で、それぞれのフレームはヘッダを持ち、ヘッダはそのフレームが所属するストリームを識別する

stream_message_frame.png
こちらを参照

メッセージとフレームの関係は従来のHTTP/1.xからイメージしやすいかと思いますが、ストリームという概念がなぜ存在するのかが私には分かりづらかったです。調べてみると、ストリームは以下の目的のために存在しているようです。

  • フレームの流れのライフサイクルをフレームのタイプによって制御する
  • フレームの流れごとに優先順位を付ける
  • フレームの流れごとにフロー制御する

フレームの流れのライフサイクルをフレームタイプによって制御する

ストリームごとに状態を持ちます。RFCに状態遷移図があります。
実際に通信されるのはフレームなので、フレームの持つタイプに応じて、ストリームの開始、ストリームにおけるフレームの扱い、ストリームの終了が遷移します。

フレームの流れごとに優先順位を付ける

https://summerwind.jp/docs/rfc7540/#section5-3
HTTPメッセージが複数のフレームに分割されると、一つの通信で複数のストリームに所属するフレームが送信されることになります。
ストリームを開始するHEADERSフレームに優先順位を持たせることで、複数のストリームにおけるリソース割り当ての方法を明示することができます。

フレームの流れごとにフロー制御する

https://summerwind.jp/docs/rfc7540/#section6-9
TCPでは送信受信共にバッファにデータを格納し互いに通信するので、それを上回らないように利用状況を交換、調整します。
HTTP/2では、複数のストリームを同じTCP接続上で多重化するので、リソース割り当てをHTTPレベルでも管理する必要があります。
なぜならTCPのフロー制御では、単一の接続内に存在する複数のストリームを判別できないからです。これをWINDOW_UPDATEフレームにより実現します。

gRPCを実際に呼び出す

それでは、実際にgRPCを使ってクライアントとサーバーで通信してみます。
ここの実装はgrpc-go/examplesを利用しています。

Protocol Buffer から go のソースコードを生成する

まず、以下のようにprotocol bufferの定義を定義します。
https://github.com/grpc/grpc-go/blob/master/examples/helloworld/helloworld/helloworld.proto

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}

そして、protocol buffer の定義に沿ったgRPC用の go のソースコードを生成します。
https://github.com/grpc/grpc-go/blob/master/examples/helloworld/helloworld/helloworld.pb.go

サーバーサイドの実装

以下のようにgRPC method を実装し、gRPCサーバーを起動します。
https://github.com/grpc/grpc-go/blob/master/examples/helloworld/greeter_server/main.go

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
    log.Printf("Received: %v", in.Name)
    return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

func main() {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        log.Fatalf("failed to listen: %v", err)
    }
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})
    if err := s.Serve(lis); err != nil {
        log.Fatalf("failed to serve: %v", err)
    }
}

ここでは、実装したgRPC methodをgrpc.Server構造体に格納されます。
これはこの後のgRPCのメソッド呼び出しに応じて、gRPC Service名、method名に応じて取り出され、実行されることになります。

// server is used to implement helloworld.GreeterServer.
type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
  ...
}

func main() {
  ...
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})
  ...
}

内部的には、protocol buffer から生成された以下のgrpc.ServiceDes構造体をgrpc.Server構造体に登録しています。
https://github.com/grpc/grpc-go/blob/master/examples/helloworld/helloworld/helloworld.pb.go#L148

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {
    s.RegisterService(&_Greeter_serviceDesc, srv)
}

var _Greeter_serviceDesc = grpc.ServiceDesc{
    ServiceName: "helloworld.Greeter",
    HandlerType: (*GreeterServer)(nil),
    Methods: []grpc.MethodDesc{
        {
            MethodName: "SayHello",
            Handler:    _Greeter_SayHello_Handler,
        },
    },
    Streams:  []grpc.StreamDesc{},
    Metadata: "helloworld.proto",
}

上記のMethodDesc構造体のMethodNameフィールドをkey, MethodDesc構造体自体をvalueとして、grpc.service構造体のmdフィールドに格納していることが分かります。
そして、このgrpc.service構造体はgrpc.Server構造体のmフィールドに格納されます。
https://github.com/grpc/grpc-go/blob/master/server.go#L426

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {
  ...
    s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
        ...
    srv := &service{
        server: ss,
        md:     make(map[string]*MethodDesc),
        sd:     make(map[string]*StreamDesc),
        mdata:  sd.Metadata,
    }
    for i := range sd.Methods {
        d := &sd.Methods[i]
        srv.md[d.MethodName] = d
    }
        ...
    s.m[sd.ServiceName] = srv
}

// service consists of the information of the server serving this service and
// the methods in this service.
type service struct {
        ...
    md     map[string]*MethodDesc
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
        ...
    m      map[string]*service // service name -> service info
        ...
}

要するに以下のような構造体にそれぞれ作られることになります。

register.png

そのようにgRPC methodが登録されたServerを起動します。

func main() {
    lis, err := net.Listen("tcp", port)
        ...
    s := grpc.NewServer()
    pb.RegisterGreeterServer(s, &server{})
    s.Serve(lis)
        ...
}

クライアントからの呼び出し

今度は、以下のようにprotocol-buffersから生成したgoのクライアントサイドの実装を使って、gRPCサーバーにリクエストします。
https://github.com/grpc/grpc-go/blob/master/examples/helloworld/greeter_client/main.go

func main() {
    // Set up a connection to the server.
    conn, err := grpc.Dial(address, grpc.WithInsecure())
        ...
    c := pb.NewGreeterClient(conn)
        ...
    r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
        ...
}

これは、内部的には新規にストリームを作成し、gRPCのメソッド呼び出しをしてレスポンスを待っています。
https://github.com/grpc/grpc-go/blob/master/examples/helloworld/helloworld/helloworld.pb.go#L133

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {
    out := new(HelloReply)
    err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
        ...
    return out, nil
}

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {
    cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
    if err != nil {
        return err
    }
    if err := cs.SendMsg(req); err != nil {
        return err
    }
    return cs.RecvMsg(reply)
}

実際にwiresharkを使って、この通信をみてみます。

request.png

複数のフレームが送受信されていることが確認できます。そして全てのフレームはStreamIDが1になっており、一つのストリームで通信されています。

gRPC Server 側の挙動を確認する

今度はメソッド呼び出しされたgRPCサーバー側の挙動を確認します。

grpc.png

概要で説明したように以下の流れでgRPC method の呼び出しが行われます。

  1. TCP Connection の確立
  2. TLSハンドシェイクを並行処理
  3. Streamの読み取りを並行処理
  4. gRPC method の実行を並行処理

TCP/TLSのコネクション確立

先ほどは、以下のようにServer.Serveメソッドを呼び出しました。

func main() {
    lis, err := net.Listen("tcp", port)
        ...
    s.Serve(lis)
        ...
}

そこでは、TCPコネクションが確立されると、net.Connインターフェースを返し、Server.handleRawConnにそれを渡してgoroutineを実行します。
https://github.com/grpc/grpc-go/blob/master/server.go#L545

func (s *Server) Serve(lis net.Listener) error {
        ...
    for {
        rawConn, err := lis.Accept()
                ...
        go func() {
            s.handleRawConn(rawConn)
            s.serveWG.Done()
        }()
    }
}

そして、Server.handleRawConnでは以下の二つを行います。

  • TLSハンドシェイクによる通信の確立
  • net.Connからtransport.ServerTransportインターフェースを作成

具体的には以下のように、まずnet.ConnからTLSハンドシェイクをし、次に受け取ったTCPコネクションの情報からtransport.ServerTransportインターフェースを作成して、Server.serveStreamsに渡します。
https://github.com/grpc/grpc-go/blob/master/server.go#L639

func (s *Server) handleRawConn(rawConn net.Conn) {
        ...
   conn, authInfo, err := s.useTransportAuthenticator(rawConn)
        ...
    // Finish handshaking (HTTP2)
    st := s.newHTTP2Transport(conn, authInfo)
        ...
    go func() {
        s.serveStreams(st)
        s.removeConn(st)
    }()
}

このnewHTTP2Transporで作成されているtransport.ServerTransportインターフェースはgRPCのサーバーサイドがストリームを扱うための実装になっています。
https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L631

// ServerTransport is the common interface for all gRPC server-side transport
// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
    // HandleStreams receives incoming streams using the given handler.
    HandleStreams(func(*Stream), func(context.Context, string) context.Context)

    // WriteHeader sends the header metadata for the given stream.
    // WriteHeader may not be called on all streams.
    WriteHeader(s *Stream, md metadata.MD) error

    // Write sends the data for the given stream.
    // Write may not be called on all streams.
    Write(s *Stream, hdr []byte, data []byte, opts *Options) error
  ...
}

実際には、transport.ServerTransportインターフェースを満たすtransport.http2Server構造体を作成して返しています。
https://github.com/grpc/grpc-go/blob/master/server.go#L682
https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L488
https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L126

// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
  ...
    framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
  ...
    t := &http2Server{
    ...
        conn:              conn,
        framer:            framer,
        readerDone:        make(chan struct{}),
        activeStreams:     make(map[uint32]*Stream),
        stats:             config.StatsHandler,
        initialWindowSize: iwz,
    }
  ...
    return t, nil
}

ここのフィールドをみるとフレーム扱うためのframer構造体へのポインタや、現在の接続下におけるストリームなどHTTP/2を扱うための情報が格納されていることが分かります。
そして、その構造体をServer.serveStreamsに渡してgoroutineを起動します。

func (s *Server) handleRawConn(rawConn net.Conn) {
  ...
    st := s.newHTTP2Transport(conn, authInfo)
  ...
    go func() {
        s.serveStreams(st)
        s.removeConn(st)
    }()
}

ストリームの読み取り

呼び出されたServer.serveStreamsでは、先ほど渡されたtransport.ServerTransportインターフェースのHandleStreamsを実行します。
第一引数には、Server.handleStreamを実行する関数が渡されています。これは実際にStreamから読み取ったデータを使ってgRPC method を実行する役割を担っています。(概要図参照)
https://github.com/grpc/grpc-go/blob/master/server.go#L713

func (s *Server) serveStreams(st transport.ServerTransport) {
  ...
  // Streamの読み取りをし、goroutineでServer.handleStreamを実行する
    st.HandleStreams(func(stream *transport.Stream) {
        wg.Add(1)
        go func() {
            defer wg.Done()
      // Streamの読み取り結果に応じた gRPC method の呼び出しをする
            s.handleStream(st, stream, s.traceInfo(st, stream))
        }()
    },
  ...)
  ...
}

transport.ServerTransportインターフェースは上で確認したように、実際はtransport.http2Server構造体なので、以下が実行されます。
https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L428

// HandleStreams receives incoming streams using the given handler. This is
// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
  ...
    for {
        frame, err := t.framer.fr.ReadFrame()
        ...
        if err != nil {
            if err == io.EOF || err == io.ErrUnexpectedEOF {
                t.Close()
                return
            }
                ...
            return
        }
        switch frame := frame.(type) {
        case *http2.MetaHeadersFrame:
            if t.operateHeaders(frame, handle, traceCtx) {
                t.Close()
                break
            }
        case *http2.DataFrame:
            t.handleData(frame)
        case *http2.RSTStreamFrame:
            t.handleRSTStream(frame)
        case *http2.SettingsFrame:
            t.handleSettings(frame)
        case *http2.PingFrame:
            t.handlePing(frame)
        case *http2.WindowUpdateFrame:
            t.handleWindowUpdate(frame)
        case *http2.GoAwayFrame:
            // TODO: Handle GoAway from the client appropriately.
        default:
            errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
        }
    }
}

ここでは、http2パッケージのFramer.ReadFrameメソッドを実行してHTTP/2のフレームを順次読み取りフレームタイプに応じて処理を分岐させています。フレームタイプとは、ストリームの状態を管理するためのもので、現在や残りのフレームをどのように解釈されるかを表します。
https://summerwind.jp/docs/rfc7540/#section6

まず、クライアントが新規のストリームを開始すると、HEADERSフレームが送信されます。
このHEADERSフレームはヘッダに新規ストリームのIDを持ち、ペイロードにHTTPヘッダのKeyValueのペアを持ちます。

request_headers.png

そして、そのフレームを受け取ると、http2Server.operateHeadersが実行されます。
そこでは、フレームから所属するストリームのIDを取得してtransport.Stream構造体を作成し、それを引数で受け取る関数に渡して実行しています。
さらにtransport.newRecevBufferによりフレームから読み取ったデータを受け渡しするChannel作成し、構造体に登録しています。
https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L188
https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L287
https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L64

// operateHeader takes action on the decoded headers.
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
    streamID := frame.Header().StreamID
  ...
  // Channelの作成
    buf := newRecvBuffer()
  // 新規ストリームを表現する構造体の作成
    s := &Stream{
        id:             streamID,
        st:             t,
        buf:            buf,
        fc:             &inFlow{limit: uint32(t.initialWindowSize)},
        recvCompress:   state.data.encoding,
        method:         state.data.method,
        contentSubtype: state.data.contentSubtype,
    }
  ...
    s.trReader = &transportReader{
        reader: &recvBufferReader{
            ctx:     s.ctx,
            ctxDone: s.ctxDone,
            recv:    s.buf,
        },
        windowHandler: func(n int) {
            t.updateWindow(s, uint32(n))
        },
    }
  ...
  // 引数の関数に作成したtransport.Stream構造体を渡す
    handle(s)
    return false
}

HTTP/2では新規ストリームがHEADERSフレームにより作成されると、ペイロードにアプリケーションデータを格納しDATAフレームが送信されます。

request_data.png

フレームタイプがDATA(http2.DataFrame)の場合は、http2Server.handleDataが実行されます。
ここでは、フレームからtransport.Stream構造体を取得して、そこに登録されたChannelに対して読み取ったフレームのデータを書き込んでいます。
https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L544

func (t *http2Server) handleData(f *http2.DataFrame) {
  ...
    // Select the right stream to dispatch.
    s, ok := t.getStream(f)
  ...
    if size > 0 {
    ...
        // TODO(bradfitz, zhaoq): A copy is required here because there is no
        // guarantee f.Data() is consumed before the arrival of next frame.
        // Can this copy be eliminated?
        if len(f.Data()) > 0 {
            data := make([]byte, len(f.Data()))
            copy(data, f.Data())
            s.write(recvMsg{data: data})
        }
    }
    if f.Header().Flags.Has(http2.FlagDataEndStream) {
        // Received the end of stream from the client.
        s.compareAndSwapState(streamActive, streamReadDone)
        s.write(recvMsg{err: io.EOF})
    }
}

内部的にはバッファありChannelにrecvMsg構造体を送信しています。
https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L71

func (s *Stream) write(m recvMsg) {
    s.buf.put(m)
}

type recvBuffer struct {
    c       chan recvMsg
    mu      sync.Mutex
    backlog []recvMsg
    err     error
}

func (b *recvBuffer) put(r recvMsg) {
    b.mu.Lock()
  ...
    if len(b.backlog) == 0 {
        select {
        case b.c <- r:
            b.mu.Unlock()
            return
        default:
        }
    }
    b.backlog = append(b.backlog, r)
    b.mu.Unlock()
}

読み取ったストリームからgRPC methodの実行

呼び出し元の実装を再び確認すると、このtrasport.Stream構造体を受け取る関数の実行によりServer.handleStreamメソッドが実行されてることが分かります。
https://github.com/grpc/grpc-go/blob/master/server.go#L713

func (s *Server) serveStreams(st transport.ServerTransport) {
  ...
    st.HandleStreams(func(stream *transport.Stream) {
        wg.Add(1)
        go func() {
            defer wg.Done()
            s.handleStream(st, stream, s.traceInfo(st, stream))
        }()
    },
  ...)
  ...
}

このServer.handleStreamメソッドは先ほど確認した、HEADERSフレームを読み取ってからtrasport.Stream構造体を引数に呼び出されます。
HEADERSフレームのペイロードにはHTTPヘッダの情報が格納されており、その一つであるヘッダキーpathには、Service名とMethod名がセットされています。

request_only_headers.png

それをstream.Method()で取り出しているので以下のような結果が返ってきます。

/helloworld.Greeter/SayHello

次にそれを使って、gRPCサーバー起動前にgrpc.Serverのmフィールドに登録したgrpc.service構造体を取り出します。
これは最初に説明したように、Service名とgrpc.MethodDesc構造体を対応付けており、さらにmethod名から対応するgRPC methodを取得できます。

register.png

そして、Server.processUnaryRPCメソッドを呼び出します。
https://github.com/grpc/grpc-go/blob/master/server.go#L1248

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {
  // pathヘッダの値を取得 e.g /helloworld.Greeter/SayHello
    sm := stream.Method()
  ...
  pos := strings.LastIndex(sm, "/")
  ...
    service := sm[:pos]
    method := sm[pos+1:]

  // Service名からgrpc.service構造体を取得
    srv, knownService := s.m[service]
    if knownService {
    // Method名からgrpc methodを取得
        if md, ok := srv.md[method]; ok {
            s.processUnaryRPC(t, stream, srv, md, trInfo)
            return
        }
    ...
    }
  ...
}

Server.processUnaryRPCメソッドでは主に以下の四つを実行しています。

  • ストリームを読み取る
  • そのバイト列をリクエストの構造体にUnmarshalする
  • grpc.MethodDesc構造体に登録されたgRPC methodを実行する
  • 実行結果を元にレスポンスをバイト列に書き込む

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
  ...
  // Channel経由でストリームのデータを読み取る
    d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
  ...
    df := func(v interface{}) error {
    // protocol buffer から生成されたリクエストの構造体に読み取ったデータをUnmarshalする
        if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
            return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
        }
    ...
        return nil
    }
  ...
  // 登録されていたgRPC methodを読み出す
    reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
  ...
    opts := &transport.Options{Last: true}

  // その実行結果をレスポンスに書き込む
    if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
    ...
    }
  ...
    err = t.WriteStatus(stream, status.New(codes.OK, ""))
  ...
    return err
}

これで、gRPCサーバー側の主な処理を確認することができました!

TCPコネクションはいつ終了するのか

HTTP/2は1オリジンに1つのTCPコネクションを持ちます。これによって、ストリームをどれだけ並列化しても複数のTCPコネクションが不要になります。
そのため初回のストリームの作成時にのみTCPの通信に関するオーバーヘッドがかかるようになり、スループットが改善しました。

しかし、TCPコネクションがいつ終了するのかは、RFCを読んでもあまり言及されていませんでした。
少なくとも、GOAWAYフレームが送信された時に新規のストリームの作成を止めて、TCPコネクションを終了させるようです。
https://summerwind.jp/docs/rfc7540/#section6-8

The GOAWAY frame (type=0x7) is used to initiate shutdown of a connection or to signal serious error conditions.
GOAWAY allows an endpoint to gracefully stop accepting new streams while still finishing processing of previously established streams.

request.png

実際に確認すると、GOAWAYフレームは送信されておらず、HTTP/2の仕組みを利用せずTCPコネクションを終了させているようです。
https://github.com/grpc/grpc-go/blob/master/server.go#L578

func (s *Server) Serve(lis net.Listener) error {
  ...
    defer func() {
        s.mu.Lock()
        if s.lis != nil && s.lis[ls] {
            ls.Close()
            delete(s.lis, ls)
        }
        s.mu.Unlock()
    }()
  ...
}

このことから、rangeループでUnary RPCを複数回呼び出すよりも、Client Streaming RPCを利用した方が、効率が良さそうです。前者はTCPコネクションを確立、ストリームの作成、ストリームの終了、TCPコネクションの終了を繰り返すのに対して、後者は一度で済むからです。

所感

grpc-goの内部実装を調べることで、goroutineの扱い方、channelの使い方、HTTP2の仕組み、TCPの仕組みなどまとめて学ぶことができたのでよかったです。

259
186
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
259
186