初めに
業務でgRPCを使用する機会があったので学習した内容をアウトプットのため投稿します。
プロトコルバッファに関してはこちらの記事でまとめています。
また、使用する言語はGoを選択しています。
gRPCとは
gRPCとは、クライアントがサーバーのメソッドを呼び出して通信することができるRPC(Remote Procedure Calls)です。
HTTP/2を使用し、デフォルトではProtocol BuffersをIDL・データのエンコーディングとして用います。
クライアントはスタブというものを用いてサーバーのメソッドを呼び出します。
HTTP/2の使用や通信データのバイナリ化などを用いることで高速な通信が可能です。
下図のようなマイクロサービスで用いられることが多いです。
図
画像出典: gRPC公式
簡易的な例
gRPCでは、既存のRPCと同様にサービスを定義し、その中にメソッドやリクエスト・戻り値を指定します。
1つのサービスに複数のメソッドを定義することができ、メソッドはgRPCのエンドポイントになります。
service HelloService {
rpc SayHello (HelloRequest) returns (HelloResponse);
}
message HelloRequest {
string greeting = 1;
}
message HelloResponse {
string reply = 1;
}
出典: gRPC公式
4種類のサービスメソッド
gRPCでは、下記の4種類のサービスメソッドを使用することができます。
- Unary RPCs
- Server streaming RPCs
- Client streaming RPCs
- Bidirectional streaming RPCs
それぞれの特徴を記載していきます。
Unary RPCs
Unary RPCsは1つのリクエストに1つのレスポンスを返す方式です。
API等に用いられます。
rpc メソッド名(リクエスト) returns (レスポンス);
処理の流れは下図の様になります。
メタデータ
メタデータは、RPCコールの認証の詳細などの情報をキーと値のペアのリスト形式で表したもので、キーは文字列、値は通常文字列ですが、バイナリデータである場合もあります。
デットライン
クライアントがデータを受け取るまでの時間を設定することができます。
サーバーはタイムアウトしたかどうか、もしくはタイムアウトまでの残り時間を問い合わせることができます。
Server streaming RPCs
Server streaming RPCsは1つのリクエストに複数のレスポンスを返却する方式です。
プッシュ通知などに用いられます。
rpc メソッド名(リクエスト) returns (stream レスポンス);
Client streaming RPCs
Client streaming RPCsは複数のリクエストに1つのレスポンスを返す方式です。
画像のアップロード等に用いられます。
rpc メソッド名(stream リクエスト) returns (レスポンス);
Bidirectional streaming RPCs
Bidirectionalは双方向という意味です。
Bidirectional streaming RPCsは複数のリクエストに複数のレスポンスの方式で、どの様な順序でもやり取り可能です。
チャットなどに用いられます。
rpc メソッド名(stream リクエスト) returns (stream レスポンス);
実際に使用してみる
それでは実際にgRPCを使用してみます。
今回は、Unary RPCsを使用して、クライアントがリクエストを投げてサーバーがレスポンスを返却する簡単なAPIを作成します。
最後にクライアントでレスポンスをJSONに変換して出力します。
protoファイルはこちらの記事で取り扱ったファイルとほぼ同様のものを使用します。
なるべく多くの型を使用したかったのでResponse
messageのフィールドは多めになっています。
※今回の実装においてエラーハンドリングは省略していますが、実際に運用する場合には状況に応じて追加する必要があります。
ptoroファイルの作成
ディレクトリ構成は以下の様になります。
.
└── proto
└── main.proto
今回は、QiitaService
というサービスのMethod
というUnary RPCsを定義します。
proto
というディレクトリを作成し、その配下にmain.proto
を作成します。
main.proto
syntax = "proto3";
package main;
option go_package = "./pb";
message Request {}
message Response {
string s = 1;
int32 i = 2;
enumerations enumeration = 3;
repeated string arr = 4;
map<string, MapValue> map = 5;
oneof oneof {
string oneofS = 6;
OneofField oneofF = 7;
}
Parent.Cheidlen nest = 8;
}
service QiitaService {
rpc Method (Request) returns (Response);
}
enum enumerations {
UNKNOW = 0;
ONE = 1;
TWO = 2;
}
message MapValue {}
message OneofField {}
message Parent {
message Cheidlen {}
}
コンパイルとgo mod
上記のファイルを下記コマンドでコンパイルします。
protoc -I. --go_out=. --go-grpc_out=. proto/main.proto
正常にコンパイルされるとpb
ディレクトリが作成されて配下にmain.pb.go
、main_grpc.pb.go
が生成されています。
次にgo modの初期化とtidyを実行します。
go mod init qiita
go mod tidy
コンパイルしてgo modの設定を行った後のディレクトリ構成は以下の様になります。
.
├── go.mod
├── go.sum
├── pb
│ ├── main.pb.go
│ └── main_grpc.pb.go
└── proto
└── main.proto
サーバーの実装
それでは、サーバー側を実装します。
カレントディレクトリにserver
というディレクトリを作成し、その中にmain.go
というファイルを作成します。
ディレクトリ構成は以下の様になります。
.
├── go.mod
├── go.sum
├── pb
│ ├── main.pb.go
│ └── main_grpc.pb.go
├── proto
│ └── main.proto
└── server
└── main.go
では、今回定義したMethod
を使用してみようと思います。
※本来はSSL通信等を用いて暗号化する必要がありますが、今回はローカルのみの利用であるため省略しています。
package main
import (
"context"
"fmt"
"net"
"qiita/pb"
"google.golang.org/grpc"
)
type server struct {
// QiitaServiceServer interfaceを満たす
pb.UnimplementedQiitaServiceServer
}
func (*server) Method(ctx context.Context, req *pb.Request) (*pb.Response, error) {
fmt.Println("Method calling")
data := &pb.Response{
S: "string",
I: 1,
Enumeration: pb.Enumerations_ONE,
Arr: []string{"zero", "one"},
Map: map[string]*pb.MapValue{"Map": &pb.MapValue{}},
Oneof: &pb.Response_OneofS{
OneofS: "one of type",
},
Nest: &pb.Parent_Cheidlen{},
}
return data, nil
}
func main() {
lis, _ := net.Listen("tcp", "localhost:50051")
s := grpc.NewServer()
pb.RegisterQiitaServiceServer(s, &server{})
s.Serve(lis)
}
解説
上記のserver/main.go
に関して解説します。
server構造体とMethodメソッド
以下でserver
構造体とそれをレシーバーにしているMethod
というメソッドを定義しています。
type server struct {
pb.UnimplementedQiitaServiceServer
}
func (*server) Method(ctx context.Context, req *pb.Request) (*pb.Response, error) {
...
}
server構造体のフィールドにあるpb.UnimplementedQiitaServiceServer
構造体とその関係のあるメソッドやinterfaceを見てみると以下の様な定義になっています。
type QiitaServiceServer interface {
Method(context.Context, *Request) (*Response, error)
mustEmbedUnimplementedQiitaServiceServer()
}
// UnimplementedQiitaServiceServer must be embedded to have forward compatible implementations.
type UnimplementedQiitaServiceServer struct {
}
func (UnimplementedQiitaServiceServer) Method(context.Context, *Request) (*Response, error) {
return nil, status.Errorf(codes.Unimplemented, "method Method not implemented")
}
func (UnimplementedQiitaServiceServer) mustEmbedUnimplementedQiitaServiceServer() {}
上記を見て分かるようにpb.UnimplementedQiitaServiceServer
構造体はQiitaServiceServer
interfacを満たしているため、Method
、mustEmbedUnimplementedQiitaServiceServer
の2つのメソッドを使用することができます。
よって、pb.UnimplementedQiitaServiceServer
構造体をフィールドに持つserver
も2つのメソッドを使用できるということになります。
Response構造体の初期化
下記でResponse構造体を初期化していますが、列挙型やMap型などのフイールドに関してどのコードを参照して初期化しているか確認していきます。
data := &pb.Response{
S: "string",
I: 1,
Enumeration: pb.Enumerations_ONE,
Arr: []string{"zero", "one"},
Map: map[string]*pb.MapValue{"Map": &pb.MapValue{}},
Oneof: &pb.Response_OneofS{
OneofS: "one of type",
},
Nest: &pb.Parent_Cheidlen{},
}
Enumeration
Enumeration: pb.Enumerations_ONE
列挙型のEnumeration
というフィールドには下記定数のいずれかを格納することができます。
const (
Enumerations_UNKNOW Enumerations = 0
Enumerations_ONE Enumerations = 1
Enumerations_TWO Enumerations = 2
)
Map
Map: map[string]*pb.MapValue{"Map": &pb.MapValue{}}
Map
フィールドは下記の様に定義されています。
Associative map[string]*MapValue
Oneof
Oneof: &pb.Response_OneofS{
OneofS: "one of type",
}
Response_OneofS
構造体のOneofS
フィールドを使っています。
type Response_OneofS struct {
OneofS string `protobuf:"bytes,6,opt,name=oneofS,proto3,oneof"`
}
Nest
Nest: &pb.Parent_Cheidlen{},
Parent_Cheidlen
構造体を使っています。
type Parent_Cheidlen struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
}
main関数でのサーバー起動
main
関数でサーバーの起動をしています。
func main() {
lis, _ := net.Listen("tcp", "localhost:50051")
s := grpc.NewServer()
pb.RegisterQiitaServiceServer(s, &server{})
s.Serve(lis)
}
下記コードで通信方法、ポート番号の設定をしています。
lis, _ := net.Listen("tcp", "localhost:50051")
下記では、google.golang.org/grpc
のNewServer
関数を使用しています。
s := grpc.NewServer()
下記では、pb/mian_grpc.pb.go
ファイルのRegisterQiitaServiceServer
関数を使用して、gRPCサーバーがMethod
などのメソッドを使用できるように設定しています。
pb.RegisterQiitaServiceServer(s, &server{})
func RegisterQiitaServiceServer(s grpc.ServiceRegistrar, srv QiitaServiceServer) {
s.RegisterService(&QiitaService_ServiceDesc, srv)
}
指定したポートでサーバーを起動しています。
s.Serve(lis)
クライアントの実装
では、クライアントを実装します。
クライアントに関しては、サーバーに接続してリクエストを送信し、レスポンスを出力する簡単な機能になるので各コードに何を行っているか軽くコメントしています。
package main
import (
"context"
"encoding/json"
"fmt"
"qiita/pb"
"google.golang.org/grpc"
)
func main() {
// ホストのアドレス/ポートを設定する
connect, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
// 関数終了後にサーバーとの接続を切る
defer connect.Close()
// main_grpc.pb.goのNewQiitaServiceClientを使用
client := pb.NewQiitaServiceClient(connect)
// Methodを呼び出して出力する
res, _ := client.Method(context.Background(), &pb.Request{})
out, _ := json.Marshal(res)
fmt.Println(string(out))
}
通信してみる
それでは、以下のコマンドを実行して実際に通信してみます。
% go run server/main.go
// サーバー起動後にクライアントを実行
% go run client/main.go
正常に処理が終了すると、クライアントで以下のデータを出力することができます。
{
"s": "string",
"i": 1,
"enumeration": 1,
"arr": [
"zero",
"one"
],
"map": {
"Map": {}
},
"Oneof": {
"OneofS": "one of type"
},
"nest": {}
}
サーバー側で初期化したデータを受け取ることができました。
基本的な実装に関しては以上です。
interceptor
interceptorを使用すると、メソッドの前後で処理を行うことができます。
認証やロギングなどで使用されます。
今回はUnary RPCのサーバーにinterceptorを使用したいのでUnaryServerInterceptor
というメソッドを使用します。
では、先ほどまで使用していたserver/main.go
に追加・修正するコードのみ載せています。
func Log() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
fmt.Println("receive request")
resp, err = handler(ctx, req)
if err != nil {
return nil, err
}
fmt.Println("resp: ", resp)
return resp, nil
}
}
func main() {
// その他省略
s := grpc.NewServer(grpc.UnaryInterceptor(Log()))
}
解説
まず、Log
という関数を定義します。
Log
関数はgoogle.golang.org/grpc
のUnaryServerInterceptor
という関数型を戻り値としています。
google.golang.org/grpc
のUnaryServerInterceptor
は下記のように定義されています。
type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
UnaryServerInterceptor
関数の処理内ではログを埋め込んでいます。
今回はreceive request
という出力をし、handler
関数の戻り値のレスポンスをresp
変数に格納して出力しています。
それでは、サーバーを再起動してクライアントからリクエストを送信してみます。
そうするとLog
関数内で記載したログが出力されていることが分かります。
receive request
Method calling
resp: s:"string" i:1 enumeration:ONE arr:"zero" arr:"one" map:{key:"Map" value:{}} oneofS:"one of type" nest:{}
今回はログを自作時ましたがgrpcのパッケージを使用すれば幅広くログや認証に対応することができるようです。
デットライン
Unary RPCsの章で説明しましたが、デットラインを用いることで指定の時間までにレスポンスがないとクライアントは処理を終了します。
では、実際にデットラインを使用してみます。
クライアント
クライアントはcontext.WithTimeout
を使用してタイムアウトする時間を設定してctx
という変数に格納します。
その変数をMethod
メソッドを呼び出す際の第一引数に設定してデットラインを設定することができます。
func main() {
// 追加
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
connect, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
defer connect.Close()
client := pb.NewQiitaServiceClient(connect)
// 更新
res, _ := client.Method(ctx, &pb.Request{})
out, _ := json.Marshal(res)
fmt.Println(string(out))
}
サーバー
サーバーでは、意図的にタイムアウトさせるため。10秒間レスポンスを送信するまで待機します。
func (*server) Method(ctx context.Context, req *pb.Request) (*pb.Response, error) {
fmt.Println("Method calling")
// 追加
time.Sleep(time.Second * 10)
// 省略
}
以上で実装は完了です。
では、サーバーを起動してリクエストを送ってみるとクライアントのコンソールは以下の様な結果になり、レスポンスが返却されていないことが分かります。
% go run client/main.go
null
5秒経ってもレスポンスが帰ってきていないので処理が終了しています。
以上、gRPCに関して簡単にまとめてみました。