78
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

updated at

Organization

goでgRPCの4つの通信方式やってみた(Dockerのサンプルあり)

スクリーンショット 2020-01-25 12.46.05.png

goでgRPCを実装してみました。

gRPCの4つの通信方式を実際にやってみます。
・Unary RPCs(SimpleRPC)
・Server streaming RPC
・Client streaming RPC
・Bidirectional streaming RPC

この記事内容を上から行えば、Dockerで動く環境が作れます。
Dockerでこの4つの実装してる記事は少ないはず。

gRPCとは

googleの「RPCのフレームワーク」です。
RPCは、アプリケーション間のデータ通信するためのプログラムのことです。
gRPCでは、以下を提供します。
HTTP/2、10言語の公式サポートライブラリ、プロトコルバッファ(構造体みたいなもの)を定義することにより言語を問わずにデータ通信が可能、ヘルスチェック、ロードバランシングなどをサポート。

一言で言うと、簡単にアプリケーション間のデータ通信が高速にできるやつ。

webからの呼び出し

以下のライブラリで可能。
grpc-gateway
( grpc-gateway使わなかったら、jsで自前でプロトコルバッファをコンパイルしないといけないはず)
ただ、双方向ストリーミングは、対応してません。(2020年4月)

gRPCの4つの通信方式

StreamingとUnaryの組み合わせのため4つあります。
以下用途と共に説明します。
公式を参考

・Unary RPC

一つのデータを受け取って、一つのデータを返す。
動きはよくあるrestAPIと同じ。
用途:サーバ間通信、API、アプリとサーバのデータ送受信。
(Unary : Unary)
one.png

・Server streaming RPC

サーバがクライアントに複数のリクエストを送る。
用途:サーバから任意のタイミングでクライアントに通知させたい時など。
(Unary : Streaming)
three.jpg

・Client streaming RPC

クライアントがサーバに複数のリクエストを送る。
用途:データアップロードや、クライアントから多くのデータを送る場合。
(Streaming : Unary)
two.png

・Bidirectional streaming RPC

双方向でデータのやりとりをする。
用途:チャットやオンライン対戦など。
(Streaming : Streaming)
four.jpg

サーバ間通信とかで使う

ある処理がめっちゃ「メモリ使う、CPU使う」という時は、サーバを分けるとコスト削減できます。
(ユーザ数とかが多くない場合は不要)
そういう時に、サーバ間でデータの受け渡しが必要になります。

gRPCを使うメリットとして、
・プロトコルバッファを定義すれば、そのデータ形式(構造体的な感じ)でやりとりができる。
=> restAPIだとエンドポイントを定義してそこにリクエストを叩かないといけないので、単純に面倒。
・プロトコルバッファに型とキー情報があるのでドキュメント不要。
=> restAPIと違って、エンドポイントのドキュメントなくても分かりやすい。
・速い

プロトコルバッファ

データや通信方式を定義するIDLのこと。
・「レスポンスとリクエスト」のデータ形式
・ストリーミングを使うか
・パッケージ名
などを定義する。
クライアントとサーバの両方で持ってないといけない。
protcというコンパイラがあるので、今回はこのコンパイラを使ってGO言語ファイルを吐き出す。

環境構築

Docker環境

・フォルダ構成
以下のフォルダを想定しています。

./infrastructure/docker/go-grpc/Dockerfile
./infrastructure/docker/docker-compose.yml

・フォルダを作成します。

mkdir -p ./infrastructure/docker/go-grpc
cd ./infrastructure/docker
touch docker-compose.yml ./go-grpc/Dockerfile

作成されました。
スクリーンショット 2020-01-25 12.01.00.png

・Dockerfile

FROM golang:1.13-stretch

SHELL ["/bin/bash", "-c"]
RUN apt update && apt-get install -y vim unzip

# install protc
WORKDIR /protoc
RUN wget https://github.com/protocolbuffers/protobuf/releases/download/v3.11.2/protoc-3.11.2-linux-x86_64.zip
RUN unzip protoc-3.11.2-linux-x86_64.zip
RUN ln -s /protoc/bin/protoc /bin/protoc

# golang
WORKDIR /go-grpc
ENV GO111MODULE on
RUN go get -u github.com/golang/protobuf/protoc-gen-go

・docker-compose.yml

version: "3.7"
services:
  go-grpc:
    build:
      context: ./go-grpc/
      dockerfile: Dockerfile
    container_name: "go-grpc"
    volumes:
      - ../../:/go-grpc
    tty: true
    privileged: true

・環境に入る
以下のコマンドでdockerコンテナの中に入ります!

cd ./infrastructure/docker
docker-compose up -d
docker-compose exec go-grpc bash

スクリーンショット 2020-01-26 21.11.40.png

実装

上の4つの通信方式を全て動かしてみます。
必要なやつだけを読んでも大丈夫です。

Unary RPC

2つの数字を送ったら足算して返すサーバとクライアントを作成します。

one.png

protoの作成

以下dockerコンテナに入った状態で作業します。
コンパイラである「protoc」は、dockerコンテナにインストールしてあります。

「a,bをintで受け取ってメッセージを返す」を想定したprotoです。

mkdir ./proto
touch ./proto/calc.proto

・calc.proto


syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.simple";
option java_outer_classname = "SimpleProto";

// package名
package calc;

// 上の4つのパターンをここで指定する。
service Calc {
  rpc Sum (SumRequest) returns (SumReply) {}
}

// リクエストで送る値。「1、2」はそのデータの番号。
message SumRequest {
  int32 a = 1;
  int32 b = 2;
}

// レスポンスで送る値
message SumReply {
  string message = 1;
}

・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/calcフォルダに「calc.pb.go」が自動生成されます。

protoc --proto_path ./proto --go_out=plugins=grpc:./pb/calc calc.proto

「calc.pb.go」が生成されました。

スクリーンショット 2020-01-25 12.00.10.png

goで実行

・以下のコマンドで、gomoduleを初期設定をします。

go mod init grpc-sample

以下のファイルが作成されました。
スクリーンショット 2020-01-25 11.58.16.png

「受け取る側のサーバー」と「送る側のクライアント」の2つの実行ファイルを作成します。

mkdir server client
touch ./server/main.go ./client/main.go

・./server/main.go

package main

import (
    "context"
    "fmt"
    "log"
    "net"

    pb "grpc-sample/pb/calc"

    "github.com/pkg/errors"
    "google.golang.org/grpc"
)

const port = ":50051"

// ServerUnary is server
type ServerUnary struct {
    pb.UnimplementedCalcServer
}

// Sum 二つの値を受け取り、合計してクライアントへ返す
func (s *ServerUnary) Sum(ctx context.Context, in *pb.SumRequest) (*pb.SumReply, error) {
    a := in.GetA()
    b := in.GetB()
    fmt.Println(a, b)
    reply := fmt.Sprintf("%d + %d = %d", a, b, a+b)
    return &pb.SumReply{
        Message: reply,
    }, nil
}

func set() error {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        return errors.Wrap(err, "ポート失敗")
    }
    s := grpc.NewServer()
    var server ServerUnary
    pb.RegisterCalcServer(s, &server)
    if err := s.Serve(lis); err != nil {
        return errors.Wrap(err, "サーバ起動失敗")
    }
    return nil
}

func main() {
    fmt.Println("起動")
    if err := set(); err != nil {
        log.Fatalf("%v", err)
    }
}

・./client/main.go

package main

import (
    "context"
    "log"
    "time"

    "github.com/pkg/errors"

    pb "grpc-sample/pb/calc"

    "google.golang.org/grpc"
)

func request(client pb.CalcClient, a, b int32) error {
    ctx, cancel := context.WithTimeout(
        context.Background(),
        time.Second,
    )
    defer cancel()
    sumRequest := pb.SumRequest{
        A: a,
        B: b,
    }
    reply, err := client.Sum(ctx, &sumRequest)
    if err != nil {
        return errors.Wrap(err, "受取り失敗")
    }
    log.Printf("サーバからの受け取り\n %s", reply.GetMessage())
    return nil
}

func sum(a, b int32) error {
    address := "localhost:50051"
    conn, err := grpc.Dial(
        address,
        grpc.WithInsecure(),
        grpc.WithBlock(),
    )
    if err != nil {
        return errors.Wrap(err, "コネクションエラー")
    }
    defer conn.Close()
    client := pb.NewCalcClient(conn)
    return request(client, a, b)
}

func main() {
    a := int32(300)
    b := int32(500)
    if err := sum(a, b); err != nil {
        log.Fatalf("%v", err)
    }
}

・サーバを実行します。

go run server/main.go

これでサーバが立ったので、クライアントからデータを送ります。

go run client/main.go

サーバ側の表示
上のコマンドを実行したので、
クライアントから、300と500という値を受け取りました。
スクリーンショット 2020-01-25 12.05.38.png

クライアント側の表示
300, 500という数字を送ったので、800という数字がサーバから返ってきました。
スクリーンショット 2020-01-25 12.05.24.png

1リクエスト1レスポンスのサーバとクライアンを動かすことができました。
これで、「めっちゃ負荷が高い処理をする機能」があった場合とかにサーバを分割できたりもする。

Server streaming RPC

サーバから値を受け取りづつけます。

three.jpg

protoの作成

以下dockerコンテナに入った状態で作業します。

mkdir ./proto
touch ./proto/notification.proto

・notification.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.notification";
option java_outer_classname = "NotificationProto";

package notification;

service Notification {
  rpc Notification (NotificationRequest) returns (stream NotificationReply) {}
}

message NotificationRequest {
  int32 num = 1;
}

message NotificationReply {
  string message = 1;
}

・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/notificationフォルダに「notification.pb.go」が自動生成されます。

protoc --proto_path ./proto --go_out=plugins=grpc:./pb/notification notification.proto

「notification.pb.go」が生成されました。

・clinet

package main

import (
    "context"
    "io"
    "log"

    "github.com/pkg/errors"

    pb "grpc-sample/pb/notification"

    "google.golang.org/grpc"
)

func request(client pb.NotificationClient, num int32) error {
    req := &pb.NotificationRequest{
        Num: num,
    }
    stream, err := client.Notification(context.Background(), req)
    if err != nil {
        return errors.Wrap(err, "streamエラー")
    }
    for {
        reply, err := stream.Recv()
        if err == io.EOF {
            break
        }
        if err != nil {
            return err
        }
        log.Println("これ:", reply.GetMessage())
    }
    return nil
}

func exec(num int32) error {
    address := "localhost:50051"
    conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
        return errors.Wrap(err, "コネクションエラー")
    }
    defer conn.Close()
    client := pb.NewNotificationClient(conn)
    return request(client, num)
}

func main() {
    num := int32(5)
    if err := exec(num); err != nil {
        log.Println(err)
    }
}

・server

package main

import (
    "fmt"
    "log"
    "net"
    "time"

    pb "grpc-sample/pb/notification"

    "github.com/pkg/errors"
    "google.golang.org/grpc"
)

const port = ":50051"

// ServerServerSide is server
type ServerServerSide struct {
    pb.UnimplementedNotificationServer
}

// Notification is
func (s *ServerServerSide) Notification(req *pb.NotificationRequest, stream pb.Notification_NotificationServer) error {
    fmt.Println("リクエスト受け取った")
    for i := int32(0); i < req.GetNum(); i++ {
        message := fmt.Sprintf("%d", i)
        if err := stream.Send(&pb.NotificationReply{
            Message: message,
        }); err != nil {
            return err
        }
        time.Sleep(time.Second * 1)
    }
    return nil
}

func set() error {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        return errors.Wrap(err, "ポート失敗")
    }
    s := grpc.NewServer()
    var server ServerServerSide
    pb.RegisterNotificationServer(s, &server)
    if err := s.Serve(lis); err != nil {
        return errors.Wrap(err, "サーバ起動失敗")
    }
    return nil
}

func main() {
    fmt.Println("起動")
    if err := set(); err != nil {
        log.Fatalf("%v", err)
    }
}

Client streaming RPC

クライアントから値を受け取りづつけます。
two.png

protoの作成

以下dockerコンテナに入った状態で作業します。

mkdir ./proto
touch ./proto/upload.proto

・upload.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.upload";
option java_outer_classname = "UploadProto";

package upload;

service Upload {
  rpc Upload (stream UploadRequest) returns (UploadReply) {}
}

message UploadRequest {
  int32 value = 1;
}

message UploadReply {
  string message = 1;
}

・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/uploadフォルダに「upload.pb.go」が自動生成されます。

protoc --proto_path ./proto --go_out=plugins=grpc:./pb/upload upload.proto

「upload.pb.go」が生成されました。

・clinet

package main

import (
    "context"
    "fmt"
    "io"
    "log"
    "time"

    "github.com/pkg/errors"

    pb "grpc-sample/pb/upload"

    "google.golang.org/grpc"
)

func request(client pb.UploadClient) error {
    stream, err := client.Upload(context.Background())
    if err != nil {
        return err
    }
    values := []int32{1, 2, 3, 4, 5}
    for _, value := range values {
        fmt.Println("送る値:", value)
        if err := stream.Send(&pb.UploadRequest{
            Value: value,
        }); err != nil {
            if err == io.EOF {
                break
            }
            return err
        }
        time.Sleep(time.Second * 1)
    }
    reply, err := stream.CloseAndRecv()
    if err != nil {
        return err
    }
    log.Printf("結果: %v", reply)
    return nil
}

func exec() error {
    address := "localhost:50051"
    conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
        return errors.Wrap(err, "コネクションエラー")
    }
    defer conn.Close()
    client := pb.NewUploadClient(conn)
    return request(client)
}

func main() {
    if err := exec(); err != nil {
        log.Println(err)
    }
}

・server

package main

import (
    "fmt"
    "io"
    "log"
    "net"

    pb "grpc-sample/pb/upload"

    "github.com/pkg/errors"
    "google.golang.org/grpc"
)

const port = ":50051"

// ServerClientSide is servre
type ServerClientSide struct {
    pb.UnimplementedUploadServer
}

// Upload 複数の送られてきた数字を合計する
func (s *ServerClientSide) Upload(stream pb.Upload_UploadServer) error {
    var sum int32
    for {
        req, err := stream.Recv()
        if err == io.EOF {
            message := fmt.Sprintf("DONE: sum = %d", sum)
            return stream.SendAndClose(&pb.UploadReply{
                Message: message,
            })
        }
        if err != nil {
            return err
        }
        fmt.Println(req.GetValue())
        sum += req.GetValue()
    }
}

func set() error {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        return errors.Wrap(err, "ポート失敗")
    }
    s := grpc.NewServer()
    var server ServerClientSide
    pb.RegisterUploadServer(s, &server)
    if err := s.Serve(lis); err != nil {
        return errors.Wrap(err, "サーバ起動失敗")
    }
    return nil
}

func main() {
    fmt.Println("起動")
    if err := set(); err != nil {
        log.Fatalf("%v", err)
    }
}

Bidirectional streaming RPC

双方で、値を受け取りづつけます。
four.jpg

protoの作成

以下dockerコンテナに入った状態で作業します。

mkdir ./proto
touch ./proto/chat.proto

・chat.proto

syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.chat";
option java_outer_classname = "ChatProto";

package chat;

service Chat {
  rpc Chat (stream ChatRequest) returns (stream ChatReply) {}
}

message ChatRequest {
  string message = 1;
}

message ChatReply {
  string message = 1;
}

・上の「.protoファイル」を、goファイルにコンパイルします。
以下のコマンドで、./pb/chatフォルダに「chat.pb.go」が自動生成されます。

protoc --proto_path ./proto --go_out=plugins=grpc:./pb/chat chat.proto

「chat.pb.go」が生成されました。

・clinet

package main

import (
    "context"
    "io"
    "log"
    "time"

    "github.com/pkg/errors"

    pb "grpc-sample/pb/chat"

    "google.golang.org/grpc"
)

func receive(stream pb.Chat_ChatClient) error {
    waitc := make(chan struct{})
    go func() {
        for {
            in, err := stream.Recv()
            if err == io.EOF {
                close(waitc)
                return
            }
            if err != nil {
                log.Fatalf("エラー: %v", err)
            }
            log.Printf("サーバから:%s", in.Message)

            // お返し
            stream.Send(&pb.ChatRequest{
                Message: time.Now().Format("2006-01-02 15:04:05"),
            })
        }
    }()
    <-waitc
    return nil
}

func request(stream pb.Chat_ChatClient) error {
    return stream.Send(&pb.ChatRequest{
        Message: "こんにちは",
    })
}

func chat(client pb.ChatClient) error {
    stream, err := client.Chat(context.Background())
    if err != nil {
        return err
    }
    if err := request(stream); err != nil {
        return err
    }
    if err := receive(stream); err != nil {
        return err
    }
    stream.CloseSend()
    return nil
}

func exec() error {
    address := "localhost:50051"
    conn, err := grpc.Dial(address, grpc.WithInsecure(), grpc.WithBlock())
    if err != nil {
        return errors.Wrap(err, "コネクションエラー")
    }
    defer conn.Close()
    client := pb.NewChatClient(conn)
    return chat(client)
}

func main() {
    if err := exec(); err != nil {
        log.Println(err)
    }
}

・server

package main

import (
    "fmt"
    "io"
    "log"
    "net"
    "time"

    pb "grpc-sample/pb/chat"

    "github.com/pkg/errors"
    "google.golang.org/grpc"
)

const port = ":50051"

// ServerBidirectional is server
type ServerBidirectional struct {
    pb.UnimplementedChatServer
}

func request(stream pb.Chat_ChatServer, message string) error {
    reply := fmt.Sprintf("%sを受け取ったよ!ありがとう^^", message)
    return stream.Send(&pb.ChatReply{
        Message: reply,
    })
}

// Chat クライアントから受け取った言葉に、言葉を返す
func (s *ServerBidirectional) Chat(stream pb.Chat_ChatServer) error {
    for {
        in, err := stream.Recv()
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
        message := in.GetMessage()
        fmt.Println("受取:", message)

        if err := request(stream, message); err != nil {
            return err
        }
        time.Sleep(time.Second * 1)
    }
}

func set() error {
    lis, err := net.Listen("tcp", port)
    if err != nil {
        return errors.Wrap(err, "ポート失敗")
    }
    s := grpc.NewServer()
    var server ServerBidirectional
    pb.RegisterChatServer(s, &server)
    if err := s.Serve(lis); err != nil {
        return errors.Wrap(err, "サーバ起動失敗")
    }
    return nil
}

func main() {
    fmt.Println("起動")
    if err := set(); err != nil {
        log.Fatalf("%v", err)
    }
}

感想

4方式をやってみて感覚が掴めました。
エンドポイントの定義とかしなくて良いし、簡単にストリーミングも使えるしめっちゃ便利だと思った。
サーバ間通信やUnityやスマホアプリで使ってみたい。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Sign upLogin
78
Help us understand the problem. What are the problem?