下記の前回記事で紹介したgRPCでのUnary APIに続き、今回はServer streaming APIのハンズオンを行います。
Unary APIやgRPCについては下記をご覧ください。
https://qiita.com/shiei_kawa/items/6a5dabce570657b9e2cc
Server Streaming APIとは
前回の一つのリクエストに対し、一つのレスポンスが返ってくるものとは異なり、1つのリクエストに対して複数のレスポンスが返ってくるAPIになります。
今回はリクエストで投げられた数字を2から順番に割っていき、割り切れた数字を順番にレスポンスしていくAPIを作成します。
前回と同じようにまずprotoファイルの作成から行なっていきます。
protoファイルの作成
syntax = "proto3";
package prime;
option go_package="primepb";
message PrimeRequest{
int64 num = 1;
}
message PrimeResponse{
int64 result = 1;
}
service PrimeService{
rpc PrimeNumber(PrimeRequest) returns (stream PrimeResponse){};
}
protoファイルから雛形コードの生成
次に上記のprotoファイルからコードを生成していきます。
以下のコマンドを叩いてください。
protoc {protoファイル} --go_out=plugins=grpc:.
すると何と先ほどのprotoファイルと同階層に下記のようなコードが生成されているはずです。
// Code generated by protoc-gen-go. DO NOT EDIT.
// source: prime/primepb/prime.proto
package primepb
import (
context "context"
fmt "fmt"
math "math"
proto "github.com/golang/protobuf/proto"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package
type PrimeRequest struct {
Num int64 `protobuf:"varint,1,opt,name=num,proto3" json:"num,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int64 `json:"-"`
}
func (m *PrimeRequest) Reset() { *m = PrimeRequest{} }
func (m *PrimeRequest) String() string { return proto.CompactTextString(m) }
func (*PrimeRequest) ProtoMessage() {}
func (*PrimeRequest) Descriptor() ([]byte, []int) {
return fileDescriptor_b8d5732fce9c712c, []int{0}
}
func (m *PrimeRequest) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PrimeRequest.Unmarshal(m, b)
}
func (m *PrimeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_PrimeRequest.Marshal(b, m, deterministic)
}
func (m *PrimeRequest) XXX_Merge(src proto.Message) {
xxx_messageInfo_PrimeRequest.Merge(m, src)
}
func (m *PrimeRequest) XXX_Size() int {
return xxx_messageInfo_PrimeRequest.Size(m)
}
func (m *PrimeRequest) XXX_DiscardUnknown() {
xxx_messageInfo_PrimeRequest.DiscardUnknown(m)
}
var xxx_messageInfo_PrimeRequest proto.InternalMessageInfo
func (m *PrimeRequest) GetNum() int64 {
if m != nil {
return m.Num
}
return 0
}
type PrimeResponse struct {
Result int64 `protobuf:"varint,1,opt,name=result,proto3" json:"result,omitempty"`
XXX_NoUnkeyedLiteral struct{} `json:"-"`
XXX_unrecognized []byte `json:"-"`
XXX_sizecache int64 `json:"-"`
}
func (m *PrimeResponse) Reset() { *m = PrimeResponse{} }
func (m *PrimeResponse) String() string { return proto.CompactTextString(m) }
func (*PrimeResponse) ProtoMessage() {}
func (*PrimeResponse) Descriptor() ([]byte, []int) {
return fileDescriptor_b8d5732fce9c712c, []int{1}
}
func (m *PrimeResponse) XXX_Unmarshal(b []byte) error {
return xxx_messageInfo_PrimeResponse.Unmarshal(m, b)
}
func (m *PrimeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
return xxx_messageInfo_PrimeResponse.Marshal(b, m, deterministic)
}
func (m *PrimeResponse) XXX_Merge(src proto.Message) {
xxx_messageInfo_PrimeResponse.Merge(m, src)
}
func (m *PrimeResponse) XXX_Size() int {
return xxx_messageInfo_PrimeResponse.Size(m)
}
func (m *PrimeResponse) XXX_DiscardUnknown() {
xxx_messageInfo_PrimeResponse.DiscardUnknown(m)
}
var xxx_messageInfo_PrimeResponse proto.InternalMessageInfo
func (m *PrimeResponse) GetResult() int64 {
if m != nil {
return m.Result
}
return 0
}
func init() {
proto.RegisterType((*PrimeRequest)(nil), "prime.PrimeRequest")
proto.RegisterType((*PrimeResponse)(nil), "prime.PrimeResponse")
}
func init() { proto.RegisterFile("prime/primepb/prime.proto", fileDescriptor_b8d5732fce9c712c) }
var fileDescriptor_b8d5732fce9c712c = []byte{
// 154 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2c, 0x28, 0xca, 0xcc,
0x4d, 0xd5, 0x07, 0x93, 0x05, 0x49, 0x10, 0x5a, 0xaf, 0xa0, 0x28, 0xbf, 0x24, 0x5f, 0x88, 0x15,
0xcc, 0x51, 0x52, 0xe0, 0xe2, 0x09, 0x00, 0x31, 0x82, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84,
0x04, 0xb8, 0x98, 0xf3, 0x4a, 0x73, 0x25, 0x18, 0x15, 0x18, 0x35, 0x98, 0x83, 0x40, 0x4c, 0x25,
0x75, 0x2e, 0x5e, 0xa8, 0x8a, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x21, 0x31, 0x2e, 0xb6, 0xa2,
0xd4, 0xe2, 0xd2, 0x9c, 0x12, 0xa8, 0x2a, 0x28, 0xcf, 0xc8, 0x07, 0x6a, 0x54, 0x70, 0x6a, 0x51,
0x59, 0x66, 0x72, 0xaa, 0x90, 0x0d, 0x17, 0x37, 0x98, 0xef, 0x57, 0x9a, 0x9b, 0x94, 0x5a, 0x24,
0x24, 0xac, 0x07, 0xb1, 0x1e, 0xd9, 0x3a, 0x29, 0x11, 0x54, 0x41, 0x88, 0x0d, 0x4a, 0x0c, 0x06,
0x8c, 0x4e, 0x9c, 0x51, 0xec, 0x50, 0x67, 0x27, 0xb1, 0x81, 0x5d, 0x6c, 0x0c, 0x08, 0x00, 0x00,
0xff, 0xff, 0x29, 0x78, 0x12, 0xd8, 0xce, 0x00, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// PrimeServiceClient is the client API for PrimeService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type PrimeServiceClient interface {
PrimeNumber(ctx context.Context, in *PrimeRequest, opts ...grpc.CallOption) (PrimeService_PrimeNumberClient, error)
}
type primeServiceClient struct {
cc *grpc.ClientConn
}
func NewPrimeServiceClient(cc *grpc.ClientConn) PrimeServiceClient {
return &primeServiceClient{cc}
}
func (c *primeServiceClient) PrimeNumber(ctx context.Context, in *PrimeRequest, opts ...grpc.CallOption) (PrimeService_PrimeNumberClient, error) {
stream, err := c.cc.NewStream(ctx, &_PrimeService_serviceDesc.Streams[0], "/prime.PrimeService/PrimeNumber", opts...)
if err != nil {
return nil, err
}
x := &primeServicePrimeNumberClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type PrimeService_PrimeNumberClient interface {
Recv() (*PrimeResponse, error)
grpc.ClientStream
}
type primeServicePrimeNumberClient struct {
grpc.ClientStream
}
func (x *primeServicePrimeNumberClient) Recv() (*PrimeResponse, error) {
m := new(PrimeResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
// PrimeServiceServer is the server API for PrimeService service.
type PrimeServiceServer interface {
PrimeNumber(*PrimeRequest, PrimeService_PrimeNumberServer) error
}
// UnimplementedPrimeServiceServer can be embedded to have forward compatible implementations.
type UnimplementedPrimeServiceServer struct {
}
func (*UnimplementedPrimeServiceServer) PrimeNumber(req *PrimeRequest, srv PrimeService_PrimeNumberServer) error {
return status.Errorf(codes.Unimplemented, "method PrimeNumber not implemented")
}
func RegisterPrimeServiceServer(s *grpc.Server, srv PrimeServiceServer) {
s.RegisterService(&_PrimeService_serviceDesc, srv)
}
func _PrimeService_PrimeNumber_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(PrimeRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(PrimeServiceServer).PrimeNumber(m, &primeServicePrimeNumberServer{stream})
}
type PrimeService_PrimeNumberServer interface {
Send(*PrimeResponse) error
grpc.ServerStream
}
type primeServicePrimeNumberServer struct {
grpc.ServerStream
}
func (x *primeServicePrimeNumberServer) Send(m *PrimeResponse) error {
return x.ServerStream.SendMsg(m)
}
var _PrimeService_serviceDesc = grpc.ServiceDesc{
ServiceName: "prime.PrimeService",
HandlerType: (*PrimeServiceServer)(nil),
Methods: []grpc.MethodDesc{},
Streams: []grpc.StreamDesc{
{
StreamName: "PrimeNumber",
Handler: _PrimeService_PrimeNumber_Handler,
ServerStreams: true,
},
},
Metadata: "prime/primepb/prime.proto",
}
続いてserver側を実装していきます。
server
package main
import (
"fmt"
"log"
"net"
"github.com/waytkheming/grpc-go-course/prime/primepb"
"google.golang.org/grpc"
)
type sever struct{}
func (*sever) PrimeNumber(req *primepb.PrimeRequest, stream primepb.PrimeService_PrimeNumberServer) error {
fmt.Printf("primenum function was invoked with %v", req)
number := req.GetNum()
divisor := int64(2)
for number > 1 {
if number%divisor == 0 {
stream.Send(&primepb.PrimeResponse{
Result: divisor,
})
number = number / divisor
} else {
divisor++
}
}
return nil
}
func main() {
fmt.Println("start")
lis, err := net.Listen("tcp", "0.0.0.0:50051")
if err != nil {
log.Fatalf("Failed to listen: %v", err)
}
s := grpc.NewServer()
primepb.RegisterPrimeServiceServer(s, &sever{})
if err := s.Serve(lis); err != nil {
log.Fatalf("Failed to serve: %v", err)
}
}
続いてclient側になります。
client側
package main
import (
"fmt"
"log"
"context"
"io"
"github.com/waytkheming/grpc-go-course/prime/primepb"
"google.golang.org/grpc"
)
func main() {
fmt.Println("Hello from client")
conn, err := grpc.Dial("localhost:50051", grpc.WithInsecure())
if err != nil {
log.Fatalf("could not connect:%v", err)
}
defer conn.Close()
c := primepb.NewPrimeServiceClient(conn)
// fmt.Printf("created client: %f", c)
doServerStreaming(c, 210)
doServerStreaming(c, 300)
doServerStreaming(c, 3300)
}
func doServerStreaming(c primepb.PrimeServiceClient, number int64) {
fmt.Println("Starting to do a server streaimng rpc....")
req := &primepb.PrimeRequest{
Num: number,
}
resStream, err := c.PrimeNumber(context.Background(), req)
if err != nil {
log.Fatalf("error while calling GRPC: %v", err)
}
for {
msg, err := resStream.Recv()
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("error while reading stream: %v", err)
}
fmt.Println(msg.GetResult())
}
}
ここでは「210,300,3300」をリクエストしています。
APIの実行
ここまで実装したら、server側client側双方でgo runをして実行してみてください。
すると、下記のようなレスポンスが返ってくると思います。
Starting to do a server streaimng rpc....
2
3
5
7
Starting to do a server streaimng rpc....
2
2
3
5
5
Starting to do a server streaimng rpc....
2
2
3
5
5
11
以上になります。ここに書かれていることを応用すれば、簡単なserver streaming方式のAPIは構築できるはずです!