Edited at

GRPC Goのチュートリアルをやってみる

前日の東京理科大学AdventCalendarは@ko2091さんのハッカソンに参加してみたでした

ハッカソン一緒に出て楽しかったです!一緒に楽しめてたら幸いです!また一緒に出よう!


概要

GRPCのtutorialをやりながら自分なりにわからなかった所などをまとめて行きたいと思います。

GRPCのtutorialとはGRPC Basics - Goのことです!

このサイトを実際にやってみて自分なりにまとめていきます!

翻訳間違いなどがあればご指摘お願い致します


GRPCって何?

GRPCとはGoogleの開発したRPCフレームワークです

サーバー間の通信をHTTP2のプロトコルで通信でき、バイナリ形式のシリアライズフォーマットであるprotocol buffersでの通信ができるためJSONやXMLなどでの通信よりも早い処理でクライアント、サーバー間の通信を実現できます

最近ではマイクロサービスアーキテクチャの実装などで用いられています


RPCについてはこの記事 RPCとは?がわかりやすいです。

Protocol Buffersについては以前書いた記事があるのでそちらをご覧ください!

Protocol Buffers Basics Go



GRPCを使う利点


  1. GRPCでは一つの.protoファイルによってサーバーとクライアントを実装することができるのでクライアントとサーバー間のリクエスト、レスポンスの定義の一貫性を持たせることが出来ます

  2. サポートされている言語間であれば面倒な変換やリクエスト、レスポンスの形式の再定義なしに一つの.protoファイルによってサーバーとクライアンを生成できます

  3. Protocol Buffersの利点を享受できる(バイナリ形式のシリアライズフォーマットなのでテキスト形式であるJsonよりも早く通信を行うことができることなど)


セットアップ

go get するだけです

$ go get google.golang.org/grpc

今回はgo get したパッケージ内のサンプルを見ながらGRPCでのサービスなどの実装方法を見ていきます

公式のサンプルを見る場合は下記コマンドを実行して参照してください

$ cd $GOPATH/src/google.golang.org/grpc/examples/route_guide

なおこの記事ではサンプルを参照する形ではなくgopath内で自前で実装を行います


Serviceの定義

最初はgRPCのサービスを定義していきたいと思います

Restful APIで言うAPIのエンドポイントみたいなものです

ここのサンプルはgo getしたgoogle.golang.org/grpc内の

examples/route_guide/routeguide/route_guide.protoに書いてあります

serviceを定義するには次のようにserviceという予約語を.protoに書きます

service RouteGuide {

...
}

その後service内に送り値、返り値のメッセージタイプを指定してrpcメソッドを定義していきます

gRPCでは4つのタイプのserviceメソッドを定義でき今回は全てのタイプをRouteGuideサービスに定義していきます


  • シンプルなRPCではクライアントからサーバーへリクエストを送信したい場合は次のようにします
    通常のRestFul apiの方式と同じです、単純にクライアントからリクエストを送信してサーバーがレスポンスを返しています


rpc GetFeature(Point) returns (Feature) {}


  • server-side streaming RPCではクライアント側から送られてきたストリームを順次読み込みメッセージごとに返します クライアントからのリクエストは通常通りの通信方式で行いサーバーからのレスポンスはストリーミング方式で返してます


rpc ListFeatures(Rectangle) returns (stream Feature) {}


  • client-side streaming RPCではクライアントはメッセージごとに順次サーバーに送信します、書き込みが完了したクライアンはサーバーから全てのレスポンスが帰ってくるまで待ちます
    こちらは先程とは逆でリクエストをストリーミングで投げています


rpc RecordRoute(stream Point) returns (RouteSummary) {}


  • 双方向通信RPCではread-write streamを使って両サイドからメッセージを受け取ります、2つのstreamのオペレーションは独立しているのでクライアントとサーバーは好きなタイミングで読み込み、書き込みができます、こちらはリクエスト、レスポンス共にストリーミングで送信しています


rpc RouteChat(stream RouteNote) returns (stream RouteNote)


Message typeの定義

GRPCのサービスではRequestとResponseのtypeを定義する必要があります

今回実装するのは

Point Feature Rectangle RouteSummary RouteNoteの5つです

exampleを見るとPointは次のようになっています

型や=1などの記法は基本的にprotocol buffersに沿った記法なのでこちらを参照してください!

message Point {

int32 lattitude = 1;
int32 longititude = 2;
}


ClientとServerのコードを生成する

次に.protoファイルで定義したクライアントとサーバーのインターフェースを生成する必要があります

これはgrpc goプラグインを利用したprotocコマンドによって生成します

Install protoc command

protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide

plugins=grpcという引数を追加してコンパイルする際にgrpcのプラグインを使っています

このコマンドを使用するとrouteguideディレクトリ下にroute_guide.pb.goというファイルが生成されるかと思います

これは


  • シリアライズ、メッセージを受け取るための全てのリクエスト、レスポンスメッセージタイプ

  • クライアントがコールするためのインターフェースタイプ(RouteGuideに定義したやつ)

  • サーバーが実装するインターフェースタイプ(RouteGuideに定義したやつ)

を含んでいます


Serverの実装

まずはRouteGuideサーバーを実装する方法を見ていきます

RouteGuideサーバーを実装するためには2つのステップがあります


  1. サービス定義から生成されたサービスインターフェースの定義

  2. クライアントからのリクエストをListenするgRPCサーバーを走らせ、正しいサービスの実装へとdispatchする


RouteGuideの実装

exampleのserver.goのファイルを見てみるとrouteGuideServerという構造体が定義されているのを見ることができると思います

これはRouteGuideServerインターフェースを実装しています


server.go

type routeGuideServer struct {

savedFeatures []*pb.Feature // read-only after initialized

mu sync.Mutex // protects routeNotes
routeNotes map[string][]*pb.RouteNote
}

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...



Simple RPC

もっともシンプルなGetFeatureの実装をまず見ていきます

これはPointをクライアントから受け取り対応するFeatureの情報をデータベースなどから取得しreturnします


server.go

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {

for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}

// Featureが見つからない場合は空のFeatureを返す
return &pb.Feature{Location: point}, nil
}


メソッドはRPCのcontextオブジェクトとクライアントのprotocol bufferのリクエスト(Point)を受け取ります

そしてresponseの情報、エラー情報と共にprotocol bufferオブジェクト(Feature)を返します


Server-side streaming RPC

Streaming RPCを見ていきます

ListFeatureはServer-side streaming RPCです

ここでは複数のFeatureをclientに送り返す必要があります


server.go

func (s *routeGuideServer) ListFeature(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {

for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}

func inRange(point *pb.Point, rect *pb.Rectangle) bool {
left := math.Min(float64(rect.Lo.Longititude), float64(rect.Hi.Longititude))
right := math.Max(float64(rect.Lo.Longititude), float64(rect.Hi.Longititude))
top := math.Max(float64(rect.Lo.Lattitude), float64(rect.Hi.Lattitude))
bottom := math.Min(float64(rect.Lo.Lattitude), float64(rect.Hi.Lattitude))

if float64(point.Longititude) >= left &&
float64(point.Longititude) <= right &&
float64(point.Lattitude) <= top &&
float64(point.Lattitude) >= bottom {
return true
}
return false
}


シンプルなRPCでのリクエスト、レスポンスの代わりにListFeatureではリクエストオブジェクトとレスポンスを書き込む特別なRouteGuide_ListFeatureServerオブジェクトを使っています

ListFeatureメソッドではたくさんのFeatureオブジェクトを返す必要があります

これはRouteGuide_ListFeatureServerのSend()メソッドを使うことによって実現しています


Client-side streaming RPC

client-side streaming method であるRecordRouteはクライアントからstreamデータとしてPointを受け取り単一のRouteSummaryを返します

methodは全てのリクエストパラメータの代わりにstreamとしてRouteGuide_RecordRouteServerを受け取り書き込みと読み込みの両方を行います

RouteGuide_RecordRouteServerはRecv()メソッドでクライアントからメッセージを受け取ることができ

SendAndClose()メソッドで単一のレスポンスを返します


server.go

func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {

var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}

latitudeとlogitudeのスペル間違えてますが無視してください笑


server.go

func calcDistance(p1 *pb.Point, p2 *pb.Point) int32 {

const CordFactor float64 = 1e7
const R float64 = float64(6371000)
lat1 := toRadians(float64(p1.Lattitude) / CordFactor)
lat2 := toRadians(float64(p2.Lattitude) / CordFactor)
lng1 := toRadians(float64(p1.Longititude) / CordFactor)
lng2 := toRadians(float64(p2.Longititude) / CordFactor)
dlat := lat2 - lat1
dlng := lng2 - lng1

a := math.Sin(dlat/2)*math.Sin(dlat/2) +
math.Cos(lat1)*math.Cos(lat2)*
math.Sin(dlng/2)*math.Sin(dlng/2)
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))

distance := R * c
return int32(distance)
}

func toRadians(num float64) float64 {
return num * math.Pi / float64(180)
}


method bodyではRouteGuide_RecordRouteServerのRecv()メソッドを使いメッセージがなくなるまで繰り返しクライアントのリクエストをリクエストオブジェクトに読み込んでいます

serverはRead()で帰ってきたerrorをチェックする必要があります

err == io.EOFだった場合はメッセージストリームが終了したということなのでRouteSummaryをこの時点で返します


Bidirectional streaming RPC

最後に双方向ストリーミングRPCであるRouteChat()を見ていきます


server.go

func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {

for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)

s.mu.Lock()
s.routeNotes[key] = append(s.routeNotes[key], in)
rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
copy(rn, s.routeNotes[key])
s.mu.Unlock()
for _, note := range rn {
if err := stream.Send(note); err != nil {
return err
}
}
}
}



server.go

func serialize(point *pb.Point) string {

return fmt.Sprintf("%d %d", point.Lattitude, point.Longititude)
}

基本的にはクライアントサイドストリーミングとサーバーサイドストリーミングを組み合わせたような感じです

ここでサーバーがSendAndClose()ではなくSend()を使ってやり取りしているのは複数のレスポンスに書き込むからです


Starting the server

実装は全て完了したのでサーバーをスタートさせましょう


server.go

func main() {

flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)
}


server.go

func newServer() *routeGuideServer {

s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
s.loadFeatures(*jsonDBFile)
return s
}


server.go

// loadFeatures loads features from a JSON file.

func (s *routeGuideServer) loadFeatures(filePath string) {
file, err := ioutil.ReadFile(filePath)
if err != nil {
log.Fatalf("Failed to load default features: %v", err)
}
if err := json.Unmarshal(file, &s.savedFeatures); err != nil {
log.Fatalf("Failed to load default features: %v", err)
}
}

loadするテストデータはtestdata/route_guide_db.jsonに用意されています

サーバーをスタートさせる処理では


  1. クライアントが利用するポートをlistenするlis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))

  2. grpc.NewServer()によってgRPC serverのインスタンスを作成する

  3. 実装したサービスを登録する pb.RegisterRouteGuideServer(grpcServer, newServer())

  4. Serve()メソッドを呼び出す

の4ステップを踏んでいます


Creating the client

このセクションではサーバーを叩くためのgRPCクライアントを生成します


Creating a stub

まずサーバーと通信をするgRPC channelを作成する必要があります

grpc.Dial()によって実現させます


client.go

var (

tls = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")
caFile = flag.String("ca_file", "", "The file containning the CA root cert file")
serverAddr = flag.String("server_addr", "127.0.0.1:10000", "The server address in the format of host:port")
serverHostOverride = flag.String("server_host_override", "x.test.youtube.com", "The server name use to verify the hostname returned by TLS handshake")
)


client.go

func main() {

flag.Parse()
// load options
var opts []grpc.DialOption
if *tls {
if *caFile == "" {
*caFile = testdata.Path("ca.pem")
}
creds, err := credentials.NewClientTLSFromFile(*caFile, *serverHostOverride)
if err != nil {
log.Fatalf("Failed to create TLS credentials %v", err)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
}

次にRPCを実行するためのstubを用意します


client.go

client := pb.NewRouteGuideClient(conn)



Calling service method

ここもSimple, Client, Server-side, Bidirectionalによって変わりますが

ちょっと長いためSimpleとBidirectionalのみ紹介します


Simple RPC


client.go

feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})

if err != nil {
...
}

上記のようにするとサーバーのGetFeatureメソッドにアクセスすることができます


Bidirectional RPC


client.go

stream, err := client.RouteChat(context.Background())

waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc

CloseAndSend()によってstreamを閉じ、それぞれリクエストとレスポンスをストリーミングによって順次取得し処理しています

ここもServer側の処理と似ています


Try it out!!

$ go run server/server.go

$ go run client/client.go

これでそれぞれサーバーとクライアント間で通信できるようになっていると思います

ここでGRPC Go Basicは終了です

クライアントのコードをまだ深く読めていないので記事はところどころ修正していきたいと思います(clientの処理全部書けてないし...)

みなさん、最後まで見ていただきありがとうございました!

これでこの記事は終了です!

閲覧ありがとうございました