LoginSignup
1

More than 3 years have passed since last update.

【初めてのgRPC】Server Streaming API開発ハンズオン

Last updated at Posted at 2019-09-01

下記の前回記事で紹介したgRPCでのUnary APIに続き、今回はServer streaming APIのハンズオンを行います。

Unary APIやgRPCについては下記をご覧ください。
https://qiita.com/shiei_kawa/items/6a5dabce570657b9e2cc

Server Streaming APIとは

前回の一つのリクエストに対し、一つのレスポンスが返ってくるものとは異なり、1つのリクエストに対して複数のレスポンスが返ってくるAPIになります。

今回はリクエストで投げられた数字を2から順番に割っていき、割り切れた数字（素因数）を順番にレスポンスしていくAPI、つまり素因数分解の結果を1つずつストリームで返すAPIを作成します。

前回と同じようにまずprotoファイルの作成から行なっていきます。

protoファイルの作成

syntax = "proto3";

package prime;
option go_package="primepb";

// PrimeRequest is the single request message sent by the client; num is the
// number that the server divides starting from 2.
message PrimeRequest{
    int64 num = 1;
}

// PrimeResponse is one element of the response stream; result holds a single
// divisor that evenly divided the requested number.
message PrimeResponse{
    int64 result = 1;
}

// PrimeService exposes a server-streaming RPC.
service PrimeService{
    // PrimeNumber takes one PrimeRequest and returns a stream of PrimeResponse
    // messages (note the `stream` keyword on the response type only).
    rpc PrimeNumber(PrimeRequest) returns (stream PrimeResponse){};
}

protoファイルから雛形コードの生成

次に上記のprotoファイルからコードを生成していきます。
以下のコマンドを叩いてください。

protoc {protoファイル} --go_out=plugins=grpc:.

すると何と先ほどのprotoファイルと同階層に下記のようなコードが生成されているはずです。

// Code generated by protoc-gen-go. DO NOT EDIT.
// source: prime/primepb/prime.proto

package primepb

import (
    context "context"
    fmt "fmt"
    math "math"

    proto "github.com/golang/protobuf/proto"
    grpc "google.golang.org/grpc"
    codes "google.golang.org/grpc/codes"
    status "google.golang.org/grpc/status"
)

// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf

// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.ProtoPackageIsVersion3 // please upgrade the proto package

// PrimeRequest is the request message of PrimeService.PrimeNumber.
//
// Num is the number the server is asked to divide. The XXX_ fields are
// bookkeeping fields for the proto runtime and are excluded from JSON via the
// "-" tag. XXX_sizecache is declared as int32 because that is the type
// protoc-gen-go emits and the width the proto runtime uses for its cached
// encoded size; any other width is either ignored or misread by the runtime.
type PrimeRequest struct {
    Num                  int64    `protobuf:"varint,1,opt,name=num,proto3" json:"num,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

// Reset, String and ProtoMessage are the generated methods that let
// PrimeRequest be used with the proto package: Reset overwrites m with an empty
// PrimeRequest, String renders m via proto.CompactTextString, and ProtoMessage
// is an empty marker method.
func (m *PrimeRequest) Reset()         { *m = PrimeRequest{} }
func (m *PrimeRequest) String() string { return proto.CompactTextString(m) }
func (*PrimeRequest) ProtoMessage()    {}
// Descriptor returns the gzipped file descriptor bytes together with the index
// path []int{0} (PrimeRequest is the first message declared in the .proto).
func (*PrimeRequest) Descriptor() ([]byte, []int) {
    return fileDescriptor_b8d5732fce9c712c, []int{0}
}

// XXX_Unmarshal decodes b into m by delegating to xxx_messageInfo_PrimeRequest.
func (m *PrimeRequest) XXX_Unmarshal(b []byte) error {
    return xxx_messageInfo_PrimeRequest.Unmarshal(m, b)
}
// XXX_Marshal delegates encoding of m to xxx_messageInfo_PrimeRequest, passing
// the buffer b and the deterministic flag through unchanged.
func (m *PrimeRequest) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    return xxx_messageInfo_PrimeRequest.Marshal(b, m, deterministic)
}
// XXX_Merge merges src into m via xxx_messageInfo_PrimeRequest.
func (m *PrimeRequest) XXX_Merge(src proto.Message) {
    xxx_messageInfo_PrimeRequest.Merge(m, src)
}
// XXX_Size returns the size reported by xxx_messageInfo_PrimeRequest for m.
func (m *PrimeRequest) XXX_Size() int {
    return xxx_messageInfo_PrimeRequest.Size(m)
}
// XXX_DiscardUnknown delegates to xxx_messageInfo_PrimeRequest.DiscardUnknown.
func (m *PrimeRequest) XXX_DiscardUnknown() {
    xxx_messageInfo_PrimeRequest.DiscardUnknown(m)
}

// xxx_messageInfo_PrimeRequest is the proto.InternalMessageInfo that all
// XXX_* methods of PrimeRequest delegate to.
var xxx_messageInfo_PrimeRequest proto.InternalMessageInfo

// GetNum returns the Num field of m. It is safe to call on a nil receiver, in
// which case it returns 0.
func (m *PrimeRequest) GetNum() int64 {
    if m == nil {
        return 0
    }
    return m.Num
}

// PrimeResponse is one message of the PrimeService.PrimeNumber response stream.
//
// Result holds a single divisor reported by the server. The XXX_ fields are
// bookkeeping fields for the proto runtime and are excluded from JSON via the
// "-" tag. XXX_sizecache is declared as int32 because that is the type
// protoc-gen-go emits and the width the proto runtime uses for its cached
// encoded size; any other width is either ignored or misread by the runtime.
type PrimeResponse struct {
    Result               int64    `protobuf:"varint,1,opt,name=result,proto3" json:"result,omitempty"`
    XXX_NoUnkeyedLiteral struct{} `json:"-"`
    XXX_unrecognized     []byte   `json:"-"`
    XXX_sizecache        int32    `json:"-"`
}

// Reset, String and ProtoMessage are the generated methods that let
// PrimeResponse be used with the proto package: Reset overwrites m with an
// empty PrimeResponse, String renders m via proto.CompactTextString, and
// ProtoMessage is an empty marker method.
func (m *PrimeResponse) Reset()         { *m = PrimeResponse{} }
func (m *PrimeResponse) String() string { return proto.CompactTextString(m) }
func (*PrimeResponse) ProtoMessage()    {}
// Descriptor returns the gzipped file descriptor bytes together with the index
// path []int{1} (PrimeResponse is the second message declared in the .proto).
func (*PrimeResponse) Descriptor() ([]byte, []int) {
    return fileDescriptor_b8d5732fce9c712c, []int{1}
}

// XXX_Unmarshal decodes b into m by delegating to xxx_messageInfo_PrimeResponse.
func (m *PrimeResponse) XXX_Unmarshal(b []byte) error {
    return xxx_messageInfo_PrimeResponse.Unmarshal(m, b)
}
// XXX_Marshal delegates encoding of m to xxx_messageInfo_PrimeResponse, passing
// the buffer b and the deterministic flag through unchanged.
func (m *PrimeResponse) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
    return xxx_messageInfo_PrimeResponse.Marshal(b, m, deterministic)
}
// XXX_Merge merges src into m via xxx_messageInfo_PrimeResponse.
func (m *PrimeResponse) XXX_Merge(src proto.Message) {
    xxx_messageInfo_PrimeResponse.Merge(m, src)
}
// XXX_Size returns the size reported by xxx_messageInfo_PrimeResponse for m.
func (m *PrimeResponse) XXX_Size() int {
    return xxx_messageInfo_PrimeResponse.Size(m)
}
// XXX_DiscardUnknown delegates to xxx_messageInfo_PrimeResponse.DiscardUnknown.
func (m *PrimeResponse) XXX_DiscardUnknown() {
    xxx_messageInfo_PrimeResponse.DiscardUnknown(m)
}

// xxx_messageInfo_PrimeResponse is the proto.InternalMessageInfo that all
// XXX_* methods of PrimeResponse delegate to.
var xxx_messageInfo_PrimeResponse proto.InternalMessageInfo

// GetResult returns the Result field of m. It is safe to call on a nil
// receiver, in which case it returns 0.
func (m *PrimeResponse) GetResult() int64 {
    if m == nil {
        return 0
    }
    return m.Result
}

// init registers the PrimeRequest and PrimeResponse Go types with the proto
// package under the names "prime.PrimeRequest" and "prime.PrimeResponse".
func init() {
    proto.RegisterType((*PrimeRequest)(nil), "prime.PrimeRequest")
    proto.RegisterType((*PrimeResponse)(nil), "prime.PrimeResponse")
}

// init registers the gzipped file descriptor under the path
// "prime/primepb/prime.proto".
func init() { proto.RegisterFile("prime/primepb/prime.proto", fileDescriptor_b8d5732fce9c712c) }

// fileDescriptor_b8d5732fce9c712c is the gzipped FileDescriptorProto that is
// registered for "prime/primepb/prime.proto" and returned by the Descriptor
// methods of PrimeRequest and PrimeResponse. The bytes are generated data and
// must not be edited by hand.
var fileDescriptor_b8d5732fce9c712c = []byte{
    // 154 bytes of a gzipped FileDescriptorProto
    0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2c, 0x28, 0xca, 0xcc,
    0x4d, 0xd5, 0x07, 0x93, 0x05, 0x49, 0x10, 0x5a, 0xaf, 0xa0, 0x28, 0xbf, 0x24, 0x5f, 0x88, 0x15,
    0xcc, 0x51, 0x52, 0xe0, 0xe2, 0x09, 0x00, 0x31, 0x82, 0x52, 0x0b, 0x4b, 0x53, 0x8b, 0x4b, 0x84,
    0x04, 0xb8, 0x98, 0xf3, 0x4a, 0x73, 0x25, 0x18, 0x15, 0x18, 0x35, 0x98, 0x83, 0x40, 0x4c, 0x25,
    0x75, 0x2e, 0x5e, 0xa8, 0x8a, 0xe2, 0x82, 0xfc, 0xbc, 0xe2, 0x54, 0x21, 0x31, 0x2e, 0xb6, 0xa2,
    0xd4, 0xe2, 0xd2, 0x9c, 0x12, 0xa8, 0x2a, 0x28, 0xcf, 0xc8, 0x07, 0x6a, 0x54, 0x70, 0x6a, 0x51,
    0x59, 0x66, 0x72, 0xaa, 0x90, 0x0d, 0x17, 0x37, 0x98, 0xef, 0x57, 0x9a, 0x9b, 0x94, 0x5a, 0x24,
    0x24, 0xac, 0x07, 0xb1, 0x1e, 0xd9, 0x3a, 0x29, 0x11, 0x54, 0x41, 0x88, 0x0d, 0x4a, 0x0c, 0x06,
    0x8c, 0x4e, 0x9c, 0x51, 0xec, 0x50, 0x67, 0x27, 0xb1, 0x81, 0x5d, 0x6c, 0x0c, 0x08, 0x00, 0x00,
    0xff, 0xff, 0x29, 0x78, 0x12, 0xd8, 0xce, 0x00, 0x00, 0x00,
}

// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4

// PrimeServiceClient is the client API for PrimeService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type PrimeServiceClient interface {
    // PrimeNumber starts the server-streaming PrimeNumber RPC for the request
    // in and returns a stream client whose Recv method yields the responses.
    PrimeNumber(ctx context.Context, in *PrimeRequest, opts ...grpc.CallOption) (PrimeService_PrimeNumberClient, error)
}

// primeServiceClient is the PrimeServiceClient implementation; cc is the
// connection on which it opens streams.
type primeServiceClient struct {
    cc *grpc.ClientConn
}

// NewPrimeServiceClient returns a PrimeServiceClient that issues its RPCs over
// the given connection cc.
func NewPrimeServiceClient(cc *grpc.ClientConn) PrimeServiceClient {
    return &primeServiceClient{cc}
}

// PrimeNumber opens the "/prime.PrimeService/PrimeNumber" stream, sends the
// single request in, closes the send side, and hands back a client from which
// the streamed responses can be received. Any error from opening the stream,
// sending the request, or closing the send side is returned with a nil client.
func (c *primeServiceClient) PrimeNumber(ctx context.Context, in *PrimeRequest, opts ...grpc.CallOption) (PrimeService_PrimeNumberClient, error) {
    desc := &_PrimeService_serviceDesc.Streams[0]
    clientStream, err := c.cc.NewStream(ctx, desc, "/prime.PrimeService/PrimeNumber", opts...)
    if err != nil {
        return nil, err
    }

    wrapped := &primeServicePrimeNumberClient{clientStream}
    // Server streaming: exactly one request goes out, then the send side is closed.
    err = wrapped.ClientStream.SendMsg(in)
    if err != nil {
        return nil, err
    }
    err = wrapped.ClientStream.CloseSend()
    if err != nil {
        return nil, err
    }
    return wrapped, nil
}

// PrimeService_PrimeNumberClient is the client-side view of the PrimeNumber
// response stream: Recv returns the next PrimeResponse, and the embedded
// grpc.ClientStream exposes the underlying stream operations.
type PrimeService_PrimeNumberClient interface {
    Recv() (*PrimeResponse, error)
    grpc.ClientStream
}

// primeServicePrimeNumberClient wraps a grpc.ClientStream and adds the typed
// Recv method required by PrimeService_PrimeNumberClient.
type primeServicePrimeNumberClient struct {
    grpc.ClientStream
}

// Recv reads the next message from the underlying stream into a fresh
// PrimeResponse. On any RecvMsg error it returns a nil response and that error
// unchanged.
func (x *primeServicePrimeNumberClient) Recv() (*PrimeResponse, error) {
    resp := &PrimeResponse{}
    err := x.ClientStream.RecvMsg(resp)
    if err != nil {
        return nil, err
    }
    return resp, nil
}

// PrimeServiceServer is the server API for PrimeService service.
type PrimeServiceServer interface {
    // PrimeNumber handles one request and writes its responses to the given
    // stream; the returned error is what the generated handler returns to gRPC.
    PrimeNumber(*PrimeRequest, PrimeService_PrimeNumberServer) error
}

// UnimplementedPrimeServiceServer can be embedded to have forward compatible implementations.
type UnimplementedPrimeServiceServer struct {
}

// PrimeNumber is the default implementation: it sends nothing and returns a
// status error with code codes.Unimplemented.
func (*UnimplementedPrimeServiceServer) PrimeNumber(req *PrimeRequest, srv PrimeService_PrimeNumberServer) error {
    return status.Errorf(codes.Unimplemented, "method PrimeNumber not implemented")
}

// RegisterPrimeServiceServer registers srv on the gRPC server s using the
// PrimeService service descriptor.
func RegisterPrimeServiceServer(s *grpc.Server, srv PrimeServiceServer) {
    s.RegisterService(&_PrimeService_serviceDesc, srv)
}

// _PrimeService_PrimeNumber_Handler is the stream handler referenced by the
// service descriptor. It receives the single PrimeRequest from the stream and
// then invokes PrimeNumber on srv (which must be a PrimeServiceServer) with a
// typed wrapper around the stream. A receive error is returned as is.
func _PrimeService_PrimeNumber_Handler(srv interface{}, stream grpc.ServerStream) error {
    req := new(PrimeRequest)
    err := stream.RecvMsg(req)
    if err != nil {
        return err
    }
    impl := srv.(PrimeServiceServer)
    return impl.PrimeNumber(req, &primeServicePrimeNumberServer{stream})
}

// PrimeService_PrimeNumberServer is the server-side view of the PrimeNumber
// response stream: Send writes one PrimeResponse, and the embedded
// grpc.ServerStream exposes the underlying stream operations.
type PrimeService_PrimeNumberServer interface {
    Send(*PrimeResponse) error
    grpc.ServerStream
}

// primeServicePrimeNumberServer wraps a grpc.ServerStream and adds the typed
// Send method required by PrimeService_PrimeNumberServer.
type primeServicePrimeNumberServer struct {
    grpc.ServerStream
}

// Send writes m to the underlying stream via SendMsg and returns its error.
func (x *primeServicePrimeNumberServer) Send(m *PrimeResponse) error {
    return x.ServerStream.SendMsg(m)
}

// _PrimeService_serviceDesc describes the "prime.PrimeService" service to
// gRPC. It declares no unary methods and a single stream, "PrimeNumber", which
// is marked as server-streaming and dispatched to
// _PrimeService_PrimeNumber_Handler. The client stub refers to that stream as
// Streams[0], so the order of the Streams slice matters.
var _PrimeService_serviceDesc = grpc.ServiceDesc{
    ServiceName: "prime.PrimeService",
    HandlerType: (*PrimeServiceServer)(nil),
    Methods:     []grpc.MethodDesc{},
    Streams: []grpc.StreamDesc{
        {
            StreamName:    "PrimeNumber",
            Handler:       _PrimeService_PrimeNumber_Handler,
            ServerStreams: true,
        },
    },
    Metadata: "prime/primepb/prime.proto",
}


続いてserver側を実装していきます。

server

package main

import (
    "fmt"
    "log"
    "net"

    "github.com/waytkheming/grpc-go-course/prime/primepb"
    "google.golang.org/grpc"
)

// sever is the stateless PrimeService implementation that main registers with
// the gRPC server.
type sever struct{}

// PrimeNumber streams the prime factors of req.Num to the client in ascending
// order, one PrimeResponse per factor (repeated factors are sent repeatedly).
// Numbers less than or equal to 1 produce an empty stream.
//
// It returns nil once the number is fully factorized, or the error from
// stream.Send if a response could not be written (for example because the
// client disconnected), so that the RPC ends instead of continuing to send on
// a broken stream.
func (*sever) PrimeNumber(req *primepb.PrimeRequest, stream primepb.PrimeService_PrimeNumberServer) error {
    fmt.Printf("primenum function was invoked with %v", req)

    number := req.GetNum()
    divisor := int64(2)
    for number > 1 {
        if number%divisor != 0 {
            divisor++
            continue
        }
        // divisor divides number: report it and keep dividing by the same
        // divisor so that repeated factors are emitted each time.
        if err := stream.Send(&primepb.PrimeResponse{
            Result: divisor,
        }); err != nil {
            return err
        }
        number /= divisor
    }
    return nil
}

// main starts the gRPC server: it listens on TCP 0.0.0.0:50051, registers the
// PrimeService implementation and serves until Serve returns. A listen or
// serve failure terminates the process via log.Fatalf.
func main() {
    fmt.Println("start")

    listener, listenErr := net.Listen("tcp", "0.0.0.0:50051")
    if listenErr != nil {
        log.Fatalf("Failed to listen: %v", listenErr)
    }

    grpcServer := grpc.NewServer()
    primepb.RegisterPrimeServiceServer(grpcServer, &sever{})

    serveErr := grpcServer.Serve(listener)
    if serveErr != nil {
        log.Fatalf("Failed to serve: %v", serveErr)
    }
}


続いてclient側になります。

client側


package main

import (
    "fmt"
    "log"

    "context"

    "io"

    "github.com/waytkheming/grpc-go-course/prime/primepb"
    "google.golang.org/grpc"
)

// main connects to the server at localhost:50051 without transport security,
// creates a PrimeService client and runs the server-streaming call for 210,
// 300 and 3300 in that order. The connection is closed when main returns.
func main() {
    fmt.Println("Hello from client")

    conn, dialErr := grpc.Dial("localhost:50051", grpc.WithInsecure())
    if dialErr != nil {
        log.Fatalf("could not connect:%v", dialErr)
    }
    defer conn.Close()

    client := primepb.NewPrimeServiceClient(conn)
    for _, number := range []int64{210, 300, 3300} {
        doServerStreaming(client, number)
    }
}

// doServerStreaming calls PrimeNumber for number and prints every result
// received from the stream, one per line, until the stream ends with io.EOF.
// A failure to start the call or to read from the stream terminates the
// process via log.Fatalf.
func doServerStreaming(c primepb.PrimeServiceClient, number int64) {
    fmt.Println("Starting to do a server streaimng rpc....")

    stream, err := c.PrimeNumber(context.Background(), &primepb.PrimeRequest{Num: number})
    if err != nil {
        log.Fatalf("error while calling GRPC: %v", err)
    }

    for {
        res, recvErr := stream.Recv()
        switch {
        case recvErr == io.EOF:
            // The server finished the stream normally.
            return
        case recvErr != nil:
            log.Fatalf("error while reading stream: %v", recvErr)
        }
        fmt.Println(res.GetResult())
    }
}



ここでは「210,300,3300」をリクエストしています。

APIの実行

ここまで実装したら、まずserver側をgo runで起動し、その後別のターミナルでclient側をgo runして実行してみてください。
すると、下記のようなレスポンスが返ってくると思います。

Starting to do a server streaimng rpc....
2
3
5
7
Starting to do a server streaimng rpc....
2
2
3
5
5
Starting to do a server streaimng rpc....
2
2
3
5
5
11

以上になります。ここに書かれていることを応用すれば、簡単なserver streaming方式のAPIは構築できるはずです!

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1