内部実装から理解するgRPC


概要


目的

gRPCはDocumentにあるように以下の特徴があるかと思います。


  • protocol buffer のようなインターフェース定義語 (IDL) から生成されたコードを利用してRPCができる

  • HTTP/2で通信することができ、リクエストとレスポンスをそれぞれ分割できる

  • 多言語に対応している

しかし、この記事ではこれらの機能の紹介ではなく、gRPCの仕組みを理解することを意図しています。

なぜそれを意図したかというと普段の開発でgRPCを利用しているものの、どのような仕組みでRPCが実現できているのかイメージが持てていなかったためです。そのために、grpc-goの内部実装(2019/5時点)を読み解きながら、実際の通信の中身を覗いてみました。

そして結果的には以下の効用がありました。


  • protoc-gen-goがprotocol-buffersから生成したコードがどのように利用されているか分かる

  • HTTP/2により多重化されたリクエストをどう扱っているか分かる

  • Webサーバーを実装をする時にどのような設計にすればいいのか参考になる


処理流れ

まずソースコードを読むとサーバー側は以下のようなフローで処理されていることが分かります。

grpc.png

goroutineが三重に実行されています。具体的には以下の順番で処理が進みます。


  1. TCP Connection の受付

  2. TLSハンドシェイクを並行処理

  3. フレームの読み取りを並行処理

  4. gRPC method の実行を並行処理

1は2の完了を監視しており、func (*WaitGroup) Addを2の呼び出し前で実行し、2が完了する前にfunc (*WaitGroup) Doneを実行しています。

この二つの処理でTCPやTLSの通信の確立を担っており、それ以降の処理を別のgoroutineに委ねています。こうすることで、後続の通信の確立を3, 4の処理でブロックしてしまわないようにしています。

そして、次の3, 4が実際のHTTP/2の扱いとgRPC methodの呼び出しを行います。3ではリクエストからフレームを読み取り、gRPC methodに対応したchannelにデータを格納します。

HTTP/2ではリクエスト(RPCでは一つのメソッド呼び出し)は一つの通信に多重化されるので、複数のgRPC methodの呼び出しがされている可能性もあります。そこで、それぞれのgRPC methodごとにgoroutineを起動し、channelからデータを取得し、IDLで定義したリクエストのフォーマットに従った構造体にUnmarshalし、gRPC methodの呼び出しを行います。これらが大きな処理の流れになります。


HTTP/2に関する前提知識

実際に詳しくソースコードを理解するためにはHTTP/2に関する知識が不可欠です。HTTP/2はHTTP/1.1の抱えるパフォーマンス上の課題を解決することを主な目的として開発されました。

そのパフォーマンス強化の中心となる仕組みとして、バイナリフォーマットでHTTPのメソッド、ヘッダ、ボディなどの構成要素がHTTPのレイヤーにおいて表現されること(バイナリフレーミング)があります。HTTP/1.xまでは改行で区切られたプレーンテキストでこれらを表現していました。リクエストは複数の改行されたヘッダフィールドを持ち、レスポンスの最初の行はステータスコードであるといった具合です。

そのため、一つの通信に一つのリクエスト/レスポンスという関係でした。しかしHTTP/2ではリクエスト/レスポンスをHTTPレベルでバイナリとして扱えるので、それらを分割し、一つの通信において多重化し、受信側で再構成をすることができます。gRPCの実装においてもこれらがなされています。

binary_framing.png

こちらを参照

まず、これらのプロセスを理解するためにHTTP/2に関する用語を確認します。


  • ストリーム


    • 確立した接続内の双方向の論理的なフレームの流れのまとまりを表現したもの



  • メッセージ


    • 論理的なリクエストまたはレスポンス、メッセージを表現するためにフレームを順番にまとめたもの



  • フレーム


    • HTTP/2における通信の最小単位で、それぞれのフレームはヘッダを持ち、ヘッダはそのフレームが所属するストリームを識別する



stream_message_frame.png

こちらを参照

メッセージとフレームの関係は従来のHTTP/1.xからイメージしやすいかと思いますが、ストリームという概念がなぜ存在するのかが私には分かりづらかったです。調べてみると、ストリームは以下の目的のために存在しているようです。


  • フレームの流れのライフサイクルをフレームのタイプによって制御する

  • フレームの流れごとに優先順位を付ける

  • フレームの流れごとにフロー制御する

フレームの流れのライフサイクルをフレームタイプによって制御する

ストリームごとに状態を持ちます。RFCに状態遷移図があります。

実際に通信されるのはフレームなので、フレームの持つタイプに応じて、ストリームの開始、ストリームにおけるフレームの扱い、ストリームの終了が遷移します。

フレームの流れごとに優先順位を付ける

https://summerwind.jp/docs/rfc7540/#section5-3

HTTPメッセージが複数のフレームに分割されると、一つの通信で複数のストリームに所属するフレームが送信されることになります。

ストリームを開始するHEADERSフレームに優先順位を持たせることで、複数のストリームにおけるリソース割り当ての方法を明示することができます。

フレームの流れごとにフロー制御する

https://summerwind.jp/docs/rfc7540/#section6-9

TCPでは送信受信共にバッファにデータを格納し互いに通信するので、それを上回らないように利用状況を交換、調整します。

HTTP/2では、複数のストリームを同じTCP接続上で多重化するので、リソース割り当てをHTTPレベルでも管理する必要があります。

なぜならTCPのフロー制御では、単一の接続内に存在する複数のストリームを判別できないからです。これをWINDOW_UPDATEフレームにより実現します。


gRPCを実際に呼び出す

それでは、実際にgRPCを使ってクライアントとサーバーで通信してみます。

ここの実装はgrpc-go/examplesを利用しています。


Protocol Buffer から go のソースコードを生成する

まず、以下のようにprotocol bufferの定義を定義します。

https://github.com/grpc/grpc-go/blob/master/examples/helloworld/helloworld/helloworld.proto

// The greeting service definition.

service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
string name = 1;
}

// The response message containing the greetings
message HelloReply {
string message = 1;
}

そして、protocol buffer の定義に沿ったgRPC用の go のソースコードを生成します。

https://github.com/grpc/grpc-go/blob/master/examples/helloworld/helloworld/helloworld.pb.go


サーバーサイドの実装

以下のようにgRPC method を実装し、gRPCサーバーを起動します。

https://github.com/grpc/grpc-go/blob/master/examples/helloworld/greeter_server/main.go

// server is used to implement helloworld.GreeterServer.

type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
log.Printf("Received: %v", in.Name)
return &pb.HelloReply{Message: "Hello " + in.Name}, nil
}

func main() {
lis, err := net.Listen("tcp", port)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}

ここでは、実装したgRPC methodをgrpc.Server構造体に格納されます。

これはこの後のgRPCのメソッド呼び出しに応じて、gRPC Service名、method名に応じて取り出され、実行されることになります。

// server is used to implement helloworld.GreeterServer.

type server struct{}

// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
...
}

func main() {
...
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
...
}

内部的には、protocol buffer から生成された以下のgrpc.ServiceDes構造体をgrpc.Server構造体に登録しています。

https://github.com/grpc/grpc-go/blob/master/examples/helloworld/helloworld/helloworld.pb.go#L148

func RegisterGreeterServer(s *grpc.Server, srv GreeterServer) {

s.RegisterService(&_Greeter_serviceDesc, srv)
}

var _Greeter_serviceDesc = grpc.ServiceDesc{
ServiceName: "helloworld.Greeter",
HandlerType: (*GreeterServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "SayHello",
Handler: _Greeter_SayHello_Handler,
},
},
Streams: []grpc.StreamDesc{},
Metadata: "helloworld.proto",
}

上記のMethodDesc構造体のMethodNameフィールドをkey, MethodDesc構造体自体をvalueとして、grpc.service構造体のmdフィールドに格納していることが分かります。

そして、このgrpc.service構造体はgrpc.Server構造体のmフィールドに格納されます。

https://github.com/grpc/grpc-go/blob/master/server.go#L426

func (s *Server) RegisterService(sd *ServiceDesc, ss interface{}) {

...
s.register(sd, ss)
}

func (s *Server) register(sd *ServiceDesc, ss interface{}) {
...
srv := &service{
server: ss,
md: make(map[string]*MethodDesc),
sd: make(map[string]*StreamDesc),
mdata: sd.Metadata,
}
for i := range sd.Methods {
d := &sd.Methods[i]
srv.md[d.MethodName] = d
}
...
s.m[sd.ServiceName] = srv
}

https://github.com/grpc/grpc-go/blob/master/server.go#L80

// service consists of the information of the server serving this service and

// the methods in this service.
type service struct {
...
md map[string]*MethodDesc
}

// Server is a gRPC server to serve RPC requests.
type Server struct {
...
m map[string]*service // service name -> service info
...
}

要するに以下のような構造体にそれぞれ作られることになります。

register.png

そのようにgRPC methodが登録されたServerを起動します。

func main() {

lis, err := net.Listen("tcp", port)
...
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
s.Serve(lis)
...
}


クライアントからの呼び出し

今度は、以下のようにprotocol-buffersから生成したgoのクライアントサイドの実装を使って、gRPCサーバーにリクエストします。

https://github.com/grpc/grpc-go/blob/master/examples/helloworld/greeter_client/main.go

func main() {

// Set up a connection to the server.
conn, err := grpc.Dial(address, grpc.WithInsecure())
...
c := pb.NewGreeterClient(conn)
...
r, err := c.SayHello(ctx, &pb.HelloRequest{Name: name})
...
}

これは、内部的には新規にストリームを作成し、gRPCのメソッド呼び出しをしてレスポンスを待っています。

https://github.com/grpc/grpc-go/blob/master/examples/helloworld/helloworld/helloworld.pb.go#L133

func (c *greeterClient) SayHello(ctx context.Context, in *HelloRequest, opts ...grpc.CallOption) (*HelloReply, error) {

out := new(HelloReply)
err := c.cc.Invoke(ctx, "/helloworld.Greeter/SayHello", in, out, opts...)
...
return out, nil
}

https://github.com/grpc/grpc-go/blob/master/call.go#L65

func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {

cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)
if err != nil {
return err
}
if err := cs.SendMsg(req); err != nil {
return err
}
return cs.RecvMsg(reply)
}

実際にwiresharkを使って、この通信をみてみます。

request.png

複数のフレームが送受信されていることが確認できます。そして全てのフレームはStreamIDが1になっており、一つのストリームで通信されています。


gRPC Server 側の挙動を確認する

今度はメソッド呼び出しされたgRPCサーバー側の挙動を確認します。

grpc.png

概要で説明したように以下の流れでgRPC method の呼び出しが行われます。


  1. TCP Connection の確立

  2. TLSハンドシェイクを並行処理

  3. Streamの読み取りを並行処理

  4. gRPC method の実行を並行処理


TCP/TLSのコネクション確立

先ほどは、以下のようにServer.Serveメソッドを呼び出しました。

func main() {

lis, err := net.Listen("tcp", port)
...
s.Serve(lis)
...
}

そこでは、TCPコネクションが確立されると、net.Connインターフェースを返し、Server.handleRawConnにそれを渡してgoroutineを実行します。

https://github.com/grpc/grpc-go/blob/master/server.go#L545

func (s *Server) Serve(lis net.Listener) error {

...
for {
rawConn, err := lis.Accept()
...
go func() {
s.handleRawConn(rawConn)
s.serveWG.Done()
}()
}
}

そして、Server.handleRawConnでは以下の二つを行います。


  • TLSハンドシェイクによる通信の確立

  • net.Connからtransport.ServerTransportインターフェースを作成

具体的には以下のように、まずnet.ConnからTLSハンドシェイクをし、次に受け取ったTCPコネクションの情報からtransport.ServerTransportインターフェースを作成して、Server.serveStreamsに渡します。

https://github.com/grpc/grpc-go/blob/master/server.go#L639

func (s *Server) handleRawConn(rawConn net.Conn) {

...
conn, authInfo, err := s.useTransportAuthenticator(rawConn)
...
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(conn, authInfo)
...
go func() {
s.serveStreams(st)
s.removeConn(st)
}()
}

このnewHTTP2Transporで作成されているtransport.ServerTransportインターフェースはgRPCのサーバーサイドがストリームを扱うための実装になっています。

https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L631

// ServerTransport is the common interface for all gRPC server-side transport

// implementations.
//
// Methods may be called concurrently from multiple goroutines, but
// Write methods for a given Stream will be called serially.
type ServerTransport interface {
// HandleStreams receives incoming streams using the given handler.
HandleStreams(func(*Stream), func(context.Context, string) context.Context)

// WriteHeader sends the header metadata for the given stream.
// WriteHeader may not be called on all streams.
WriteHeader(s *Stream, md metadata.MD) error

// Write sends the data for the given stream.
// Write may not be called on all streams.
Write(s *Stream, hdr []byte, data []byte, opts *Options) error
...
}

実際には、transport.ServerTransportインターフェースを満たすtransport.http2Server構造体を作成して返しています。

https://github.com/grpc/grpc-go/blob/master/server.go#L682

https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L488

https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L126

// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is

// returned if something goes wrong.
func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) {
...
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
...
t := &http2Server{
...
conn: conn,
framer: framer,
readerDone: make(chan struct{}),
activeStreams: make(map[uint32]*Stream),
stats: config.StatsHandler,
initialWindowSize: iwz,
}
...
return t, nil
}

ここのフィールドをみるとフレーム扱うためのframer構造体へのポインタや、現在の接続下におけるストリームなどHTTP/2を扱うための情報が格納されていることが分かります。

そして、その構造体をServer.serveStreamsに渡してgoroutineを起動します。

func (s *Server) handleRawConn(rawConn net.Conn) {

...
st := s.newHTTP2Transport(conn, authInfo)
...
go func() {
s.serveStreams(st)
s.removeConn(st)
}()
}


ストリームの読み取り

呼び出されたServer.serveStreamsでは、先ほど渡されたtransport.ServerTransportインターフェースのHandleStreamsを実行します。

第一引数には、Server.handleStreamを実行する関数が渡されています。これは実際にStreamから読み取ったデータを使ってgRPC method を実行する役割を担っています。(概要図参照)

https://github.com/grpc/grpc-go/blob/master/server.go#L713

func (s *Server) serveStreams(st transport.ServerTransport) {

...
// Streamの読み取りをし、goroutineでServer.handleStreamを実行する
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
// Streamの読み取り結果に応じた gRPC method の呼び出しをする
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
},
...)
...
}

transport.ServerTransportインターフェースは上で確認したように、実際はtransport.http2Server構造体なので、以下が実行されます。

https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L428

// HandleStreams receives incoming streams using the given handler. This is

// typically run in a separate goroutine.
// traceCtx attaches trace to ctx and returns the new context.
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
...
for {
frame, err := t.framer.fr.ReadFrame()
...
if err != nil {
if err == io.EOF || err == io.ErrUnexpectedEOF {
t.Close()
return
}
...
return
}
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
}

ここでは、http2パッケージのFramer.ReadFrameメソッドを実行してHTTP/2のフレームを順次読み取りフレームタイプに応じて処理を分岐させています。フレームタイプとは、ストリームの状態を管理するためのもので、現在や残りのフレームをどのように解釈されるかを表します。

https://summerwind.jp/docs/rfc7540/#section6

まず、クライアントが新規のストリームを開始すると、HEADERSフレームが送信されます。

このHEADERSフレームはヘッダに新規ストリームのIDを持ち、ペイロードにHTTPヘッダのKeyValueのペアを持ちます。

request_headers.png

そして、そのフレームを受け取ると、http2Server.operateHeadersが実行されます。

そこでは、フレームから所属するストリームのIDを取得してtransport.Stream構造体を作成し、それを引数で受け取る関数に渡して実行しています。

さらにtransport.newRecevBufferによりフレームから読み取ったデータを受け渡しするChannel作成し、構造体に登録しています。

https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L188

https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L287

https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L64

// operateHeader takes action on the decoded headers.

func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {
streamID := frame.Header().StreamID
...
// Channelの作成
buf := newRecvBuffer()
// 新規ストリームを表現する構造体の作成
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
recvCompress: state.data.encoding,
method: state.data.method,
contentSubtype: state.data.contentSubtype,
}
...
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
}
...
// 引数の関数に作成したtransport.Stream構造体を渡す
handle(s)
return false
}

HTTP/2では新規ストリームがHEADERSフレームにより作成されると、ペイロードにアプリケーションデータを格納しDATAフレームが送信されます。

request_data.png

フレームタイプがDATA(http2.DataFrame)の場合は、http2Server.handleDataが実行されます。

ここでは、フレームからtransport.Stream構造体を取得して、そこに登録されたChannelに対して読み取ったフレームのデータを書き込んでいます。

https://github.com/grpc/grpc-go/blob/master/internal/transport/http2_server.go#L544

func (t *http2Server) handleData(f *http2.DataFrame) {

...
// Select the right stream to dispatch.
s, ok := t.getStream(f)
...
if size > 0 {
...
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
if len(f.Data()) > 0 {
data := make([]byte, len(f.Data()))
copy(data, f.Data())
s.write(recvMsg{data: data})
}
}
if f.Header().Flags.Has(http2.FlagDataEndStream) {
// Received the end of stream from the client.
s.compareAndSwapState(streamActive, streamReadDone)
s.write(recvMsg{err: io.EOF})
}
}

内部的にはバッファありChannelにrecvMsg構造体を送信しています。

https://github.com/grpc/grpc-go/blob/master/internal/transport/transport.go#L71

func (s *Stream) write(m recvMsg) {

s.buf.put(m)
}

type recvBuffer struct {
c chan recvMsg
mu sync.Mutex
backlog []recvMsg
err error
}

func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
...
if len(b.backlog) == 0 {
select {
case b.c <- r:
b.mu.Unlock()
return
default:
}
}
b.backlog = append(b.backlog, r)
b.mu.Unlock()
}


読み取ったストリームからgRPC methodの実行

呼び出し元の実装を再び確認すると、このtrasport.Stream構造体を受け取る関数の実行によりServer.handleStreamメソッドが実行されてることが分かります。

https://github.com/grpc/grpc-go/blob/master/server.go#L713

func (s *Server) serveStreams(st transport.ServerTransport) {

...
st.HandleStreams(func(stream *transport.Stream) {
wg.Add(1)
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
},
...)
...
}

このServer.handleStreamメソッドは先ほど確認した、HEADERSフレームを読み取ってからtrasport.Stream構造体を引数に呼び出されます。

HEADERSフレームのペイロードにはHTTPヘッダの情報が格納されており、その一つであるヘッダキーpathには、Service名とMethod名がセットされています。

request_only_headers.png

それをstream.Method()で取り出しているので以下のような結果が返ってきます。

/helloworld.Greeter/SayHello

次にそれを使って、gRPCサーバー起動前にgrpc.Serverのmフィールドに登録したgrpc.service構造体を取り出します。

これは最初に説明したように、Service名とgrpc.MethodDesc構造体を対応付けており、さらにmethod名から対応するgRPC methodを取得できます。

register.png

そして、Server.processUnaryRPCメソッドを呼び出します。

https://github.com/grpc/grpc-go/blob/master/server.go#L1248

func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Stream, trInfo *traceInfo) {

// pathヘッダの値を取得 e.g /helloworld.Greeter/SayHello
sm := stream.Method()
...
pos := strings.LastIndex(sm, "/")
...
service := sm[:pos]
method := sm[pos+1:]

// Service名からgrpc.service構造体を取得
srv, knownService := s.m[service]
if knownService {
// Method名からgrpc methodを取得
if md, ok := srv.md[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
...
}
...
}

Server.processUnaryRPCメソッドでは主に以下の四つを実行しています。


  • ストリームを読み取る

  • そのバイト列をリクエストの構造体にUnmarshalする

  • grpc.MethodDesc構造体に登録されたgRPC methodを実行する

  • 実行結果を元にレスポンスをバイト列に書き込む

https://github.com/grpc/grpc-go/blob/master/server.go#L859

func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {

...
// Channel経由でストリームのデータを読み取る
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
...
df := func(v interface{}) error {
// protocol buffer から生成されたリクエストの構造体に読み取ったデータをUnmarshalする
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
...
return nil
}
...
// 登録されていたgRPC methodを読み出す
reply, appErr := md.Handler(srv.server, ctx, df, s.opts.unaryInt)
...
opts := &transport.Options{Last: true}

// その実行結果をレスポンスに書き込む
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
...
}
...
err = t.WriteStatus(stream, status.New(codes.OK, ""))
...
return err
}

これで、gRPCサーバー側の主な処理を確認することができました!


TCPコネクションはいつ終了するのか

HTTP/2は1オリジンに1つのTCPコネクションを持ちます。これによって、ストリームをどれだけ並列化しても複数のTCPコネクションが不要になります。

そのため初回のストリームの作成時にのみTCPの通信に関するオーバーヘッドがかかるようになり、スループットが改善しました。

しかし、TCPコネクションがいつ終了するのかは、RFCを読んでもあまり言及されていませんでした。

少なくとも、GOAWAYフレームが送信された時に新規のストリームの作成を止めて、TCPコネクションを終了させるようです。

https://summerwind.jp/docs/rfc7540/#section6-8


The GOAWAY frame (type=0x7) is used to initiate shutdown of a connection or to signal serious error conditions.

GOAWAY allows an endpoint to gracefully stop accepting new streams while still finishing processing of previously established streams.


request.png

実際に確認すると、GOAWAYフレームは送信されておらず、HTTP/2の仕組みを利用せずTCPコネクションを終了させているようです。

https://github.com/grpc/grpc-go/blob/master/server.go#L578

func (s *Server) Serve(lis net.Listener) error {

...
defer func() {
s.mu.Lock()
if s.lis != nil && s.lis[ls] {
ls.Close()
delete(s.lis, ls)
}
s.mu.Unlock()
}()
...
}

このことから、rangeループでUnary RPCを複数回呼び出すよりも、Client Streaming RPCを利用した方が、効率が良さそうです。前者はTCPコネクションを確立、ストリームの作成、ストリームの終了、TCPコネクションの終了を繰り返すのに対して、後者は一度で済むからです。


所感

grpc-goの内部実装を調べることで、goroutineの扱い方、channelの使い方、HTTP2の仕組み、TCPの仕組みなどまとめて学ぶことができたのでよかったです。