16
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

東京理科大学Advent Calendar 2018

Day 25

GRPC Goのチュートリアルをやってみる

Last updated at Posted at 2018-12-25

前日の東京理科大学AdventCalendarは@ko2091さんのハッカソンに参加してみたでした
ハッカソン一緒に出て楽しかったです!一緒に楽しめてたら幸いです!また一緒に出よう!

概要

GRPCのtutorialをやりながら自分なりにわからなかった所などをまとめて行きたいと思います。
GRPCのtutorialとはGRPC Basics - Goのことです!
このサイトを実際にやってみて自分なりにまとめていきます!
翻訳間違いなどがあればご指摘お願い致します

GRPCって何?

GRPCとはGoogleの開発したRPCフレームワークです
サーバー間の通信をHTTP2のプロトコルで通信でき、バイナリ形式のシリアライズフォーマットであるprotocol buffersでの通信ができるためJSONやXMLなどでの通信よりも早い処理でクライアント、サーバー間の通信を実現できます
最近ではマイクロサービスアーキテクチャの実装などで用いられています

RPCについてはこの記事 RPCとは?がわかりやすいです。
Protocol Buffersについては以前書いた記事があるのでそちらをご覧ください!
Protocol Buffers Basics Go

GRPCを使う利点

  1. GRPCでは一つの.protoファイルによってサーバーとクライアントを実装することができるのでクライアントとサーバー間のリクエスト、レスポンスの定義の一貫性を持たせることが出来ます
  2. サポートされている言語間であれば面倒な変換やリクエスト、レスポンスの形式の再定義なしに一つの.protoファイルによってサーバーとクライアンを生成できます
  3. Protocol Buffersの利点を享受できる(バイナリ形式のシリアライズフォーマットなのでテキスト形式であるJsonよりも早く通信を行うことができることなど)

セットアップ

go get するだけです
$ go get google.golang.org/grpc
今回はgo get したパッケージ内のサンプルを見ながらGRPCでのサービスなどの実装方法を見ていきます
公式のサンプルを見る場合は下記コマンドを実行して参照してください
$ cd $GOPATH/src/google.golang.org/grpc/examples/route_guide
なおこの記事ではサンプルを参照する形ではなくgopath内で自前で実装を行います

Serviceの定義

最初はgRPCのサービスを定義していきたいと思います
Restful APIで言うAPIのエンドポイントみたいなものです

ここのサンプルはgo getしたgoogle.golang.org/grpc内の
examples/route_guide/routeguide/route_guide.protoに書いてあります

serviceを定義するには次のようにserviceという予約語を.protoに書きます

service RouteGuide {
    ...
}

その後service内に送り値、返り値のメッセージタイプを指定してrpcメソッドを定義していきます
gRPCでは4つのタイプのserviceメソッドを定義でき今回は全てのタイプをRouteGuideサービスに定義していきます

  • シンプルなRPCではクライアントからサーバーへリクエストを送信したい場合は次のようにします
    通常のRestFul apiの方式と同じです、単純にクライアントからリクエストを送信してサーバーがレスポンスを返しています
rpc GetFeature(Point) returns (Feature) {}
  • server-side streaming RPCではクライアント側から送られてきたストリームを順次読み込みメッセージごとに返します クライアントからのリクエストは通常通りの通信方式で行いサーバーからのレスポンスはストリーミング方式で返してます
rpc ListFeatures(Rectangle) returns (stream Feature) {}
  • client-side streaming RPCではクライアントはメッセージごとに順次サーバーに送信します、書き込みが完了したクライアンはサーバーから全てのレスポンスが帰ってくるまで待ちます
    こちらは先程とは逆でリクエストをストリーミングで投げています
rpc RecordRoute(stream Point) returns (RouteSummary) {}
  • 双方向通信RPCではread-write streamを使って両サイドからメッセージを受け取ります、2つのstreamのオペレーションは独立しているのでクライアントとサーバーは好きなタイミングで読み込み、書き込みができます、こちらはリクエスト、レスポンス共にストリーミングで送信しています
rpc RouteChat(stream RouteNote) returns (stream RouteNote)

Message typeの定義

GRPCのサービスではRequestとResponseのtypeを定義する必要があります
今回実装するのは
Point Feature Rectangle RouteSummary RouteNoteの5つです

exampleを見るとPointは次のようになっています
型や=1などの記法は基本的にprotocol buffersに沿った記法なのでこちらを参照してください!

message Point {
  int32 lattitude = 1;
  int32 longititude = 2;
}

ClientとServerのコードを生成する

次に.protoファイルで定義したクライアントとサーバーのインターフェースを生成する必要があります
これはgrpc goプラグインを利用したprotocコマンドによって生成します
Install protoc command

protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide

plugins=grpcという引数を追加してコンパイルする際にgrpcのプラグインを使っています

このコマンドを使用するとrouteguideディレクトリ下にroute_guide.pb.goというファイルが生成されるかと思います

これは

  • シリアライズ、メッセージを受け取るための全てのリクエスト、レスポンスメッセージタイプ
  • クライアントがコールするためのインターフェースタイプ(RouteGuideに定義したやつ)
  • サーバーが実装するインターフェースタイプ(RouteGuideに定義したやつ)

を含んでいます

Serverの実装

まずはRouteGuideサーバーを実装する方法を見ていきます

RouteGuideサーバーを実装するためには2つのステップがあります

  1. サービス定義から生成されたサービスインターフェースの定義
  2. クライアントからのリクエストをListenするgRPCサーバーを走らせ、正しいサービスの実装へとdispatchする

RouteGuideの実装

exampleのserver.goのファイルを見てみるとrouteGuideServerという構造体が定義されているのを見ることができると思います
これはRouteGuideServerインターフェースを実装しています

server.go
type routeGuideServer struct {
	savedFeatures []*pb.Feature // read-only after initialized

	mu         sync.Mutex // protects routeNotes
	routeNotes map[string][]*pb.RouteNote
}

func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
        ...
}
...

Simple RPC

もっともシンプルなGetFeatureの実装をまず見ていきます
これはPointをクライアントから受け取り対応するFeatureの情報をデータベースなどから取得しreturnします

server.go
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
	for _, feature := range s.savedFeatures {
		if proto.Equal(feature.Location, point) {
			return feature, nil
		}
	}

	// Featureが見つからない場合は空のFeatureを返す
	return &pb.Feature{Location: point}, nil
}

メソッドはRPCのcontextオブジェクトとクライアントのprotocol bufferのリクエスト(Point)を受け取ります
そしてresponseの情報、エラー情報と共にprotocol bufferオブジェクト(Feature)を返します

Server-side streaming RPC

Streaming RPCを見ていきます
ListFeatureはServer-side streaming RPCです
ここでは複数のFeatureをclientに送り返す必要があります

server.go
func (s *routeGuideServer) ListFeature(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
	for _, feature := range s.savedFeatures {
		if inRange(feature.Location, rect) {
			if err := stream.Send(feature); err != nil {
				return err
			}
		}
	}
	return nil
}

func inRange(point *pb.Point, rect *pb.Rectangle) bool {
	left := math.Min(float64(rect.Lo.Longititude), float64(rect.Hi.Longititude))
	right := math.Max(float64(rect.Lo.Longititude), float64(rect.Hi.Longititude))
	top := math.Max(float64(rect.Lo.Lattitude), float64(rect.Hi.Lattitude))
	bottom := math.Min(float64(rect.Lo.Lattitude), float64(rect.Hi.Lattitude))

	if float64(point.Longititude) >= left &&
		float64(point.Longititude) <= right &&
		float64(point.Lattitude) <= top &&
		float64(point.Lattitude) >= bottom {
		return true
	}
	return false
}

シンプルなRPCでのリクエスト、レスポンスの代わりにListFeatureではリクエストオブジェクトとレスポンスを書き込む特別なRouteGuide_ListFeatureServerオブジェクトを使っています

ListFeatureメソッドではたくさんのFeatureオブジェクトを返す必要があります
これはRouteGuide_ListFeatureServerのSend()メソッドを使うことによって実現しています

Client-side streaming RPC

client-side streaming method であるRecordRouteはクライアントからstreamデータとしてPointを受け取り単一のRouteSummaryを返します
methodは全てのリクエストパラメータの代わりにstreamとしてRouteGuide_RecordRouteServerを受け取り書き込みと読み込みの両方を行います

RouteGuide_RecordRouteServerはRecv()メソッドでクライアントからメッセージを受け取ることができ
SendAndClose()メソッドで単一のレスポンスを返します

server.go
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
	var pointCount, featureCount, distance int32
	var lastPoint *pb.Point
	startTime := time.Now()
	for {
		point, err := stream.Recv()
		if err == io.EOF {
			endTime := time.Now()
			return stream.SendAndClose(&pb.RouteSummary{
				PointCount:   pointCount,
				FeatureCount: featureCount,
				Distance:     distance,
				ElapsedTime:  int32(endTime.Sub(startTime).Seconds()),
			})
		}
		if err != nil {
			return err
		}
		pointCount++
		for _, feature := range s.savedFeatures {
			if proto.Equal(feature.Location, point) {
				featureCount++
			}
		}
		if lastPoint != nil {
			distance += calcDistance(lastPoint, point)
		}
		lastPoint = point
	}
}

latitudeとlogitudeのスペル間違えてますが無視してください笑

server.go
func calcDistance(p1 *pb.Point, p2 *pb.Point) int32 {
	const CordFactor float64 = 1e7
	const R float64 = float64(6371000)
	lat1 := toRadians(float64(p1.Lattitude) / CordFactor)
	lat2 := toRadians(float64(p2.Lattitude) / CordFactor)
	lng1 := toRadians(float64(p1.Longititude) / CordFactor)
	lng2 := toRadians(float64(p2.Longititude) / CordFactor)
	dlat := lat2 - lat1
	dlng := lng2 - lng1

	a := math.Sin(dlat/2)*math.Sin(dlat/2) +
		math.Cos(lat1)*math.Cos(lat2)*
			math.Sin(dlng/2)*math.Sin(dlng/2)
	c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))

	distance := R * c
	return int32(distance)
}

func toRadians(num float64) float64 {
	return num * math.Pi / float64(180)
}

method bodyではRouteGuide_RecordRouteServerのRecv()メソッドを使いメッセージがなくなるまで繰り返しクライアントのリクエストをリクエストオブジェクトに読み込んでいます

serverはRead()で帰ってきたerrorをチェックする必要があります
err == io.EOFだった場合はメッセージストリームが終了したということなのでRouteSummaryをこの時点で返します

Bidirectional streaming RPC

最後に双方向ストリーミングRPCであるRouteChat()を見ていきます

server.go
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		key := serialize(in.Location)

		s.mu.Lock()
		s.routeNotes[key] = append(s.routeNotes[key], in)
		rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
		copy(rn, s.routeNotes[key])
		s.mu.Unlock()
		for _, note := range rn {
			if err := stream.Send(note); err != nil {
				return err
			}
		}
	}
}
server.go
func serialize(point *pb.Point) string {
	return fmt.Sprintf("%d %d", point.Lattitude, point.Longititude)
}

基本的にはクライアントサイドストリーミングとサーバーサイドストリーミングを組み合わせたような感じです
ここでサーバーがSendAndClose()ではなくSend()を使ってやり取りしているのは複数のレスポンスに書き込むからです

Starting the server

実装は全て完了したのでサーバーをスタートさせましょう

server.go
func main() {
	flag.Parse()
	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}
	grpcServer := grpc.NewServer()
	pb.RegisterRouteGuideServer(grpcServer, newServer())
	grpcServer.Serve(lis)
}
server.go
func newServer() *routeGuideServer {
	s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
	s.loadFeatures(*jsonDBFile)
	return s
}
server.go
// loadFeatures loads features from a JSON file.
func (s *routeGuideServer) loadFeatures(filePath string) {
	file, err := ioutil.ReadFile(filePath)
	if err != nil {
		log.Fatalf("Failed to load default features: %v", err)
	}
	if err := json.Unmarshal(file, &s.savedFeatures); err != nil {
		log.Fatalf("Failed to load default features: %v", err)
	}
}

loadするテストデータはtestdata/route_guide_db.jsonに用意されています

サーバーをスタートさせる処理では

  1. クライアントが利用するポートをlistenするlis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
  2. grpc.NewServer()によってgRPC serverのインスタンスを作成する
  3. 実装したサービスを登録する pb.RegisterRouteGuideServer(grpcServer, newServer())
  4. Serve()メソッドを呼び出す

の4ステップを踏んでいます

Creating the client

このセクションではサーバーを叩くためのgRPCクライアントを生成します

Creating a stub

まずサーバーと通信をするgRPC channelを作成する必要があります
grpc.Dial()によって実現させます

client.go
var (
	tls                = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")
	caFile             = flag.String("ca_file", "", "The file containning the CA root cert file")
	serverAddr         = flag.String("server_addr", "127.0.0.1:10000", "The server address in the format of host:port")
	serverHostOverride = flag.String("server_host_override", "x.test.youtube.com", "The server name use to verify the hostname returned by TLS handshake")
)
client.go
func main() {
	flag.Parse()
	// load options
	var opts []grpc.DialOption
	if *tls {
		if *caFile == "" {
			*caFile = testdata.Path("ca.pem")
		}
		creds, err := credentials.NewClientTLSFromFile(*caFile, *serverHostOverride)
		if err != nil {
			log.Fatalf("Failed to create TLS credentials %v", err)
		}
		opts = append(opts, grpc.WithTransportCredentials(creds))
	} else {
		opts = append(opts, grpc.WithInsecure())
	}
	conn, err := grpc.Dial(*serverAddr, opts...)
	if err != nil {
		log.Fatalf("fail to dial: %v", err)
	}
	defer conn.Close()
}

次にRPCを実行するためのstubを用意します

client.go
client := pb.NewRouteGuideClient(conn)

Calling service method

ここもSimple, Client, Server-side, Bidirectionalによって変わりますが
ちょっと長いためSimpleとBidirectionalのみ紹介します

Simple RPC

client.go
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
        ...
}

上記のようにするとサーバーのGetFeatureメソッドにアクセスすることができます

Bidirectional RPC

client.go
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			// read done.
			close(waitc)
			return
		}
		if err != nil {
			log.Fatalf("Failed to receive a note : %v", err)
		}
		log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
	}
}()
for _, note := range notes {
	if err := stream.Send(note); err != nil {
		log.Fatalf("Failed to send a note: %v", err)
	}
}
stream.CloseSend()
<-waitc

CloseAndSend()によってstreamを閉じ、それぞれリクエストとレスポンスをストリーミングによって順次取得し処理しています
ここもServer側の処理と似ています

Try it out!!

$ go run server/server.go
$ go run client/client.go

これでそれぞれサーバーとクライアント間で通信できるようになっていると思います

ここでGRPC Go Basicは終了です

クライアントのコードをまだ深く読めていないので記事はところどころ修正していきたいと思います(clientの処理全部書けてないし...)

みなさん、最後まで見ていただきありがとうございました!
これでこの記事は終了です!

閲覧ありがとうございました

16
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
16
9

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?