goでgRPCを実装してみました。
gRPCの4つの通信方式を実際にやってみます。
・Unary RPCs(SimpleRPC)
・Server streaming RPC
・Client streaming RPC
・Bidirectional streaming RPC
この記事内容を上から行えば、Dockerで動く環境が作れます。
Dockerでこの4つの実装してる記事は少ないはず。
gRPCとは
googleの「RPCのフレームワーク」です。
RPCは、アプリケーション間のデータ通信するためのプログラムのことです。
gRPCでは、以下を提供します。
HTTP/2、10言語の公式サポートライブラリ、プロトコルバッファ(構造体みたいなもの)を定義することにより言語を問わずにデータ通信が可能、ヘルスチェック、ロードバランシングなどをサポート。
一言で言うと、簡単にアプリケーション間のデータ通信が高速にできるやつ。
webからの呼び出し
以下のライブラリで可能。
grpc-gateway
( grpc-gateway使わなかったら、jsで自前でプロトコルバッファをコンパイルしないといけないはず)
ただ、双方向ストリーミングは、対応してません。(2020年4月)
gRPCの4つの通信方式
StreamingとUnaryの組み合わせのため4つあります。
以下用途と共に説明します。
公式を参考
##・Unary RPC
一つのデータを受け取って、一つのデータを返す。
動きはよくあるrestAPIと同じ。
用途:サーバ間通信、API、アプリとサーバのデータ送受信。
(Unary : Unary)
##・Server streaming RPC
サーバがクライアントに複数のリクエストを送る。
用途:サーバから任意のタイミングでクライアントに通知させたい時など。
(Unary : Streaming)
##・Client streaming RPC
クライアントがサーバに複数のリクエストを送る。
用途:データアップロードや、クライアントから多くのデータを送る場合。
(Streaming : Unary)
##・Bidirectional streaming RPC
双方向でデータのやりとりをする。
用途:チャットやオンライン対戦など。
(Streaming : Streaming)
サーバ間通信とかで使う
ある処理がめっちゃ「メモリ使う、CPU使う」という時は、サーバを分けるとコスト削減できます。
(ユーザ数とかが多くない場合は不要)
そういう時に、サーバ間でデータの受け渡しが必要になります。
gRPCを使うメリットとして、
・プロトコルバッファを定義すれば、そのデータ形式(構造体的な感じ)でやりとりができる。
=> restAPIだとエンドポイントを定義してそこにリクエストを叩かないといけないので、単純に面倒。
・プロトコルバッファに型とキー情報があるのでドキュメント不要。
=> restAPIと違って、エンドポイントのドキュメントなくても分かりやすい。
・速い
プロトコルバッファ
データや通信方式を定義するIDLのこと。
・「レスポンスとリクエスト」のデータ形式
・ストリーミングを使うか
・パッケージ名
などを定義する。
クライアントとサーバの両方で持ってないといけない。
protcというコンパイラがあるので、今回はこのコンパイラを使ってGO言語ファイルを吐き出す。
環境構築
Docker環境
・フォルダ構成
以下のフォルダを想定しています。
./infrastructure/docker/go-grpc/Dockerfile
./infrastructure/docker/docker-compose.yml
・フォルダを作成します。
mkdir -p ./infrastructure/docker/go-grpc
cd ./infrastructure/docker
touch docker-compose.yml ./go-grpc/Dockerfile
・Dockerfile
FROM golang:1.13-stretch
SHELL ["/bin/bash", "-c"]
RUN apt update && apt-get install -y vim unzip
# install protc
WORKDIR /protoc
RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v3.11.2/protoc-3.11.2-linux-x86_64.zip
RUN unzip protoc-3.11.2-linux-x86_64.zip
RUN ln -s /protoc/bin/protoc /bin/protoc
# golang
WORKDIR /go-grpc
ENV GO111MODULE on
RUN go get -u github.com/golang/protobuf/protoc-gen-go
・docker-compose.yml
version: "3.7"
services:
go-grpc:
build:
context: ./go-grpc/
dockerfile: Dockerfile
container_name: "go-grpc"
volumes:
- ../../:/go-grpc
tty: true
privileged: true
・環境に入る
以下のコマンドでdockerコンテナの中に入ります!
cd ./infrastructure/docker
docker-compose up -d
docker-compose exec go-grpc bash
実装
上の4つの通信方式を全て動かしてみます。
必要なやつだけを読んでも大丈夫です。
Unary RPC
2つの数字を送ったら足算して返すサーバとクライアントを作成します。
protoの作成
以下dockerコンテナに入った状態で作業します。
コンパイラである「protoc」は、dockerコンテナにインストールしてあります。
「a,bをintで受け取ってメッセージを返す」を想定したprotoです。
mkdir ./proto
touch ./proto/calc.proto
・calc.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.simple";
option java_outer_classname = "SimpleProto";
// package名
package calc;
// 上の4つのパターンをここで指定する。
service Calc {
rpc Sum (SumRequest) returns (SumReply) {}
}
// リクエストで送る値。「1、2」はそのデータの番号。
message SumRequest {
int32 a = 1;
int32 b = 2;
}
// レスポンスで送る値
message SumReply {
string message = 1;
}
・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/calcフォルダに「calc.pb.go」が自動生成されます。
protoc --proto_path ./proto --go_out=plugins=grpc:./pb/calc calc.proto
「calc.pb.go」が生成されました。
goで実行
・以下のコマンドで、gomoduleを初期設定をします。
go mod init grpc-sample
「受け取る側のサーバー」と「送る側のクライアント」の2つの実行ファイルを作成します。
mkdir server client
touch ./server/main.go ./client/main.go
・./server/main.go
package main
import (
"context"
"fmt"
"log"
"net"
pb "grpc-sample/pb/calc"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
const port = ":50051"
// ServerUnary is server
type ServerUnary struct {
pb.UnimplementedCalcServer
}
// Sum 二つの値を受け取り、合計してクライアントへ返す
func (s *ServerUnary) Sum(ctx context.Context, in *pb.SumRequest) (*pb.SumReply, error) {
a := in.GetA()
b := in.GetB()
fmt.Println(a, b)
reply := fmt.Sprintf("%d + %d = %d", a, b, a+b)
return &pb.SumReply{
Message: reply,
}, nil
}
func set() error {
lis, err := net.Listen("tcp", port)
if err != nil {
return errors.Wrap(err, "ポート失敗")
}
s := grpc.NewServer()
var server ServerUnary
pb.RegisterCalcServer(s, &server)
if err := s.Serve(lis); err != nil {
return errors.Wrap(err, "サーバ起動失敗")
}
return nil
}
func main() {
fmt.Println("起動")
if err := set(); err != nil {
log.Fatalf("%v", err)
}
}
・./client/main.go
package main
import (
"context"
"log"
"time"
"github.com/pkg/errors"
pb "grpc-sample/pb/calc"
"google.golang.org/grpc"
)
func request(client pb.CalcClient, a, b int32) error {
ctx, cancel := context.WithTimeout(
context.Background(),
time.Second,
)
defer cancel()
sumRequest := pb.SumRequest{
A: a,
B: b,
}
reply, err := client.Sum(ctx, &sumRequest)
if err != nil {
return errors.Wrap(err, "受取り失敗")
}
log.Printf("サーバからの受け取り\n %s", reply.GetMessage())
return nil
}
func sum(a, b int32) error {
address := "localhost:50051"
conn, err := grpc.Dial(
address,
grpc.WithInsecure(),
grpc.WithBlock(),
)
if err != nil {
return errors.Wrap(err, "コネクションエラー")
}
defer conn.Close()
client := pb.NewCalcClient(conn)
return request(client, a, b)
}
func main() {
a := int32(300)
b := int32(500)
if err := sum(a, b); err != nil {
log.Fatalf("%v", err)
}
}
・サーバを実行します。
go run server/main.go
これでサーバが立ったので、クライアントからデータを送ります。
go run client/main.go
サーバ側の表示
上のコマンドを実行したので、
クライアントから、300と500という値を受け取りました。
クライアント側の表示
300, 500という数字を送ったので、800という数字がサーバから返ってきました。
1リクエスト1レスポンスのサーバとクライアンを動かすことができました。
これで、「めっちゃ負荷が高い処理をする機能」があった場合とかにサーバを分割できたりもする。
Server streaming RPC
サーバから値を受け取りづつけます。
protoの作成
以下dockerコンテナに入った状態で作業します。
mkdir ./proto
touch ./proto/notification.proto
・notification.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.notification";
option java_outer_classname = "NotificationProto";
package notification;
service Notification {
rpc Notification (NotificationRequest) returns (stream NotificationReply) {}
}
message NotificationRequest {
int32 num = 1;
}
message NotificationReply {
string message = 1;
}
・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/notificationフォルダに「notification.pb.go」が自動生成されます。
protoc --proto_path ./proto --go_out=plugins=grpc:./pb/notification notification.proto
「notification.pb.go」が生成されました。
・clinet
package main
import (
"context"
"io"
"log"
"github.com/pkg/errors"
pb "grpc-sample/pb/notification"
"google.golang.org/grpc"
)
func request(client pb.NotificationClient, num int32) error {
req := &pb.NotificationRequest{
Num: num,
}
stream, err := client.Notification(context.Background(), req)
if err != nil {
return errors.Wrap(err, "streamエラー")
}
for {
reply, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
log.Println("これ:", reply.GetMessage())
}
return nil
}
func exec(num int32) error {
address := "localhost:50051"
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return errors.Wrap(err, "コネクションエラー")
}
defer conn.Close()
client := pb.NewNotificationClient(conn)
return request(client, num)
}
func main() {
num := int32(5)
if err := exec(num); err != nil {
log.Println(err)
}
}
・server
package main
import (
"fmt"
"log"
"net"
"time"
pb "grpc-sample/pb/notification"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
const port = ":50051"
// ServerServerSide is server
type ServerServerSide struct {
pb.UnimplementedNotificationServer
}
// Notification is
func (s *ServerServerSide) Notification(req *pb.NotificationRequest, stream pb.Notification_NotificationServer) error {
fmt.Println("リクエスト受け取った")
for i := int32(0); i < req.GetNum(); i++ {
message := fmt.Sprintf("%d", i)
if err := stream.Send(&pb.NotificationReply{
Message: message,
}); err != nil {
return err
}
time.Sleep(time.Second * 1)
}
return nil
}
func set() error {
lis, err := net.Listen("tcp", port)
if err != nil {
return errors.Wrap(err, "ポート失敗")
}
s := grpc.NewServer()
var server ServerServerSide
pb.RegisterNotificationServer(s, &server)
if err := s.Serve(lis); err != nil {
return errors.Wrap(err, "サーバ起動失敗")
}
return nil
}
func main() {
fmt.Println("起動")
if err := set(); err != nil {
log.Fatalf("%v", err)
}
}
Client streaming RPC
protoの作成
以下dockerコンテナに入った状態で作業します。
mkdir ./proto
touch ./proto/upload.proto
・upload.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.upload";
option java_outer_classname = "UploadProto";
package upload;
service Upload {
rpc Upload (stream UploadRequest) returns (UploadReply) {}
}
message UploadRequest {
int32 value = 1;
}
message UploadReply {
string message = 1;
}
・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/uploadフォルダに「upload.pb.go」が自動生成されます。
protoc --proto_path ./proto --go_out=plugins=grpc:./pb/upload upload.proto
「upload.pb.go」が生成されました。
・clinet
package main
import (
"context"
"fmt"
"io"
"log"
"time"
"github.com/pkg/errors"
pb "grpc-sample/pb/upload"
"google.golang.org/grpc"
)
func request(client pb.UploadClient) error {
stream, err := client.Upload(context.Background())
if err != nil {
return err
}
values := []int32{1, 2, 3, 4, 5}
for _, value := range values {
fmt.Println("送る値:", value)
if err := stream.Send(&pb.UploadRequest{
Value: value,
}); err != nil {
if err == io.EOF {
break
}
return err
}
time.Sleep(time.Second * 1)
}
reply, err := stream.CloseAndRecv()
if err != nil {
return err
}
log.Printf("結果: %v", reply)
return nil
}
func exec() error {
address := "localhost:50051"
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return errors.Wrap(err, "コネクションエラー")
}
defer conn.Close()
client := pb.NewUploadClient(conn)
return request(client)
}
func main() {
if err := exec(); err != nil {
log.Println(err)
}
}
・server
package main
import (
"fmt"
"io"
"log"
"net"
pb "grpc-sample/pb/upload"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
const port = ":50051"
// ServerClientSide is servre
type ServerClientSide struct {
pb.UnimplementedUploadServer
}
// Upload 複数の送られてきた数字を合計する
func (s *ServerClientSide) Upload(stream pb.Upload_UploadServer) error {
var sum int32
for {
req, err := stream.Recv()
if err == io.EOF {
message := fmt.Sprintf("DONE: sum = %d", sum)
return stream.SendAndClose(&pb.UploadReply{
Message: message,
})
}
if err != nil {
return err
}
fmt.Println(req.GetValue())
sum += req.GetValue()
}
}
func set() error {
lis, err := net.Listen("tcp", port)
if err != nil {
return errors.Wrap(err, "ポート失敗")
}
s := grpc.NewServer()
var server ServerClientSide
pb.RegisterUploadServer(s, &server)
if err := s.Serve(lis); err != nil {
return errors.Wrap(err, "サーバ起動失敗")
}
return nil
}
func main() {
fmt.Println("起動")
if err := set(); err != nil {
log.Fatalf("%v", err)
}
}
Bidirectional streaming RPC
protoの作成
以下dockerコンテナに入った状態で作業します。
mkdir ./proto
touch ./proto/chat.proto
・chat.proto
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.chat";
option java_outer_classname = "ChatProto";
package chat;
service Chat {
rpc Chat (stream ChatRequest) returns (stream ChatReply) {}
}
message ChatRequest {
string message = 1;
}
message ChatReply {
string message = 1;
}
・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/chatフォルダに「chat.pb.go」が自動生成されます。
protoc --proto_path ./proto --go_out=plugins=grpc:./pb/chat chat.proto
「chat.pb.go」が生成されました。
・clinet
package main
import (
"context"
"io"
"log"
"time"
"github.com/pkg/errors"
pb "grpc-sample/pb/chat"
"google.golang.org/grpc"
)
func receive(stream pb.Chat_ChatClient) error {
waitc := make(chan struct{})
go func() {
for {
in, err := stream.Recv()
if err == io.EOF {
close(waitc)
return
}
if err != nil {
log.Fatalf("エラー: %v", err)
}
log.Printf("サーバから:%s", in.Message)
// お返し
stream.Send(&pb.ChatRequest{
Message: time.Now().Format("2006-01-02 15:04:05"),
})
}
}()
<-waitc
return nil
}
func request(stream pb.Chat_ChatClient) error {
return stream.Send(&pb.ChatRequest{
Message: "こんにちは",
})
}
func chat(client pb.ChatClient) error {
stream, err := client.Chat(context.Background())
if err != nil {
return err
}
if err := request(stream); err != nil {
return err
}
if err := receive(stream); err != nil {
return err
}
stream.CloseSend()
return nil
}
func exec() error {
address := "localhost:50051"
conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
if err != nil {
return errors.Wrap(err, "コネクションエラー")
}
defer conn.Close()
client := pb.NewChatClient(conn)
return chat(client)
}
func main() {
if err := exec(); err != nil {
log.Println(err)
}
}
・server
package main
import (
"fmt"
"io"
"log"
"net"
"time"
pb "grpc-sample/pb/chat"
"github.com/pkg/errors"
"google.golang.org/grpc"
)
const port = ":50051"
// ServerBidirectional is server
type ServerBidirectional struct {
pb.UnimplementedChatServer
}
func request(stream pb.Chat_ChatServer, message string) error {
reply := fmt.Sprintf("%sを受け取ったよ!ありがとう^^", message)
return stream.Send(&pb.ChatReply{
Message: reply,
})
}
// Chat クライアントから受け取った言葉に、言葉を返す
func (s *ServerBidirectional) Chat(stream pb.Chat_ChatServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
return nil
}
if err != nil {
return err
}
message := in.GetMessage()
fmt.Println("受取:", message)
if err := request(stream, message); err != nil {
return err
}
time.Sleep(time.Second * 1)
}
}
func set() error {
lis, err := net.Listen("tcp", port)
if err != nil {
return errors.Wrap(err, "ポート失敗")
}
s := grpc.NewServer()
var server ServerBidirectional
pb.RegisterChatServer(s, &server)
if err := s.Serve(lis); err != nil {
return errors.Wrap(err, "サーバ起動失敗")
}
return nil
}
func main() {
fmt.Println("起動")
if err := set(); err != nil {
log.Fatalf("%v", err)
}
}
感想
4方式をやってみて感覚が掴めました。
エンドポイントの定義とかしなくて良いし、簡単にストリーミングも使えるしめっちゃ便利だと思った。
サーバ間通信やUnityやスマホアプリで使ってみたい。