LoginSignup
1
0

More than 1 year has passed since last update.

gRPCに関して簡単にまとめてみた

Posted at

初めに

業務でgRPCを使用する機会があったので学習した内容をアウトプットのため投稿します。
プロトコルバッファに関してはこちらの記事でまとめています。
また、使用する言語はGoを選択しています。

gRPCとは

gRPCとは、クライアントがサーバーのメソッドを呼び出して通信することができるRPC(Remote Procedure Calls)です。
HTTP/2を使用し、デフォルトではProtocol BuffersをIDL・データのエンコーディングとして用います。
クライアントはスタブというものを用いてサーバーのメソッドを呼び出します。
HTTP/2の使用や通信データのバイナリ化などを用いることで高速な通信が可能です。
下図のようなマイクロサービスで用いられることが多いです。

スクリーンショット 2023-01-08 18.16.35.png
画像出典: gRPC公式

簡易的な例

gRPCでは、既存のRPCと同様にサービスを定義し、その中にメソッドやリクエスト・戻り値を指定します。
1つのサービスに複数のメソッドを定義することができ、メソッドはgRPCのエンドポイントになります。

service HelloService {
  rpc SayHello (HelloRequest) returns (HelloResponse);
}

message HelloRequest {
  string greeting = 1;
}

message HelloResponse {
  string reply = 1;
}

出典: gRPC公式

4種類のサービスメソッド

gRPCでは、下記の4種類のサービスメソッドを使用することができます。

  • Unary RPCs
  • Server streaming RPCs
  • Client streaming RPCs
  • Bidirectional streaming RPCs

それぞれの特徴を記載していきます。

Unary RPCs

Unary RPCsは1つのリクエストに1つのレスポンスを返す方式です。
API等に用いられます。

rpc メソッド名(リクエスト) returns (レスポンス);

処理の流れは下図の様になります。

スクリーンショット 2023-01-18 18.05.27.png

メタデータ

メタデータは、RPCコールの認証の詳細などの情報をキーと値のペアのリスト形式で表したもので、キーは文字列、値は通常文字列ですが、バイナリデータである場合もあります。

デットライン

クライアントがデータを受け取るまでの時間を設定することができます。
サーバーはタイムアウトしたかどうか、もしくはタイムアウトまでの残り時間を問い合わせることができます。

Server streaming RPCs

Server streaming RPCsは1つのリクエストに複数のレスポンスを返却する方式です。
プッシュ通知などに用いられます。

rpc メソッド名(リクエスト) returns (stream レスポンス);

スクリーンショット 2023-01-18 18.04.31.png

Client streaming RPCs

Client streaming RPCsは複数のリクエストに1つのレスポンスを返す方式です。
画像のアップロード等に用いられます。

rpc メソッド名(stream リクエスト) returns (レスポンス);

スクリーンショット 2023-01-18 18.10.48.png

Bidirectional streaming RPCs

Bidirectionalは双方向という意味です。
Bidirectional streaming RPCsは複数のリクエストに複数のレスポンスの方式で、どの様な順序でもやり取り可能です。
チャットなどに用いられます。

rpc メソッド名(stream リクエスト) returns (stream レスポンス);

スクリーンショット 2023-01-18 18.11.14.png

実際に使用してみる

それでは実際にgRPCを使用してみます。
今回は、Unary RPCsを使用して、クライアントがリクエストを投げてサーバーがレスポンスを返却する簡単なAPIを作成します。
最後にクライアントでレスポンスをJSONに変換して出力します。

protoファイルはこちらの記事で取り扱ったファイルとほぼ同様のものを使用します。
なるべく多くの型を使用したかったのでResponsemessageのフィールドは多めになっています。

※今回の実装においてエラーハンドリングは省略していますが、実際に運用する場合には状況に応じて追加する必要があります。

ptoroファイルの作成

ディレクトリ構成は以下の様になります。

.
└── proto
    └── main.proto

今回は、QiitaServiceというサービスのMethodというUnary RPCsを定義します。
protoというディレクトリを作成し、その配下にmain.protoを作成します。

main.proto

syntax = "proto3";

package main;

option go_package = "./pb";

message Request {}
message Response {
    string s = 1;
    int32 i = 2;
    enumerations enumeration = 3;
    repeated string arr = 4;
    map<string, MapValue> map = 5;
    oneof oneof {
        string oneofS = 6;
        OneofField oneofF = 7;
    }
    Parent.Cheidlen nest = 8;
}

service QiitaService {
    rpc Method (Request) returns (Response);
}

enum enumerations {
    UNKNOW = 0;
    ONE = 1;
    TWO = 2;
}

message MapValue {}
message OneofField {}
message Parent {
    message Cheidlen {}
}

コンパイルとgo mod

上記のファイルを下記コマンドでコンパイルします。

protoc -I. --go_out=. --go-grpc_out=. proto/main.proto

正常にコンパイルされるとpbディレクトリが作成されて配下にmain.pb.gomain_grpc.pb.goが生成されています。
次にgo modの初期化とtidyを実行します。

go mod init qiita
go mod tidy

コンパイルしてgo modの設定を行った後のディレクトリ構成は以下の様になります。

.
├── go.mod
├── go.sum
├── pb
│   ├── main.pb.go
│   └── main_grpc.pb.go
└── proto
    └── main.proto

サーバーの実装

それでは、サーバー側を実装します。
カレントディレクトリにserverというディレクトリを作成し、その中にmain.goというファイルを作成します。
ディレクトリ構成は以下の様になります。

.
├── go.mod
├── go.sum
├── pb
│   ├── main.pb.go
│   └── main_grpc.pb.go
├── proto
│   └── main.proto
└── server
    └── main.go

では、今回定義したMethodを使用してみようと思います。
※本来はSSL通信等を用いて暗号化する必要がありますが、今回はローカルのみの利用であるため省略しています。

server/main.go
package main

import (
	"context"
	"fmt"
	"net"
	"qiita/pb"

	"google.golang.org/grpc"
)

type server struct {
	// QiitaServiceServer interfaceを満たす
	pb.UnimplementedQiitaServiceServer
}

func (*server) Method(ctx context.Context, req *pb.Request) (*pb.Response, error) {
	fmt.Println("Method calling")

	data := &pb.Response{
		S:           "string",
		I:           1,
		Enumeration: pb.Enumerations_ONE,
		Arr:         []string{"zero", "one"},
		Map:         map[string]*pb.MapValue{"Map": &pb.MapValue{}},
		Oneof: &pb.Response_OneofS{
			OneofS: "one of type",
		},
		Nest: &pb.Parent_Cheidlen{},
	}

	return data, nil
}

func main() {
	lis, _ := net.Listen("tcp", "localhost:50051")

	s := grpc.NewServer()
	pb.RegisterQiitaServiceServer(s, &server{})

	s.Serve(lis)
}

解説

上記のserver/main.goに関して解説します。

server構造体とMethodメソッド

以下でserver構造体とそれをレシーバーにしているMethodというメソッドを定義しています。

server/mian.pb.go
type server struct {
	pb.UnimplementedQiitaServiceServer
}

func (*server) Method(ctx context.Context, req *pb.Request) (*pb.Response, error) {
 ...
}

server構造体のフィールドにあるpb.UnimplementedQiitaServiceServer構造体とその関係のあるメソッドやinterfaceを見てみると以下の様な定義になっています。

pb/mian_grpc.pb.go
type QiitaServiceServer interface {
	Method(context.Context, *Request) (*Response, error)
	mustEmbedUnimplementedQiitaServiceServer()
}

// UnimplementedQiitaServiceServer must be embedded to have forward compatible implementations.
type UnimplementedQiitaServiceServer struct {
}

func (UnimplementedQiitaServiceServer) Method(context.Context, *Request) (*Response, error) {
	return nil, status.Errorf(codes.Unimplemented, "method Method not implemented")
}
func (UnimplementedQiitaServiceServer) mustEmbedUnimplementedQiitaServiceServer() {}

上記を見て分かるようにpb.UnimplementedQiitaServiceServer構造体はQiitaServiceServerinterfacを満たしているため、MethodmustEmbedUnimplementedQiitaServiceServerの2つのメソッドを使用することができます。
よって、pb.UnimplementedQiitaServiceServer構造体をフィールドに持つserverも2つのメソッドを使用できるということになります。

Response構造体の初期化

下記でResponse構造体を初期化していますが、列挙型やMap型などのフイールドに関してどのコードを参照して初期化しているか確認していきます。

server/main.go
	data := &pb.Response{
		S:           "string",
		I:           1,
		Enumeration: pb.Enumerations_ONE,
		Arr:         []string{"zero", "one"},
		Map:         map[string]*pb.MapValue{"Map": &pb.MapValue{}},
		Oneof: &pb.Response_OneofS{
			OneofS: "one of type",
		},
		Nest: &pb.Parent_Cheidlen{},
	}

Enumeration

server/main.go
Enumeration: pb.Enumerations_ONE

列挙型のEnumerationというフィールドには下記定数のいずれかを格納することができます。

pb/mian.pb.go
const (
	Enumerations_UNKNOW Enumerations = 0
	Enumerations_ONE    Enumerations = 1
	Enumerations_TWO    Enumerations = 2
)

Map

server/main.go
Map: map[string]*pb.MapValue{"Map": &pb.MapValue{}}

Map フィールドは下記の様に定義されています。

pb/mian.pb.go
Associative map[string]*MapValue

Oneof

server/main.go
Oneof: &pb.Response_OneofS{
    OneofS: "one of type",
}

Response_OneofS構造体のOneofSフィールドを使っています。

pb/mian.pb.go
type Response_OneofS struct {
	OneofS string `protobuf:"bytes,6,opt,name=oneofS,proto3,oneof"`
}

Nest

server/main.go
Nest: &pb.Parent_Cheidlen{},

Parent_Cheidlen構造体を使っています。

pb/mian.pb.go
type Parent_Cheidlen struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields
}

main関数でのサーバー起動

main関数でサーバーの起動をしています。

server/main.go
func main() {
	lis, _ := net.Listen("tcp", "localhost:50051")

	s := grpc.NewServer()
	pb.RegisterQiitaServiceServer(s, &server{})

	s.Serve(lis)
}

下記コードで通信方法、ポート番号の設定をしています。

lis, _ := net.Listen("tcp", "localhost:50051")

下記では、google.golang.org/grpcNewServer関数を使用しています。

s := grpc.NewServer()

下記では、pb/mian_grpc.pb.goファイルのRegisterQiitaServiceServer関数を使用して、gRPCサーバーがMethodなどのメソッドを使用できるように設定しています。

pb.RegisterQiitaServiceServer(s, &server{})
pb/mian_grpc.pb.go
func RegisterQiitaServiceServer(s grpc.ServiceRegistrar, srv QiitaServiceServer) {
	s.RegisterService(&QiitaService_ServiceDesc, srv)
}

指定したポートでサーバーを起動しています。

s.Serve(lis)

クライアントの実装

では、クライアントを実装します。
クライアントに関しては、サーバーに接続してリクエストを送信し、レスポンスを出力する簡単な機能になるので各コードに何を行っているか軽くコメントしています。

client/main.go
package main

import (
	"context"
	"encoding/json"
	"fmt"
	"qiita/pb"

	"google.golang.org/grpc"
)

func main() {
	// ホストのアドレス/ポートを設定する
	connect, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
	// 関数終了後にサーバーとの接続を切る
	defer connect.Close()

	// main_grpc.pb.goのNewQiitaServiceClientを使用
	client := pb.NewQiitaServiceClient(connect)
	
	// Methodを呼び出して出力する
	res, _ := client.Method(context.Background(), &pb.Request{})
	out, _ := json.Marshal(res)
	fmt.Println(string(out))
}

通信してみる

それでは、以下のコマンドを実行して実際に通信してみます。

% go run server/main.go
// サーバー起動後にクライアントを実行
% go run client/main.go 

正常に処理が終了すると、クライアントで以下のデータを出力することができます。

{
    "s": "string",
    "i": 1,
    "enumeration": 1,
    "arr": [
        "zero",
        "one"
    ],
    "map": {
        "Map": {}
    },
    "Oneof": {
        "OneofS": "one of type"
    },
    "nest": {}
}

サーバー側で初期化したデータを受け取ることができました。

基本的な実装に関しては以上です。

interceptor

interceptorを使用すると、メソッドの前後で処理を行うことができます。
認証やロギングなどで使用されます。

今回はUnary RPCのサーバーにinterceptorを使用したいのでUnaryServerInterceptorというメソッドを使用します。
では、先ほどまで使用していたserver/main.goに追加・修正するコードのみ載せています。

server/main.go
func Log() grpc.UnaryServerInterceptor {
	return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
		fmt.Println("receive request")
		resp, err = handler(ctx, req)
		if err != nil {
			return nil, err
		}
		fmt.Println("resp: ", resp)
		return resp, nil
	}
}

func main() {
	// その他省略
	s := grpc.NewServer(grpc.UnaryInterceptor(Log()))
}

解説

まず、Logという関数を定義します。
Log関数はgoogle.golang.org/grpcUnaryServerInterceptorという関数型を戻り値としています。
google.golang.org/grpcUnaryServerInterceptorは下記のように定義されています。

type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

UnaryServerInterceptor関数の処理内ではログを埋め込んでいます。
今回はreceive requestという出力をし、handler関数の戻り値のレスポンスをresp変数に格納して出力しています。

それでは、サーバーを再起動してクライアントからリクエストを送信してみます。
そうするとLog関数内で記載したログが出力されていることが分かります。

receive request
Method calling
resp:  s:"string"  i:1  enumeration:ONE  arr:"zero"  arr:"one"  map:{key:"Map"  value:{}}  oneofS:"one of type"  nest:{}

今回はログを自作時ましたがgrpcのパッケージを使用すれば幅広くログや認証に対応することができるようです。

デットライン

Unary RPCsの章で説明しましたが、デットラインを用いることで指定の時間までにレスポンスがないとクライアントは処理を終了します。

では、実際にデットラインを使用してみます。

クライアント

クライアントはcontext.WithTimeoutを使用してタイムアウトする時間を設定してctxという変数に格納します。
その変数をMethodメソッドを呼び出す際の第一引数に設定してデットラインを設定することができます。

client/main.go
func main() {
    // 追加
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	connect, _ := grpc.Dial("localhost:50051", grpc.WithInsecure())
	defer connect.Close()

	client := pb.NewQiitaServiceClient(connect)
    // 更新
	res, _ := client.Method(ctx, &pb.Request{})
	out, _ := json.Marshal(res)
	fmt.Println(string(out))
}

サーバー

サーバーでは、意図的にタイムアウトさせるため。10秒間レスポンスを送信するまで待機します。

server/main.go
func (*server) Method(ctx context.Context, req *pb.Request) (*pb.Response, error) {
	fmt.Println("Method calling")
    // 追加
	time.Sleep(time.Second * 10)

    // 省略
}

以上で実装は完了です。
では、サーバーを起動してリクエストを送ってみるとクライアントのコンソールは以下の様な結果になり、レスポンスが返却されていないことが分かります。

% go run client/main.go
null

5秒経ってもレスポンスが帰ってきていないので処理が終了しています。

以上、gRPCに関して簡単にまとめてみました。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0