113
88

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

RE:CODEAdvent Calendar 2019

Day 24

goでgRPCの4つの通信方式やってみた(Dockerのサンプルあり)

Last updated at Posted at 2020-01-25
スクリーンショット 2020-01-25 12.46.05.png

goでgRPCを実装してみました。

gRPCの4つの通信方式を実際にやってみます。
・Unary RPCs(SimpleRPC)
・Server streaming RPC
・Client streaming RPC
・Bidirectional streaming RPC

この記事内容を上から行えば、Dockerで動く環境が作れます。
Dockerでこの4つの実装してる記事は少ないはず。

gRPCとは

googleの「RPCのフレームワーク」です。
RPCは、アプリケーション間のデータ通信するためのプログラムのことです。
gRPCでは、以下を提供します。
HTTP/2、10言語の公式サポートライブラリ、プロトコルバッファ(構造体みたいなもの)を定義することにより言語を問わずにデータ通信が可能、ヘルスチェック、ロードバランシングなどをサポート。

一言で言うと、簡単にアプリケーション間のデータ通信が高速にできるやつ。

webからの呼び出し

以下のライブラリで可能。
grpc-gateway
( grpc-gateway使わなかったら、jsで自前でプロトコルバッファをコンパイルしないといけないはず)
ただ、双方向ストリーミングは、対応してません。(2020年4月)

gRPCの4つの通信方式

StreamingとUnaryの組み合わせのため4つあります。
以下用途と共に説明します。
公式を参考

##・Unary RPC
一つのデータを受け取って、一つのデータを返す。
動きはよくあるrestAPIと同じ。
用途:サーバ間通信、API、アプリとサーバのデータ送受信。
(Unary : Unary)
one.png

##・Server streaming RPC
サーバがクライアントに複数のリクエストを送る。
用途:サーバから任意のタイミングでクライアントに通知させたい時など。
(Unary : Streaming)
three.jpg

##・Client streaming RPC
クライアントがサーバに複数のリクエストを送る。
用途:データアップロードや、クライアントから多くのデータを送る場合。
(Streaming : Unary)
two.png

##・Bidirectional streaming RPC
双方向でデータのやりとりをする。
用途:チャットやオンライン対戦など。
(Streaming : Streaming)
four.jpg

サーバ間通信とかで使う

ある処理がめっちゃ「メモリ使う、CPU使う」という時は、サーバを分けるとコスト削減できます。
(ユーザ数とかが多くない場合は不要)
そういう時に、サーバ間でデータの受け渡しが必要になります。

gRPCを使うメリットとして、
・プロトコルバッファを定義すれば、そのデータ形式(構造体的な感じ)でやりとりができる。
=> restAPIだとエンドポイントを定義してそこにリクエストを叩かないといけないので、単純に面倒。
・プロトコルバッファに型とキー情報があるのでドキュメント不要。
=> restAPIと違って、エンドポイントのドキュメントなくても分かりやすい。
・速い

プロトコルバッファ

データや通信方式を定義するIDLのこと。
・「レスポンスとリクエスト」のデータ形式
・ストリーミングを使うか
・パッケージ名
などを定義する。
クライアントとサーバの両方で持ってないといけない。
protcというコンパイラがあるので、今回はこのコンパイラを使ってGO言語ファイルを吐き出す。

環境構築

Docker環境

・フォルダ構成
以下のフォルダを想定しています。

./infrastructure/docker/go-grpc/Dockerfile
./infrastructure/docker/docker-compose.yml

・フォルダを作成します。

mkdir -p ./infrastructure/docker/go-grpc
cd ./infrastructure/docker
touch docker-compose.yml ./go-grpc/Dockerfile

作成されました。
スクリーンショット 2020-01-25 12.01.00.png

・Dockerfile

FROM golang:1.13-stretch

SHELL ["/bin/bash", "-c"]
RUN apt update && apt-get install -y vim unzip

# install protc
WORKDIR /protoc
RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v3.11.2/protoc-3.11.2-linux-x86_64.zip
RUN unzip protoc-3.11.2-linux-x86_64.zip
RUN ln -s /protoc/bin/protoc /bin/protoc

# golang
WORKDIR /go-grpc
ENV GO111MODULE on
RUN go get -u github.com/golang/protobuf/protoc-gen-go

・docker-compose.yml

version: "3.7"
services:
  go-grpc:
    build:
      context: ./go-grpc/
      dockerfile: Dockerfile
    container_name: "go-grpc"
    volumes:
      - ../../:/go-grpc
    tty: true
    privileged: true

・環境に入る
以下のコマンドでdockerコンテナの中に入ります!

cd ./infrastructure/docker
docker-compose up -d
docker-compose exec go-grpc bash
スクリーンショット 2020-01-26 21.11.40.png

実装

上の4つの通信方式を全て動かしてみます。
必要なやつだけを読んでも大丈夫です。

Unary RPC

2つの数字を送ったら足算して返すサーバとクライアントを作成します。

one.png

protoの作成

以下dockerコンテナに入った状態で作業します。
コンパイラである「protoc」は、dockerコンテナにインストールしてあります。

「a,bをintで受け取ってメッセージを返す」を想定したprotoです。

mkdir ./proto
touch ./proto/calc.proto

・calc.proto


syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.simple";
option java_outer_classname = "SimpleProto";

// package名
package calc;

// 上の4つのパターンをここで指定する。
service Calc {
  rpc Sum (SumRequest) returns (SumReply) {}
}

// リクエストで送る値。「1、2」はそのデータの番号。
message SumRequest {
  int32 a = 1;
  int32 b = 2;
}

// レスポンスで送る値
message SumReply {
  string message = 1;
}

・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/calcフォルダに「calc.pb.go」が自動生成されます。

protoc --proto_path ./proto --go_out=plugins=grpc:./pb/calc calc.proto

「calc.pb.go」が生成されました。

スクリーンショット 2020-01-25 12.00.10.png

goで実行

・以下のコマンドで、gomoduleを初期設定をします。

go mod init grpc-sample

以下のファイルが作成されました。
スクリーンショット 2020-01-25 11.58.16.png

「受け取る側のサーバー」と「送る側のクライアント」の2つの実行ファイルを作成します。

mkdir server client
touch ./server/main.go ./client/main.go

・./server/main.go

package main

import (
	"context"
	"fmt"
	"log"
	"net"

	pb "grpc-sample/pb/calc"

	"github.com/pkg/errors"
	"google.golang.org/grpc"
)

const port = ":50051"

// ServerUnary is server
type ServerUnary struct {
	pb.UnimplementedCalcServer
}

// Sum 二つの値を受け取り、合計してクライアントへ返す
func (s *ServerUnary) Sum(ctx context.Context, in *pb.SumRequest) (*pb.SumReply, error) {
	a := in.GetA()
	b := in.GetB()
	fmt.Println(a, b)
	reply := fmt.Sprintf("%d + %d = %d", a, b, a+b)
	return &pb.SumReply{
		Message: reply,
	}, nil
}

func set() error {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		return errors.Wrap(err, "ポート失敗")
	}
	s := grpc.NewServer()
	var server ServerUnary
	pb.RegisterCalcServer(s, &server)
	if err := s.Serve(lis); err != nil {
		return errors.Wrap(err, "サーバ起動失敗")
	}
	return nil
}

func main() {
	fmt.Println("起動")
	if err := set(); err != nil {
		log.Fatalf("%v", err)
	}
}

・./client/main.go

package main

import (
	"context"
	"log"
	"time"

	"github.com/pkg/errors"

	pb "grpc-sample/pb/calc"

	"google.golang.org/grpc"
)

func request(client pb.CalcClient, a, b int32) error {
	ctx, cancel := context.WithTimeout(
		context.Background(),
		time.Second,
	)
	defer cancel()
	sumRequest := pb.SumRequest{
		A: a,
		B: b,
	}
	reply, err := client.Sum(ctx, &sumRequest)
	if err != nil {
		return errors.Wrap(err, "受取り失敗")
	}
	log.Printf("サーバからの受け取り\n %s", reply.GetMessage())
	return nil
}

func sum(a, b int32) error {
	address := "localhost:50051"
	conn, err := grpc.Dial(
		address,
		grpc.WithInsecure(),
		grpc.WithBlock(),
	)
	if err != nil {
		return errors.Wrap(err, "コネクションエラー")
	}
	defer conn.Close()
	client := pb.NewCalcClient(conn)
	return request(client, a, b)
}

func main() {
	a := int32(300)
	b := int32(500)
	if err := sum(a, b); err != nil {
		log.Fatalf("%v", err)
	}
}

・サーバを実行します。

go run server/main.go

これでサーバが立ったので、クライアントからデータを送ります。

go run client/main.go

サーバ側の表示
上のコマンドを実行したので、
クライアントから、300と500という値を受け取りました。
スクリーンショット 2020-01-25 12.05.38.png

クライアント側の表示
300, 500という数字を送ったので、800という数字がサーバから返ってきました。
スクリーンショット 2020-01-25 12.05.24.png

1リクエスト1レスポンスのサーバとクライアンを動かすことができました。
これで、「めっちゃ負荷が高い処理をする機能」があった場合とかにサーバを分割できたりもする。

Server streaming RPC

サーバから値を受け取りづつけます。

three.jpg

protoの作成

以下dockerコンテナに入った状態で作業します。

mkdir ./proto
touch ./proto/notification.proto

・notification.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.notification";
option java_outer_classname = "NotificationProto";

package notification;

service Notification {
  rpc Notification (NotificationRequest) returns (stream NotificationReply) {}
}

message NotificationRequest {
  int32 num = 1;
}

message NotificationReply {
  string message = 1;
}

・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/notificationフォルダに「notification.pb.go」が自動生成されます。

protoc --proto_path ./proto --go_out=plugins=grpc:./pb/notification notification.proto

「notification.pb.go」が生成されました。

・clinet

package main

import (
	"context"
	"io"
	"log"

	"github.com/pkg/errors"

	pb "grpc-sample/pb/notification"

	"google.golang.org/grpc"
)

func request(client pb.NotificationClient, num int32) error {
	req := &pb.NotificationRequest{
		Num: num,
	}
	stream, err := client.Notification(context.Background(), req)
	if err != nil {
		return errors.Wrap(err, "streamエラー")
	}
	for {
		reply, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			return err
		}
		log.Println("これ:", reply.GetMessage())
	}
	return nil
}

func exec(num int32) error {
	address := "localhost:50051"
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return errors.Wrap(err, "コネクションエラー")
	}
	defer conn.Close()
	client := pb.NewNotificationClient(conn)
	return request(client, num)
}

func main() {
	num := int32(5)
	if err := exec(num); err != nil {
		log.Println(err)
	}
}

・server

package main

import (
	"fmt"
	"log"
	"net"
	"time"

	pb "grpc-sample/pb/notification"

	"github.com/pkg/errors"
	"google.golang.org/grpc"
)

const port = ":50051"

// ServerServerSide is server
type ServerServerSide struct {
	pb.UnimplementedNotificationServer
}

// Notification is
func (s *ServerServerSide) Notification(req *pb.NotificationRequest, stream pb.Notification_NotificationServer) error {
	fmt.Println("リクエスト受け取った")
	for i := int32(0); i < req.GetNum(); i++ {
		message := fmt.Sprintf("%d", i)
		if err := stream.Send(&pb.NotificationReply{
			Message: message,
		}); err != nil {
			return err
		}
		time.Sleep(time.Second * 1)
	}
	return nil
}

func set() error {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		return errors.Wrap(err, "ポート失敗")
	}
	s := grpc.NewServer()
	var server ServerServerSide
	pb.RegisterNotificationServer(s, &server)
	if err := s.Serve(lis); err != nil {
		return errors.Wrap(err, "サーバ起動失敗")
	}
	return nil
}

func main() {
	fmt.Println("起動")
	if err := set(); err != nil {
		log.Fatalf("%v", err)
	}
}

Client streaming RPC

クライアントから値を受け取りづつけます。
two.png

protoの作成

以下dockerコンテナに入った状態で作業します。

mkdir ./proto
touch ./proto/upload.proto

・upload.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.upload";
option java_outer_classname = "UploadProto";

package upload;

service Upload {
  rpc Upload (stream UploadRequest) returns (UploadReply) {}
}

message UploadRequest {
  int32 value = 1;
}

message UploadReply {
  string message = 1;
}

・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/uploadフォルダに「upload.pb.go」が自動生成されます。

protoc --proto_path ./proto --go_out=plugins=grpc:./pb/upload upload.proto

「upload.pb.go」が生成されました。

・clinet

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"time"

	"github.com/pkg/errors"

	pb "grpc-sample/pb/upload"

	"google.golang.org/grpc"
)

func request(client pb.UploadClient) error {
	stream, err := client.Upload(context.Background())
	if err != nil {
		return err
	}
	values := []int32{1, 2, 3, 4, 5}
	for _, value := range values {
		fmt.Println("送る値:", value)
		if err := stream.Send(&pb.UploadRequest{
			Value: value,
		}); err != nil {
			if err == io.EOF {
				break
			}
			return err
		}
		time.Sleep(time.Second * 1)
	}
	reply, err := stream.CloseAndRecv()
	if err != nil {
		return err
	}
	log.Printf("結果: %v", reply)
	return nil
}

func exec() error {
	address := "localhost:50051"
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return errors.Wrap(err, "コネクションエラー")
	}
	defer conn.Close()
	client := pb.NewUploadClient(conn)
	return request(client)
}

func main() {
	if err := exec(); err != nil {
		log.Println(err)
	}
}

・server

package main

import (
	"fmt"
	"io"
	"log"
	"net"

	pb "grpc-sample/pb/upload"

	"github.com/pkg/errors"
	"google.golang.org/grpc"
)

const port = ":50051"

// ServerClientSide is servre
type ServerClientSide struct {
	pb.UnimplementedUploadServer
}

// Upload 複数の送られてきた数字を合計する
func (s *ServerClientSide) Upload(stream pb.Upload_UploadServer) error {
	var sum int32
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			message := fmt.Sprintf("DONE: sum = %d", sum)
			return stream.SendAndClose(&pb.UploadReply{
				Message: message,
			})
		}
		if err != nil {
			return err
		}
		fmt.Println(req.GetValue())
		sum += req.GetValue()
	}
}

func set() error {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		return errors.Wrap(err, "ポート失敗")
	}
	s := grpc.NewServer()
	var server ServerClientSide
	pb.RegisterUploadServer(s, &server)
	if err := s.Serve(lis); err != nil {
		return errors.Wrap(err, "サーバ起動失敗")
	}
	return nil
}

func main() {
	fmt.Println("起動")
	if err := set(); err != nil {
		log.Fatalf("%v", err)
	}
}

Bidirectional streaming RPC

双方で、値を受け取りづつけます。
four.jpg

protoの作成

以下dockerコンテナに入った状態で作業します。

mkdir ./proto
touch ./proto/chat.proto

・chat.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.chat";
option java_outer_classname = "ChatProto";

package chat;

service Chat {
  rpc Chat (stream ChatRequest) returns (stream ChatReply) {}
}

message ChatRequest {
  string message = 1;
}

message ChatReply {
  string message = 1;
}

・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/chatフォルダに「chat.pb.go」が自動生成されます。

protoc --proto_path ./proto --go_out=plugins=grpc:./pb/chat chat.proto

「chat.pb.go」が生成されました。

・clinet

package main

import (
	"context"
	"io"
	"log"
	"time"

	"github.com/pkg/errors"

	pb "grpc-sample/pb/chat"

	"google.golang.org/grpc"
)

func receive(stream pb.Chat_ChatClient) error {
	waitc := make(chan struct{})
	go func() {
		for {
			in, err := stream.Recv()
			if err == io.EOF {
				close(waitc)
				return
			}
			if err != nil {
				log.Fatalf("エラー: %v", err)
			}
			log.Printf("サーバから:%s", in.Message)

			// お返し
			stream.Send(&pb.ChatRequest{
				Message: time.Now().Format("2006-01-02 15:04:05"),
			})
		}
	}()
	<-waitc
	return nil
}

func request(stream pb.Chat_ChatClient) error {
	return stream.Send(&pb.ChatRequest{
		Message: "こんにちは",
	})
}

func chat(client pb.ChatClient) error {
	stream, err := client.Chat(context.Background())
	if err != nil {
		return err
	}
	if err := request(stream); err != nil {
		return err
	}
	if err := receive(stream); err != nil {
		return err
	}
	stream.CloseSend()
	return nil
}

func exec() error {
	address := "localhost:50051"
	conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
	if err != nil {
		return errors.Wrap(err, "コネクションエラー")
	}
	defer conn.Close()
	client := pb.NewChatClient(conn)
	return chat(client)
}

func main() {
	if err := exec(); err != nil {
		log.Println(err)
	}
}

・server

package main

import (
	"fmt"
	"io"
	"log"
	"net"
	"time"

	pb "grpc-sample/pb/chat"

	"github.com/pkg/errors"
	"google.golang.org/grpc"
)

const port = ":50051"

// ServerBidirectional is server
type ServerBidirectional struct {
	pb.UnimplementedChatServer
}

func request(stream pb.Chat_ChatServer, message string) error {
	reply := fmt.Sprintf("%sを受け取ったよ!ありがとう^^", message)
	return stream.Send(&pb.ChatReply{
		Message: reply,
	})
}

// Chat クライアントから受け取った言葉に、言葉を返す
func (s *ServerBidirectional) Chat(stream pb.Chat_ChatServer) error {
	for {
		in, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return err
		}
		message := in.GetMessage()
		fmt.Println("受取:", message)

		if err := request(stream, message); err != nil {
			return err
		}
		time.Sleep(time.Second * 1)
	}
}

func set() error {
	lis, err := net.Listen("tcp", port)
	if err != nil {
		return errors.Wrap(err, "ポート失敗")
	}
	s := grpc.NewServer()
	var server ServerBidirectional
	pb.RegisterChatServer(s, &server)
	if err := s.Serve(lis); err != nil {
		return errors.Wrap(err, "サーバ起動失敗")
	}
	return nil
}

func main() {
	fmt.Println("起動")
	if err := set(); err != nil {
		log.Fatalf("%v", err)
	}
}

感想

4方式をやってみて感覚が掴めました。
エンドポイントの定義とかしなくて良いし、簡単にストリーミングも使えるしめっちゃ便利だと思った。
サーバ間通信やUnityやスマホアプリで使ってみたい。

113
88
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
113
88

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?