5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

gRPC-Goのサーバー実装を読む

Last updated at Posted at 2021-12-10

この記事はMobility Technologies アドベントカレンダー2021の10日目です。

スクリーンショット 2021-12-10 9.08.44.png

MoTではサーバーサイドのサービス間通信にgRPCを使用しています。
gRPCはprotobufのスキーマ定義から必要なコードが自動生成出来るため、
なんとなくで使えてしまうところがAPI作成の手間と難易度を下げてくれているのですが、
なんとなくの部分が気持ち悪くもあったためこの機会にGoのgRPCライブラリであるgrpc-goの実装を調べてみました。
ただし、この記事ではスキーマからのコード生成やアプリケーションでの利用方法については説明しません。

gRPCは通信プロトコルにHTTP/2を使用しており、バイナリフレームでのデータ通信を行っています。
このバイナリフレームがサーバー側でどの様に処理されメッセージがサービスへ渡されているのかを追って行きましょう。
記事を読むにあたりHTTP/2に関する基礎的な知識があると理解がしやすくなると思います。
こちらの記事も参考にどうぞ。(宣伝)
Goで見るHTTP/2

gRPC-Go

GoのgRPC実装の本体で、google.golang.org/grpc のからインストールされるgRPCライブラリです

HelloWorldサンプル

今回はgRPC-Goのに付属のHelloWorldサンプルを使って基本的な
serverの動きを追ってみます。

GreeterサービスのSayHelloがHelloRequestを受けてHelloReplyを返すだけの超シンプルなgRPCサーバです。

helloworld.proto

// The greeting service definition.
service Greeter {
  // Sends a greeting
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

// The request message containing the user's name.
message HelloRequest {
  string name = 1;
}

// The response message containing the greetings
message HelloReply {
  string message = 1;
}
  • リクエストメッセージ
{
  "name": "Hello"
}
  • レスポンスメッセージ
{
  "message": "Hello Hello"
}

Serverの受信ログ

SayHelloメソッドへ上記のメッセージを送った際のHTTP/2通信のフレームダンプを取っておきます。
送受信しているフレームの種別/内容やヘッダフレームのフィールド等が確認できます。

server listening at 127.0.0.1:50051
http2: Framer 0x140001c6000: wrote SETTINGS len=6, settings: MAX_FRAME_SIZE=16384
http2: Framer 0x140001c6000: read SETTINGS len=36, settings: ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=4194304, MAX_FRAME_SIZE=4194304, MAX_HEADER_LIST_SIZE=8192, UNKNOWN_SETTING_65027=1
http2: Framer 0x140001c6000: read WINDOW_UPDATE len=4 (conn) incr=4128769
http2: Framer 0x140001c6000: read PING len=8 ping="\x00\x00\x00\x00\x00\x00\x00\x00"
http2: Framer 0x140001c6000: wrote SETTINGS flags=ACK len=0
http2: Framer 0x140001c6000: wrote PING flags=ACK len=8 ping="\x00\x00\x00\x00\x00\x00\x00\x00"

## ヘッダーフレーム受信
http2: Framer 0x140001c6000: read HEADERS flags=END_HEADERS stream=1 len=249
http2: decoded hpack field header field ":scheme" = "http"
http2: decoded hpack field header field ":method" = "POST"
http2: decoded hpack field header field ":authority" = "localhost:50051"
http2: decoded hpack field header field ":path" = "/helloworld.Greeter/SayHello"
http2: decoded hpack field header field "te" = "trailers"
http2: decoded hpack field header field "content-type" = "application/grpc"
http2: decoded hpack field header field "user-agent" = "grpc-node/1.24.7 grpc-c/8.0.0 (osx; chttp2; ganges)"
http2: decoded hpack field header field "grpc-accept-encoding" = "identity,deflate,gzip"
http2: decoded hpack field header field "accept-encoding" = "identity,gzip"

http2: Framer 0x140001c6000: read WINDOW_UPDATE stream=1 len=4 incr=5

## データフレーム受信
http2: Framer 0x140001c6000: read DATA flags=END_STREAM stream=1 len=12 data="\x00\x00\x00\x00\a\n\x05Hello"
http2: Framer 0x140001c6000: read WINDOW_UPDATE len=4 (conn) incr=5
Received: Hello

http2: Framer 0x140001c6000: wrote WINDOW_UPDATE len=4 (conn) incr=12
http2: Framer 0x140001c6000: wrote PING len=8 ping="\x02\x04\x10\x10\t\x0e\a\a"

## レスポンスヘッダー送信
http2: Framer 0x140001c6000: wrote HEADERS flags=END_HEADERS stream=1 len=14

## レスポンスデータ送信
http2: Framer 0x140001c6000: wrote DATA stream=1 len=18 data="\x00\x00\x00\x00\r\n\vHello Hello"
http2: Framer 0x140001c6000: wrote HEADERS flags=END_STREAM|END_HEADERS stream=1 len=24

http2: Framer 0x140001c6000: read PING flags=ACK len=8 ping="\x02\x04\x10\x10\t\x0e\a\a"
http2: Framer 0x140001c6000: read SETTINGS flags=ACK len=0
http2: Framer 0x140001c6000: read WINDOW_UPDATE len=4 (conn) incr=13

Server実装

Server起動

サーバーの起動部分のmain.goでは次のようなことをおこなっています。

  • net.ListenからTCPListenerを取得
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
  • gRPCサーバーの生成
s := grpc.NewServer()
  • RegisterGreeterServerでserviceInfo(gRPCのサービス情報)をservicesに登録
    • Greeterのサービス情報はprotobufから生成されたhelloworld_grpc.pb.go内にGreeter_ServiceDescとして定義されており、サービス名/メソッド名/ハンドラとなる関数等の情報が含まれています。
pb.RegisterGreeterServer(s, &server{})
  • リスナーを渡してServerを起動する
if err := s.Serve(lis); err != nil {
	log.Fatalf("failed to serve: %v", err)
}

ではこのServe内では何が行われているのかを追っていきましょう。

かなり様々なことが行われているため、今回はメッセージがどのようにしてサーバーアプリケーションメソッドまで送られているのかに焦点を当てて見て行きます。

TCPコネクションハンドラ生成

grpc-go/server.go
Serve

Serve内ではTCPリスナーがクライアントからのリクエストを待ち受け、生TCPコネクションを取得しています。

for {
	rawConn, err := lis.Accept()
...

コネクションを取得する毎にHandlerとなるゴルーチンを起動して処理を渡し、次のリクエストを待ち受けます。

go func() {
	s.handleRawConn(lis.Addr().String(), rawConn)
	s.serveWG.Done()
}()

HTTP/2サーバートランスポート生成

handleRawConn
HTTP/2ではデータの送受信をフレーム単位で行いっています。 [Frame Definitions]
コネクションハンドラ内ではHTTP/2トランスポートを生成します。
ここでHTTP/2としての接続を確立して以後の通信をHTTP/2のフレームベースで行います。

// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(rawConn)

HTTP/2トランスポートの生成とプロトコルの有効性チェック等はhttp2_server.go内でおこなっており、
バイナリフレームの読み書きを担うFramer構造体の生成やトランスポートへの登録、コネクションプリフェイスの確認、
Keep-Alive用のゴルーチン起動などもここで行っています。

NewServerTransport

framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
t := &http2Server{
    ...
	framer:            framer,
    ...
}
if !bytes.Equal(preface, clientPreface) {
...
go t.keepalive()

ストリームハンドラ生成

生成されたトランスポートを用いて受信したフレームからストリームを生成するゴルーチンを起動します。
handleRawConn

go func() {
	s.serveStreams(st)
	s.removeConn(lisAddr, st)
}()

Framerからフレームデータを読み取ります。
HTTP/2ではストリームを用いて1つのTCPコネクションを多重化して扱い、各フレームが持つストリームIDにより平行にデータのやり取りを行います。[Streams and Multiplexing]

HandleStreams

frame, err := t.framer.fr.ReadFrame()

取得したフレームをフレームタイプ別に処理します。
HTTP/2ログで表示されていた各フレームタイプはここで各々の処理へスイッチされています。

switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
	if t.operateHeaders(frame, handle, traceCtx) {
		t.Close()
		break
	}
case *http2.DataFrame:
	t.handleData(frame)
case *http2.RSTStreamFrame:
	t.handleRSTStream(frame)
case *http2.SettingsFrame:
	t.handleSettings(frame)
case *http2.PingFrame:
	t.handlePing(frame)
case *http2.WindowUpdateFrame:
	t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
	// TODO: Handle GoAway from the client appropriately.
default:
	if logger.V(logLevel) {
		logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
	}
}

ヘッダフレームの受信時にはヘッダデータの内容からStream構造体を生成します。

case *http2.MetaHeadersFrame:
	if t.operateHeaders(frame, handle, traceCtx) {
    ...

Streamは扱うストリームのID、gRPCメソッドのパス情報(今回の例では/helloworld.Greeter/SayHello)、データの送信ゴルーチンと受信ゴルーチン間での受け渡しを行うrecvMsg型のchannel等を保持し、RPCのトランスポート層を表現します。
つまり、一つのgRPCのリクエスト/レスポンスはこのStream単位で行われることになります。

operateHeaders

streamID := frame.Header().StreamID
buf := newRecvBuffer()
s.trReader = &transportReader{
	reader: &recvBufferReader{
		ctx:        s.ctx,
		ctxDone:    s.ctxDone,
		recv:       s.buf,
		freeBuffer: t.bufferPool.put,
	},
	windowHandler: func(n int) {
		t.updateWindow(s, uint32(n))
	},
}

recvBuffer構造体にチャンネルを保持し、Stream構造体のbuf(recvBuffer)からtrReader(transportReader内のrecvBufferReader)へchannelを通してデータをやりとりします。

  • Stream生成
s := &Stream{
	id:  streamID,
	st:  t,
	buf: buf,
	fc:  &inFlow{limit: uint32(t.initialWindowSize)},
}

また、ヘッダー内容の妥当性チェックやHTTPメソッドのチェック(POST)等もここで行っています

  • 生成したStreamを処理するハンドラ用のゴルーチンを起動

serveStreams

go func() {
	defer wg.Done()
	s.handleStream(st, stream, s.traceInfo(st, stream))
}()

ストリームデータをgRPCサービスのハンドラへ渡す

Streamからのデータフレームのメッセージを待ち受け、設定されたサービスハンドラのメソッドを呼び出します。

  • Streamが受け持つメソッド情報を取得し、pb.RegisterGreeterServerで設定したservice情報を取得

handleStream

sm := stream.Method()
...
service := sm[:pos]
method := sm[pos+1:]

srv, knownService := s.services[service]
if md, ok := srv.methods[method]; ok {
	s.processUnaryRPC(t, stream, srv, md, trInfo)
	return
}
  • ストリームデータの取得(圧縮データの解凍)

processUnaryRPC

d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
  • ストリームからのデータフレームメッセージの待ち受けと読み取り

recvAndDecompress

pf, d, err := p.recvMsg(maxReceiveMessageSize)

送信側のストリームハンドラのゴルーチンでフレームを読み取り、データフレームのデータをStreamを通して待ち受け側のゴルーチンへ渡します。

  • streamを通してrecvMsg型を送信

handleData

if len(f.Data()) > 0 {
	buffer := t.bufferPool.get()
	buffer.Reset()
	buffer.Write(f.Data())
	s.write(recvMsg{buffer: buffer})
}
  • ストリームから取得したgRPCメソッド情報が持つハンドラメソッドを呼び出し

processUnaryRPC

reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)

ここのmd.Handlerの実体はprotobufから自動生成されたhelloworld_grpc.pb.go_Greeter_SayHello_Handlerです。
この中でgRPCリクエストメッセージのHellowRequest型をデコード用関数に渡してデータフレームから渡されたメッセージをデシリアライズします。

in := new(HelloRequest)
	if err := dec(in); err != nil {
		return nil, err
	}
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
	return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}

gRPCメソッドはサーバーアプリケーション側のSayHelloメソッドを呼び出しリクエストメッセージを渡します。

if interceptor == nil {
	return srv.(GreeterServer).SayHello(ctx, in)
}

サーバーアプリケーションからのレスポンスはバイナリデータへ変換後、またStreamを通してクライアント側へ送信されます。

if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
...

sendResponse

data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
...
hdr, payload := msgHeader(data, compData)
...
err = t.Write(stream, hdr, payload, opts)

処理分岐概観

ここまで見てきた内容から、サーバー内で行われる通信処理の分岐を大まかに図にするとこの様になります。

overview.png

  • ServerがTCP Connectionを確立
  • TCP Connection毎にConnection Handlerとなるゴルーチンを起動
  • Connection HandlerはHeaderフレームの内容からStreamID毎にStream Handlerとなるゴルーチンを起動
  • Streamが一つのgRPCリクエストに対応してgRPC Handler Methodを呼び出しメッセージデータを送る

これらにより単一のTCPコネクション内でストリームを多重化することで複数のリクエストを処理するHTTP/2の利点を活かしたAPI実装が可能になります。
gRPC-GoではこれらをGo特有のgoroutineとchannelを利用して実装しています。

今回の例はUnary(単一)なリクエストであったため利点が分かりにくいですが、Streamデータを扱う場合は更に大きな利点となります。

5
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?