前日の東京理科大学AdventCalendarは@ko2091さんのハッカソンに参加してみたでした
ハッカソン一緒に出て楽しかったです!一緒に楽しめてたら幸いです!また一緒に出よう!
概要
GRPCのtutorialをやりながら自分なりにわからなかった所などをまとめて行きたいと思います。
GRPCのtutorialとはGRPC Basics - Goのことです!
このサイトを実際にやってみて自分なりにまとめていきます!
翻訳間違いなどがあればご指摘お願い致します
GRPCって何?
GRPCとはGoogleの開発したRPCフレームワークです
サーバー間の通信をHTTP2のプロトコルで通信でき、バイナリ形式のシリアライズフォーマットであるprotocol buffersでの通信ができるためJSONやXMLなどでの通信よりも早い処理でクライアント、サーバー間の通信を実現できます
最近ではマイクロサービスアーキテクチャの実装などで用いられています
RPCについてはこの記事 RPCとは?がわかりやすいです。
Protocol Buffersについては以前書いた記事があるのでそちらをご覧ください!
Protocol Buffers Basics Go
GRPCを使う利点
- GRPCでは一つの.protoファイルによってサーバーとクライアントを実装することができるのでクライアントとサーバー間のリクエスト、レスポンスの定義の一貫性を持たせることが出来ます
- サポートされている言語間であれば面倒な変換やリクエスト、レスポンスの形式の再定義なしに一つの.protoファイルによってサーバーとクライアンを生成できます
- Protocol Buffersの利点を享受できる(バイナリ形式のシリアライズフォーマットなのでテキスト形式であるJsonよりも早く通信を行うことができることなど)
セットアップ
go get するだけです
$ go get google.golang.org/grpc
今回はgo get したパッケージ内のサンプルを見ながらGRPCでのサービスなどの実装方法を見ていきます
公式のサンプルを見る場合は下記コマンドを実行して参照してください
$ cd $GOPATH/src/google.golang.org/grpc/examples/route_guide
なおこの記事ではサンプルを参照する形ではなくgopath内で自前で実装を行います
Serviceの定義
最初はgRPCのサービスを定義していきたいと思います
Restful APIで言うAPIのエンドポイントみたいなものです
ここのサンプルはgo getしたgoogle.golang.org/grpc内の
examples/route_guide/routeguide/route_guide.proto
に書いてあります
serviceを定義するには次のようにserviceという予約語を.protoに書きます
service RouteGuide {
...
}
その後service内に送り値、返り値のメッセージタイプを指定してrpcメソッドを定義していきます
gRPCでは4つのタイプのserviceメソッドを定義でき今回は全てのタイプをRouteGuideサービスに定義していきます
- シンプルなRPCではクライアントからサーバーへリクエストを送信したい場合は次のようにします
通常のRestFul apiの方式と同じです、単純にクライアントからリクエストを送信してサーバーがレスポンスを返しています
rpc GetFeature(Point) returns (Feature) {}
- server-side streaming RPCではクライアント側から送られてきたストリームを順次読み込みメッセージごとに返します クライアントからのリクエストは通常通りの通信方式で行いサーバーからのレスポンスはストリーミング方式で返してます
rpc ListFeatures(Rectangle) returns (stream Feature) {}
- client-side streaming RPCではクライアントはメッセージごとに順次サーバーに送信します、書き込みが完了したクライアンはサーバーから全てのレスポンスが帰ってくるまで待ちます
こちらは先程とは逆でリクエストをストリーミングで投げています
rpc RecordRoute(stream Point) returns (RouteSummary) {}
- 双方向通信RPCではread-write streamを使って両サイドからメッセージを受け取ります、2つのstreamのオペレーションは独立しているのでクライアントとサーバーは好きなタイミングで読み込み、書き込みができます、こちらはリクエスト、レスポンス共にストリーミングで送信しています
rpc RouteChat(stream RouteNote) returns (stream RouteNote)
Message typeの定義
GRPCのサービスではRequestとResponseのtypeを定義する必要があります
今回実装するのは
Point Feature Rectangle RouteSummary RouteNoteの5つです
exampleを見るとPointは次のようになっています
型や=1などの記法は基本的にprotocol buffersに沿った記法なのでこちらを参照してください!
message Point {
int32 lattitude = 1;
int32 longititude = 2;
}
ClientとServerのコードを生成する
次に.protoファイルで定義したクライアントとサーバーのインターフェースを生成する必要があります
これはgrpc goプラグインを利用したprotocコマンドによって生成します
Install protoc command
protoc -I routeguide/ routeguide/route_guide.proto --go_out=plugins=grpc:routeguide
plugins=grpcという引数を追加してコンパイルする際にgrpcのプラグインを使っています
このコマンドを使用するとrouteguideディレクトリ下にroute_guide.pb.go
というファイルが生成されるかと思います
これは
- シリアライズ、メッセージを受け取るための全てのリクエスト、レスポンスメッセージタイプ
- クライアントがコールするためのインターフェースタイプ(RouteGuideに定義したやつ)
- サーバーが実装するインターフェースタイプ(RouteGuideに定義したやつ)
を含んでいます
Serverの実装
まずはRouteGuideサーバーを実装する方法を見ていきます
RouteGuideサーバーを実装するためには2つのステップがあります
- サービス定義から生成されたサービスインターフェースの定義
- クライアントからのリクエストをListenするgRPCサーバーを走らせ、正しいサービスの実装へとdispatchする
RouteGuideの実装
exampleのserver.goのファイルを見てみるとrouteGuideServerという構造体が定義されているのを見ることができると思います
これはRouteGuideServerインターフェースを実装しています
type routeGuideServer struct {
savedFeatures []*pb.Feature // read-only after initialized
mu sync.Mutex // protects routeNotes
routeNotes map[string][]*pb.RouteNote
}
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
...
}
...
Simple RPC
もっともシンプルなGetFeatureの実装をまず見ていきます
これはPointをクライアントから受け取り対応するFeatureの情報をデータベースなどから取得しreturnします
func (s *routeGuideServer) GetFeature(ctx context.Context, point *pb.Point) (*pb.Feature, error) {
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
return feature, nil
}
}
// Featureが見つからない場合は空のFeatureを返す
return &pb.Feature{Location: point}, nil
}
メソッドはRPCのcontextオブジェクトとクライアントのprotocol bufferのリクエスト(Point)を受け取ります
そしてresponseの情報、エラー情報と共にprotocol bufferオブジェクト(Feature)を返します
Server-side streaming RPC
Streaming RPCを見ていきます
ListFeatureはServer-side streaming RPCです
ここでは複数のFeatureをclientに送り返す必要があります
func (s *routeGuideServer) ListFeature(rect *pb.Rectangle, stream pb.RouteGuide_ListFeaturesServer) error {
for _, feature := range s.savedFeatures {
if inRange(feature.Location, rect) {
if err := stream.Send(feature); err != nil {
return err
}
}
}
return nil
}
func inRange(point *pb.Point, rect *pb.Rectangle) bool {
left := math.Min(float64(rect.Lo.Longititude), float64(rect.Hi.Longititude))
right := math.Max(float64(rect.Lo.Longititude), float64(rect.Hi.Longititude))
top := math.Max(float64(rect.Lo.Lattitude), float64(rect.Hi.Lattitude))
bottom := math.Min(float64(rect.Lo.Lattitude), float64(rect.Hi.Lattitude))
if float64(point.Longititude) >= left &&
float64(point.Longititude) <= right &&
float64(point.Lattitude) <= top &&
float64(point.Lattitude) >= bottom {
return true
}
return false
}
シンプルなRPCでのリクエスト、レスポンスの代わりにListFeatureではリクエストオブジェクトとレスポンスを書き込む特別なRouteGuide_ListFeatureServerオブジェクトを使っています
ListFeatureメソッドではたくさんのFeatureオブジェクトを返す必要があります
これはRouteGuide_ListFeatureServerのSend()メソッドを使うことによって実現しています
Client-side streaming RPC
client-side streaming method であるRecordRouteはクライアントからstreamデータとしてPointを受け取り単一のRouteSummaryを返します
methodは全てのリクエストパラメータの代わりにstreamとしてRouteGuide_RecordRouteServerを受け取り書き込みと読み込みの両方を行います
RouteGuide_RecordRouteServerはRecv()メソッドでクライアントからメッセージを受け取ることができ
SendAndClose()メソッドで単一のレスポンスを返します
func (s *routeGuideServer) RecordRoute(stream pb.RouteGuide_RecordRouteServer) error {
var pointCount, featureCount, distance int32
var lastPoint *pb.Point
startTime := time.Now()
for {
point, err := stream.Recv()
if err == io.EOF {
endTime := time.Now()
return stream.SendAndClose(&pb.RouteSummary{
PointCount: pointCount,
FeatureCount: featureCount,
Distance: distance,
ElapsedTime: int32(endTime.Sub(startTime).Seconds()),
})
}
if err != nil {
return err
}
pointCount++
for _, feature := range s.savedFeatures {
if proto.Equal(feature.Location, point) {
featureCount++
}
}
if lastPoint != nil {
distance += calcDistance(lastPoint, point)
}
lastPoint = point
}
}
latitudeとlogitudeのスペル間違えてますが無視してください笑
func calcDistance(p1 *pb.Point, p2 *pb.Point) int32 {
const CordFactor float64 = 1e7
const R float64 = float64(6371000)
lat1 := toRadians(float64(p1.Lattitude) / CordFactor)
lat2 := toRadians(float64(p2.Lattitude) / CordFactor)
lng1 := toRadians(float64(p1.Longititude) / CordFactor)
lng2 := toRadians(float64(p2.Longititude) / CordFactor)
dlat := lat2 - lat1
dlng := lng2 - lng1
a := math.Sin(dlat/2)*math.Sin(dlat/2) +
math.Cos(lat1)*math.Cos(lat2)*
math.Sin(dlng/2)*math.Sin(dlng/2)
c := 2 * math.Atan2(math.Sqrt(a), math.Sqrt(1-a))
distance := R * c
return int32(distance)
}
func toRadians(num float64) float64 {
return num * math.Pi / float64(180)
}
method bodyではRouteGuide_RecordRouteServerのRecv()メソッドを使いメッセージがなくなるまで繰り返しクライアントのリクエストをリクエストオブジェクトに読み込んでいます
serverはRead()で帰ってきたerrorをチェックする必要があります
err == io.EOFだった場合はメッセージストリームが終了したということなのでRouteSummaryをこの時点で返します
Bidirectional streaming RPC
最後に双方向ストリーミングRPCであるRouteChat()を見ていきます
func (s *routeGuideServer) RouteChat(stream pb.RouteGuide_RouteChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
key := serialize(in.Location)
s.mu.Lock()
s.routeNotes[key] = append(s.routeNotes[key], in)
rn := make([]*pb.RouteNote, len(s.routeNotes[key]))
copy(rn, s.routeNotes[key])
s.mu.Unlock()
for _, note := range rn {
if err := stream.Send(note); err != nil {
return err
}
}
}
}
func serialize(point *pb.Point) string {
return fmt.Sprintf("%d %d", point.Lattitude, point.Longititude)
}
基本的にはクライアントサイドストリーミングとサーバーサイドストリーミングを組み合わせたような感じです
ここでサーバーがSendAndClose()ではなくSend()を使ってやり取りしているのは複数のレスポンスに書き込むからです
Starting the server
実装は全て完了したのでサーバーをスタートさせましょう
func main() {
flag.Parse()
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
grpcServer := grpc.NewServer()
pb.RegisterRouteGuideServer(grpcServer, newServer())
grpcServer.Serve(lis)
}
func newServer() *routeGuideServer {
s := &routeGuideServer{routeNotes: make(map[string][]*pb.RouteNote)}
s.loadFeatures(*jsonDBFile)
return s
}
// loadFeatures loads features from a JSON file.
func (s *routeGuideServer) loadFeatures(filePath string) {
file, err := ioutil.ReadFile(filePath)
if err != nil {
log.Fatalf("Failed to load default features: %v", err)
}
if err := json.Unmarshal(file, &s.savedFeatures); err != nil {
log.Fatalf("Failed to load default features: %v", err)
}
}
loadするテストデータはtestdata/route_guide_db.jsonに用意されています
サーバーをスタートさせる処理では
- クライアントが利用するポートをlistenする
lis, err := net.Listen("tcp", fmt.Sprintf(":%d", *port))
- grpc.NewServer()によってgRPC serverのインスタンスを作成する
- 実装したサービスを登録する
pb.RegisterRouteGuideServer(grpcServer, newServer())
- Serve()メソッドを呼び出す
の4ステップを踏んでいます
Creating the client
このセクションではサーバーを叩くためのgRPCクライアントを生成します
Creating a stub
まずサーバーと通信をするgRPC channelを作成する必要があります
grpc.Dial()によって実現させます
var (
tls = flag.Bool("tls", false, "Connection uses TLS if true, else plain TCP")
caFile = flag.String("ca_file", "", "The file containning the CA root cert file")
serverAddr = flag.String("server_addr", "127.0.0.1:10000", "The server address in the format of host:port")
serverHostOverride = flag.String("server_host_override", "x.test.youtube.com", "The server name use to verify the hostname returned by TLS handshake")
)
func main() {
flag.Parse()
// load options
var opts []grpc.DialOption
if *tls {
if *caFile == "" {
*caFile = testdata.Path("ca.pem")
}
creds, err := credentials.NewClientTLSFromFile(*caFile, *serverHostOverride)
if err != nil {
log.Fatalf("Failed to create TLS credentials %v", err)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
} else {
opts = append(opts, grpc.WithInsecure())
}
conn, err := grpc.Dial(*serverAddr, opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
defer conn.Close()
}
次にRPCを実行するためのstubを用意します
client := pb.NewRouteGuideClient(conn)
Calling service method
ここもSimple, Client, Server-side, Bidirectionalによって変わりますが
ちょっと長いためSimpleとBidirectionalのみ紹介します
Simple RPC
feature, err := client.GetFeature(context.Background(), &pb.Point{409146138, -746188906})
if err != nil {
...
}
上記のようにするとサーバーのGetFeatureメソッドにアクセスすることができます
Bidirectional RPC
stream, err := client.RouteChat(context.Background())
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
// read done.
close(waitc)
return
}
if err != nil {
log.Fatalf("Failed to receive a note : %v", err)
}
log.Printf("Got message %s at point(%d, %d)", in.Message, in.Location.Latitude, in.Location.Longitude)
}
}()
for _, note := range notes {
if err := stream.Send(note); err != nil {
log.Fatalf("Failed to send a note: %v", err)
}
}
stream.CloseSend()
<-waitc
CloseAndSend()によってstreamを閉じ、それぞれリクエストとレスポンスをストリーミングによって順次取得し処理しています
ここもServer側の処理と似ています
Try it out!!
$ go run server/server.go
$ go run client/client.go
これでそれぞれサーバーとクライアント間で通信できるようになっていると思います
ここでGRPC Go Basicは終了です
クライアントのコードをまだ深く読めていないので記事はところどころ修正していきたいと思います(clientの処理全部書けてないし...)
みなさん、最後まで見ていただきありがとうございました!
これでこの記事は終了です!
閲覧ありがとうございました