この記事はMobility Technologies アドベントカレンダー2021の10日目です。
MoTではサーバーサイドのサービス間通信にgRPCを使用しています。
gRPCはprotobufのスキーマ定義から必要なコードが自動生成出来るため、
なんとなくで使えてしまうところがAPI作成の手間と難易度を下げてくれているのですが、
なんとなく
の部分が気持ち悪くもあったためこの機会にGoのgRPCライブラリであるgrpc-goの実装を調べてみました。
ただし、この記事ではスキーマからのコード生成やアプリケーションでの利用方法については説明しません。
gRPCは通信プロトコルにHTTP/2を使用しており、バイナリフレームでのデータ通信を行っています。
このバイナリフレームがサーバー側でどの様に処理されメッセージがサービスへ渡されているのかを追って行きましょう。
記事を読むにあたりHTTP/2に関する基礎的な知識があると理解がしやすくなると思います。
こちらの記事も参考にどうぞ。(宣伝)
Goで見るHTTP/2
gRPC-Go
GoのgRPC実装の本体で、google.golang.org/grpc のからインストールされるgRPCライブラリです
HelloWorldサンプル
今回はgRPC-Goのに付属のHelloWorldサンプルを使って基本的な
serverの動きを追ってみます。
GreeterサービスのSayHelloがHelloRequestを受けてHelloReplyを返すだけの超シンプルなgRPCサーバです。
// The greeting service definition.
service Greeter {
// Sends a greeting
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
// The request message containing the user's name.
message HelloRequest {
string name = 1;
}
// The response message containing the greetings
message HelloReply {
string message = 1;
}
- リクエストメッセージ
{
"name": "Hello"
}
- レスポンスメッセージ
{
"message": "Hello Hello"
}
Serverの受信ログ
SayHelloメソッドへ上記のメッセージを送った際のHTTP/2通信のフレームダンプを取っておきます。
送受信しているフレームの種別/内容やヘッダフレームのフィールド等が確認できます。
server listening at 127.0.0.1:50051
http2: Framer 0x140001c6000: wrote SETTINGS len=6, settings: MAX_FRAME_SIZE=16384
http2: Framer 0x140001c6000: read SETTINGS len=36, settings: ENABLE_PUSH=0, MAX_CONCURRENT_STREAMS=0, INITIAL_WINDOW_SIZE=4194304, MAX_FRAME_SIZE=4194304, MAX_HEADER_LIST_SIZE=8192, UNKNOWN_SETTING_65027=1
http2: Framer 0x140001c6000: read WINDOW_UPDATE len=4 (conn) incr=4128769
http2: Framer 0x140001c6000: read PING len=8 ping="\x00\x00\x00\x00\x00\x00\x00\x00"
http2: Framer 0x140001c6000: wrote SETTINGS flags=ACK len=0
http2: Framer 0x140001c6000: wrote PING flags=ACK len=8 ping="\x00\x00\x00\x00\x00\x00\x00\x00"
## ヘッダーフレーム受信
http2: Framer 0x140001c6000: read HEADERS flags=END_HEADERS stream=1 len=249
http2: decoded hpack field header field ":scheme" = "http"
http2: decoded hpack field header field ":method" = "POST"
http2: decoded hpack field header field ":authority" = "localhost:50051"
http2: decoded hpack field header field ":path" = "/helloworld.Greeter/SayHello"
http2: decoded hpack field header field "te" = "trailers"
http2: decoded hpack field header field "content-type" = "application/grpc"
http2: decoded hpack field header field "user-agent" = "grpc-node/1.24.7 grpc-c/8.0.0 (osx; chttp2; ganges)"
http2: decoded hpack field header field "grpc-accept-encoding" = "identity,deflate,gzip"
http2: decoded hpack field header field "accept-encoding" = "identity,gzip"
http2: Framer 0x140001c6000: read WINDOW_UPDATE stream=1 len=4 incr=5
## データフレーム受信
http2: Framer 0x140001c6000: read DATA flags=END_STREAM stream=1 len=12 data="\x00\x00\x00\x00\a\n\x05Hello"
http2: Framer 0x140001c6000: read WINDOW_UPDATE len=4 (conn) incr=5
Received: Hello
http2: Framer 0x140001c6000: wrote WINDOW_UPDATE len=4 (conn) incr=12
http2: Framer 0x140001c6000: wrote PING len=8 ping="\x02\x04\x10\x10\t\x0e\a\a"
## レスポンスヘッダー送信
http2: Framer 0x140001c6000: wrote HEADERS flags=END_HEADERS stream=1 len=14
## レスポンスデータ送信
http2: Framer 0x140001c6000: wrote DATA stream=1 len=18 data="\x00\x00\x00\x00\r\n\vHello Hello"
http2: Framer 0x140001c6000: wrote HEADERS flags=END_STREAM|END_HEADERS stream=1 len=24
http2: Framer 0x140001c6000: read PING flags=ACK len=8 ping="\x02\x04\x10\x10\t\x0e\a\a"
http2: Framer 0x140001c6000: read SETTINGS flags=ACK len=0
http2: Framer 0x140001c6000: read WINDOW_UPDATE len=4 (conn) incr=13
Server実装
Server起動
サーバーの起動部分のmain.goでは次のようなことをおこなっています。
-
net.Listen
からTCPListenerを取得
lis, err := net.Listen("tcp", fmt.Sprintf("localhost:%d", *port))
- gRPCサーバーの生成
s := grpc.NewServer()
-
RegisterGreeterServer
でserviceInfo(gRPCのサービス情報)をservicesに登録- Greeterのサービス情報はprotobufから生成された
helloworld_grpc.pb.go
内にGreeter_ServiceDescとして定義されており、サービス名/メソッド名/ハンドラとなる関数等の情報が含まれています。
- Greeterのサービス情報はprotobufから生成された
pb.RegisterGreeterServer(s, &server{})
- リスナーを渡してServerを起動する
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
ではこのServe
内では何が行われているのかを追っていきましょう。
かなり様々なことが行われているため、今回はメッセージがどのようにしてサーバーアプリケーションメソッドまで送られているのかに焦点を当てて見て行きます。
TCPコネクションハンドラ生成
Serve内ではTCPリスナーがクライアントからのリクエストを待ち受け、生TCPコネクションを取得しています。
for {
rawConn, err := lis.Accept()
...
コネクションを取得する毎にHandlerとなるゴルーチンを起動して処理を渡し、次のリクエストを待ち受けます。
go func() {
s.handleRawConn(lis.Addr().String(), rawConn)
s.serveWG.Done()
}()
HTTP/2サーバートランスポート生成
handleRawConn
HTTP/2ではデータの送受信をフレーム単位で行いっています。 [Frame Definitions]
コネクションハンドラ内ではHTTP/2トランスポートを生成します。
ここでHTTP/2としての接続を確立して以後の通信をHTTP/2のフレームベースで行います。
// Finish handshaking (HTTP2)
st := s.newHTTP2Transport(rawConn)
HTTP/2トランスポートの生成とプロトコルの有効性チェック等はhttp2_server.go内でおこなっており、
バイナリフレームの読み書きを担うFramer構造体の生成やトランスポートへの登録、コネクションプリフェイスの確認、
Keep-Alive用のゴルーチン起動などもここで行っています。
framer := newFramer(conn, writeBufSize, readBufSize, maxHeaderListSize)
t := &http2Server{
...
framer: framer,
...
}
if !bytes.Equal(preface, clientPreface) {
...
go t.keepalive()
ストリームハンドラ生成
生成されたトランスポートを用いて受信したフレームからストリームを生成するゴルーチンを起動します。
handleRawConn
go func() {
s.serveStreams(st)
s.removeConn(lisAddr, st)
}()
Framerからフレームデータを読み取ります。
HTTP/2ではストリームを用いて1つのTCPコネクションを多重化して扱い、各フレームが持つストリームIDにより平行にデータのやり取りを行います。[Streams and Multiplexing]
frame, err := t.framer.fr.ReadFrame()
取得したフレームをフレームタイプ別に処理します。
HTTP/2ログで表示されていた各フレームタイプはここで各々の処理へスイッチされています。
switch frame := frame.(type) {
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
t.Close()
break
}
case *http2.DataFrame:
t.handleData(frame)
case *http2.RSTStreamFrame:
t.handleRSTStream(frame)
case *http2.SettingsFrame:
t.handleSettings(frame)
case *http2.PingFrame:
t.handlePing(frame)
case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame)
case *http2.GoAwayFrame:
// TODO: Handle GoAway from the client appropriately.
default:
if logger.V(logLevel) {
logger.Errorf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
}
}
ヘッダフレームの受信時にはヘッダデータの内容からStream構造体を生成します。
case *http2.MetaHeadersFrame:
if t.operateHeaders(frame, handle, traceCtx) {
...
Streamは扱うストリームのID、gRPCメソッドのパス情報(今回の例では/helloworld.Greeter/SayHello
)、データの送信ゴルーチンと受信ゴルーチン間での受け渡しを行うrecvMsg型のchannel等を保持し、RPCのトランスポート層を表現します。
つまり、一つのgRPCのリクエスト/レスポンスはこのStream単位で行われる
ことになります。
streamID := frame.Header().StreamID
- recvMsg型channelを持つrecvBuffer生成
buf := newRecvBuffer()
s.trReader = &transportReader{
reader: &recvBufferReader{
ctx: s.ctx,
ctxDone: s.ctxDone,
recv: s.buf,
freeBuffer: t.bufferPool.put,
},
windowHandler: func(n int) {
t.updateWindow(s, uint32(n))
},
}
recvBuffer構造体にチャンネルを保持し、Stream構造体のbuf(recvBuffer)からtrReader(transportReader内のrecvBufferReader)へchannelを通してデータをやりとりします。
- Stream生成
s := &Stream{
id: streamID,
st: t,
buf: buf,
fc: &inFlow{limit: uint32(t.initialWindowSize)},
}
また、ヘッダー内容の妥当性チェックやHTTPメソッドのチェック(POST)等もここで行っています
- 生成したStreamを処理するハンドラ用のゴルーチンを起動
go func() {
defer wg.Done()
s.handleStream(st, stream, s.traceInfo(st, stream))
}()
ストリームデータをgRPCサービスのハンドラへ渡す
Streamからのデータフレームのメッセージを待ち受け、設定されたサービスハンドラのメソッドを呼び出します。
- Streamが受け持つメソッド情報を取得し、
pb.RegisterGreeterServer
で設定したservice情報を取得
sm := stream.Method()
...
service := sm[:pos]
method := sm[pos+1:]
srv, knownService := s.services[service]
if md, ok := srv.methods[method]; ok {
s.processUnaryRPC(t, stream, srv, md, trInfo)
return
}
- ストリームデータの取得(圧縮データの解凍)
d, err := recvAndDecompress(&parser{r: stream}, stream, dc, s.opts.maxReceiveMessageSize, payInfo, decomp)
- ストリームからのデータフレームメッセージの待ち受けと読み取り
pf, d, err := p.recvMsg(maxReceiveMessageSize)
送信側のストリームハンドラのゴルーチンでフレームを読み取り、データフレームのデータをStreamを通して待ち受け側のゴルーチンへ渡します。
- streamを通してrecvMsg型を送信
if len(f.Data()) > 0 {
buffer := t.bufferPool.get()
buffer.Reset()
buffer.Write(f.Data())
s.write(recvMsg{buffer: buffer})
}
- ストリームから取得したgRPCメソッド情報が持つハンドラメソッドを呼び出し
reply, appErr := md.Handler(info.serviceImpl, ctx, df, s.opts.unaryInt)
ここのmd.Handler
の実体はprotobufから自動生成されたhelloworld_grpc.pb.goの_Greeter_SayHello_Handlerです。
この中でgRPCリクエストメッセージのHellowRequest型をデコード用関数に渡してデータフレームから渡されたメッセージをデシリアライズします。
in := new(HelloRequest)
if err := dec(in); err != nil {
return nil, err
}
if err := s.getCodec(stream.ContentSubtype()).Unmarshal(d, v); err != nil {
return status.Errorf(codes.Internal, "grpc: error unmarshalling request: %v", err)
}
gRPCメソッドはサーバーアプリケーション側のSayHelloメソッドを呼び出しリクエストメッセージを渡します。
if interceptor == nil {
return srv.(GreeterServer).SayHello(ctx, in)
}
サーバーアプリケーションからのレスポンスはバイナリデータへ変換後、またStreamを通してクライアント側へ送信されます。
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
...
data, err := encode(s.getCodec(stream.ContentSubtype()), msg)
...
hdr, payload := msgHeader(data, compData)
...
err = t.Write(stream, hdr, payload, opts)
処理分岐概観
ここまで見てきた内容から、サーバー内で行われる通信処理の分岐を大まかに図にするとこの様になります。
- ServerがTCP Connectionを確立
- TCP Connection毎にConnection Handlerとなるゴルーチンを起動
- Connection HandlerはHeaderフレームの内容からStreamID毎にStream Handlerとなるゴルーチンを起動
- Streamが一つのgRPCリクエストに対応してgRPC Handler Methodを呼び出しメッセージデータを送る
これらにより単一のTCPコネクション内でストリームを多重化することで複数のリクエストを処理するHTTP/2の利点を活かしたAPI実装が可能になります。
gRPC-GoではこれらをGo特有のgoroutineとchannelを利用して実装しています。
今回の例はUnary(単一)なリクエストであったため利点が分かりにくいですが、Streamデータを扱う場合は更に大きな利点となります。