はじめに
前回の記事で、gRPCのAPIタイプの内、1リクエストに対して1レスポンスを返すUnaryを実装しました。
今回は1回のコネクションで複数のリクエストやレスポンスを送るタイプである、Streaming APIを実装してみました。
Streaming API
Streaming APIはHTTP/2の恩恵を受けており、リクエスト/レスポンスのたびにコネクションを確立しないというメリットをもちます。
3種類のタイプがあり、それぞれが以下のような特長をもっています。
- Server Streaming: 1リクエストに対して複数レスポンスを返す。サーバが大量のデータを送りたいとき(ライブ配信やチャット)に使い、リクエストを受け取らずとも非同期でレスポンスを返す。
- Client Streaming: 複数リクエストに対して1レスポンスを返す。クライアントが大量のデータを送りたいとき(データアップロードなど)に使い、サーバは非同期でレスポンスを返すことができる。
- Bi Directional Streaming: 複数リクエストに対して複数レスポンスを返す。リクエストとレスポンスの数は合っている必要がなく、サーバは返すレスポンス数を選択することができる。非同期でデータを送りあうようなチャットや長時間の接続を行うゲームのオンライン対戦などの用途で使用する。
実装
スキーマ作成、サーバ実装、クライアント実装の順で行います。
スキーマ作成(.protoファイルの作成)
Unaryのときと同様にスキーマを作成します。
rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManytimesResponse) {};
のように、リクエストやレスポンスの前にstream
をつけることで、各Streaming APIを実現します。
message Greeting {
string first_name = 1;
string last_name = 2;
}
message GreetManyTimesRequest {
Greeting greeting = 1;
}
message GreetManytimesResponse {
string result = 1;
}
message LongGreetRequest {
Greeting greeting = 1;
}
message LongGreetResponse {
string result = 1;
}
message GreetEveryoneRequest {
Greeting greeting = 1;
}
message GreetEveryoneResponse {
string result = 1;
}
service GreetService{
// Server Streaming
rpc GreetManyTimes(GreetManyTimesRequest) returns (stream GreetManytimesResponse) {};
// Client Streaming
rpc LongGreet(stream LongGreetRequest) returns (LongGreetResponse) {};
// Bi Directional Streaming
rpc GreetEveryone(stream GreetEveryoneRequest) returns (stream GreetEveryoneResponse) {};
}
サーバ実装
protoc greet/greetpb/greet.proto --go_out=plugins=grpc:.
でgreet.pb.go
ファイルを生成後、サーバ実装を行います。
GreetManyTimes()
はServer Streamingのため、実装では1000 msごとに10回レスポンスを返すようにしています。
LongGreet()
はClient Streamingのため、リクエストが終わるまでresult
を連結して、最終的なレスポンスを一つのResult
として返します。
GreetEveryone()
はBi Directional Streamingで、今回はすべてのリクエストに対してレスポンスを返しています(レスポンスの数は自由に選択するような実装も可能です)。
// Server Streaming
func (*server) GreetManyTimes(req *greetpb.GreetManyTimesRequest, stream greetpb.GreetService_GreetManyTimesServer) error {
fmt.Printf("GreetManyTimes function was invoked with %v\n", req)
firstName := req.GetGreeting().GetFirstName()
for i := 0; i < 10; i++ {
result := "Hello " + firstName + " number " + strconv.Itoa(i)
res := &greetpb.GreetManytimesResponse{
Result: result,
}
stream.Send(res)
time.Sleep(1000 * time.Millisecond)
}
return nil
}
// Client Streaming
func (*server) LongGreet(stream greetpb.GreetService_LongGreetServer) error {
fmt.Printf("LongGreet function was invoked with a streaming request\n")
result := ""
for {
req, err := stream.Recv()
if err == io.EOF {
// we have finished reading the client stream
return stream.SendAndClose(&greetpb.LongGreetResponse{
Result: result,
})
}
if err != nil {
log.Fatalf("Error while reading client stream: %v", err)
}
firstName := req.GetGreeting().GetFirstName()
result += "Hello " + firstName + "! "
}
}
// Bi Directional Streaming
func (*server) GreetEveryone(stream greetpb.GreetService_GreetEveryoneServer) error {
fmt.Printf("GreetEveryone function was invoked with a streaming request\n")
for {
req, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
log.Fatalf("Error while reading client stream: %v", err)
return err
}
firstName := req.GetGreeting().GetFirstName()
result := "Hello " + firstName + "! "
sendErr := stream.Send(&greetpb.GreetEveryoneResponse{
Result: result,
})
if sendErr != nil {
log.Fatalf("Error while sending data to client: %v", sendErr)
return sendErr
}
}
}
クライアント実装
最後にクライアント側を実装します。
doServerStreaming()
では、resStream, err := c.GreetManyTimes(context.Background(), req)
でレスポンスを取得して、for
内でメッセージを一つずつ出力しています。
doClientStreaming()
では、Streamingで送りたいリクエストrequests
をつくり、1000 msごとにstream.Send(req)
としてサーバに送ります。
そして、res, err := stream.CloseAndRecv()
で1つのレスポンスを取得し、最後に出力します。
doBiDiStreaming()
では、doClientStreaming()
と同様にstream.Send(req)
でサーバにリクエストを送ります。
Bi Directional Streamingは非同期でリクエストとレスポンスを送りあうので、この辺りの処理をgoroutineで実装します。
すべてのリクエストとレスポンスが修了したら、<-waitc
で修了します。
// Server Streaming
func doServerStreaming(c greetpb.GreetServiceClient) {
fmt.Println("Starting to do a Server Streaming RPC...")
req := &greetpb.GreetManyTimesRequest{
Greeting: &greetpb.Greeting{
FirstName: "Stephane",
LastName: "Maarek",
},
}
resStream, err := c.GreetManyTimes(context.Background(), req)
if err != nil {
log.Fatalf("error while calling GreetManyTimes RPC: %v", err)
}
for {
msg, err := resStream.Recv()
if err == io.EOF {
// we've reached the end of the stream
break
}
if err != nil {
log.Fatalf("error while reading stream: %v", err)
}
log.Printf("Response from GreetManyTimes: %v", msg.GetResult())
}
}
// Client Streaming
func doClientStreaming(c greetpb.GreetServiceClient) {
fmt.Println("Starting to do a Client Streaming RPC...")
requests := []*greetpb.LongGreetRequest{
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "Stephane",
},
},
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "John",
},
},
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "Lucy",
},
},
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "Mark",
},
},
&greetpb.LongGreetRequest{
Greeting: &greetpb.Greeting{
FirstName: "Piper",
},
},
}
stream, err := c.LongGreet(context.Background())
if err != nil {
log.Fatalf("error while calling LongGreet: %v", err)
}
// we iterate over our slice and send each message individually
for _, req := range requests {
fmt.Printf("Sending req: %v\n", req)
stream.Send(req)
time.Sleep(1000 * time.Millisecond)
}
res, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("error while receiving response from LongGreet: %v", err)
}
fmt.Printf("LongGreet Response: %v\n", res)
}
// Bi Directional Streaming
func doBiDiStreaming(c greetpb.GreetServiceClient) {
fmt.Println("Starting to do a BiDi Streaming RPC...")
// we create a stream by invoking the client
stream, err := c.GreetEveryone(context.Background())
if err != nil {
log.Fatalf("Error while creating stream: %v", err)
return
}
requests := []*greetpb.GreetEveryoneRequest{
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "Stephane",
},
},
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "John",
},
},
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "Lucy",
},
},
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "Mark",
},
},
&greetpb.GreetEveryoneRequest{
Greeting: &greetpb.Greeting{
FirstName: "Piper",
},
},
}
waitc := make(chan struct{})
// we send a bunch of messages to the client (go routine)
go func() {
// function to send a bunch of messages
for _, req := range requests {
fmt.Printf("Sending message: %v\n", req)
stream.Send(req)
time.Sleep(1000 * time.Millisecond)
}
stream.CloseSend()
}()
// we receive a bunch of messages from the client (go routine)
go func() {
// function to receive a bunch of messages
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Error while receiving: %v", err)
break
}
fmt.Printf("Received: %v\n", res.GetResult())
}
close(waitc)
}()
// block until everything is done
<-waitc
}
参考資料