3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

はじめてのgRPC

Posted at

業務で使う機会があり、ちゃんと理解したいなと思いやってみました。

gRPCとは?

Remote Procedure Call

別の場所にある・別のサーバにあるProcedureを呼び出すものです。
それがRPCでそのRPCをGoogleが使えるようにしたものがgRPCです。

gRPCのメリットとは?

gRPCを使うかというと以下のようなメリットがあります。

1. 高性能と効率性

  • Protocol Buffers(プロトコルバッファー)を使って通信をすることで高速に通信することができる
  • JSONやXMLより軽量で速い
  • HTTP/2を利用することでヘッダーの圧縮やストリームの多重化をすることができる

2. 言語とプラットフォームの独立

  • 多数のプログラミング言語で利用可能
  • なので、システム間の連携とかもしやすい

3. 強力な型付け

  • プロトコルバッファーを使うことで型をちゃんとつけることができる

4. ストリーミングサポート

  • 以下のようなストリーミング機能をサポートしている
    • クライアントストリーミング
    • サーバーストリーミング
    • 双方向ストリーミング

gRPCを構成するもの

  • HTTP/2
  • Protocol Buffers

HTTP/2

通信方式のことです。クライアントからサーバ側へリクエストを送り、レスポンスを受けとります。

Protocol Buffers

シリアライズ方式です。
プレーンなテキストではなく、独自のシリアライズ方式を使って通信します。

実際に導入してみる

今回はサーバストリーミングを使ってみます。
今回のユースケースは以下のようなケースとします。

ユースケース

  • リクエストされた時にログを出力する
  • 本来はレスポンスにするべきだが、動かしたいだけなので、ログで出力できる=レスポンスみたいな立ち位置にする
  • Dinnerデータを取得する
  • またDessertデータも取得する
    • ただし、このデータを取得するには別のサーバのAPIを叩く、みたいなシチュエーションとする
  • この二つのデータには関連性がなく、同時に処理もしていい
  • またデザートデータはランダムなランクをつけてログを出力する
  • /usage/sampleGrpcがAPIのIFとする
  • DBを使ったりしないので、通信してるというのはSleepさせて擬似的に表現する
  • エラーハンドリングも少なめ

上記方針で実際に作ってみます。
また、今回はEchoを使ってますが、あまりフレームワーク部分には触れないので、
説明は割愛すると思います。

今回の構成

今回の処理は以下のような構成になります。

- main.go -- APIのエンドポイント、今回はほとんど触れません
- service
    - http
        - handler.go -- ユースケースを呼ぶためのハンドラ
        - route.go -- ハンドラの設定とgRPCサーバの設定をする
    - usage
      - usecase
          - usage_usecase.go -- 今回のクライアント側のサーバ処理を担当
- grpc
    - usecase.go -- 今回のgRPCのサーバ処理を担当
    - dessert.proto -- プロトコル設定ファイル
- pkg
    - grpc
        - dessert.pb.go -- プロトコル設定ファイルから生成されたファイルでメッセージが定義
        - dessert_grpc.pb.go -- プロトコル設定ファイルから生成されたファイルでクライアントとサーバのIFが定義

まずはprotoファイルを作成

gRPCで通信するために、今回のdessertデータのやり取りを定義します。

grpc/dessert.proto
// バージョンの指定が必要、指定がなければデフォルトで2になる
syntax = "proto3";

// パッケージの宣言
package grpcApi;

// 自動生成した際に出力されるディレクトリを指定
option go_package = "pkg/grpc";

// リクエストの定義
message DessertRequest {
  // intやstringなどもあれば独自のものもある
  // データ型 フィールド名 = フィールド番号
  string name = 1;
  int32 id = 2;
}

// レスポンスの定義
message DessertResponse {
  string name = 1;
  string description = 2;
}

// サービスの定義
service DessertService {
  // stream型を指定することでストリーミングでデータを送受信できる
  rpc GetDessertStream(DessertRequest) returns (stream DessertResponse);
}

概ねコメントで補足していますが、上記のような形でどういったサービスなのか、どういうリクエスト/レスポンスを受け渡すのかを記述します。

protoファイルから関連コードを自動生成

上記ファイルをもとに関連コードを生成します。
ここではprotocコマンドを使うので、インストールできていない方はインストールしてください。

コマンドを使用しファイルを生成します。

protoc --go_out=. --go-grpc_out=. --proto_path=grpc ./grpc/dessert.proto

各オプションについては以下の通りです。

  • --go_out
    • メッセージの定義ファイルの出力先を指定
    • 今回はこのディレクトリを対象
  • --go-grpc_out
    • サーバのIFの定義ファイルの出力先を指定
    • 今回はこのディレクトリを対象
  • --proto_path
    • プロトコルファイルの存在するディレクトリを指定
    • 今回の対象である./grpc/dessert.protoを指定

生成されると以下のようなファイルが確認できます。

pkg/grpc/dessert.pb.go
// バージョンの指定が必要、指定がなければデフォルトで2になる

// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// 	protoc-gen-go v1.32.0
// 	protoc        v4.25.3
// source: dessert.proto

// パッケージの宣言

package grpc

import (
	protoreflect "google.golang.org/protobuf/reflect/protoreflect"
	protoimpl "google.golang.org/protobuf/runtime/protoimpl"
	reflect "reflect"
	sync "sync"
)

const (
	// Verify that this generated code is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
	// Verify that runtime/protoimpl is sufficiently up-to-date.
	_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)

// リクエストの定義
type DessertRequest struct {
	state         protoimpl.MessageState
	sizeCache     protoimpl.SizeCache
	unknownFields protoimpl.UnknownFields

	// intやstringなどもあれば独自のものもある
	// データ型 フィールド名 = フィールド番号
	Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
	Id   int32  `protobuf:"varint,2,opt,name=id,proto3" json:"id,omitempty"`
}

func (x *DessertRequest) Reset() {
	*x = DessertRequest{}
	if protoimpl.UnsafeEnabled {
		mi := &file_dessert_proto_msgTypes[0]
		ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
		ms.StoreMessageInfo(mi)
	}
}

func (x *DessertRequest) String() string {
	return protoimpl.X.MessageStringOf(x)
}
.
.
.
pkg/grpc/dessert_grpc.pb.go
// バージョンの指定が必要、指定がなければデフォルトで2になる

// Code generated by protoc-gen-go-grpc. DO NOT EDIT.
// versions:
// - protoc-gen-go-grpc v1.3.0
// - protoc             v4.25.3
// source: dessert.proto

// パッケージの宣言

package grpc

import (
	context "context"
	grpc "google.golang.org/grpc"
	codes "google.golang.org/grpc/codes"
	status "google.golang.org/grpc/status"
)

// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
// Requires gRPC-Go v1.32.0 or later.
const _ = grpc.SupportPackageIsVersion7

const (
	DessertService_GetDessertStream_FullMethodName = "/grpcApi.DessertService/GetDessertStream"
)

// DessertServiceClient is the client API for DessertService service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream.
type DessertServiceClient interface {
	// stream型を指定することでストリーミングでデータを送受信できる
	GetDessertStream(ctx context.Context, in *DessertRequest, opts ...grpc.CallOption) (DessertService_GetDessertStreamClient, error)
}

type dessertServiceClient struct {
	cc grpc.ClientConnInterface
}

func NewDessertServiceClient(cc grpc.ClientConnInterface) DessertServiceClient {
	return &dessertServiceClient{cc}
}

func (c *dessertServiceClient) GetDessertStream(ctx context.Context, in *DessertRequest, opts ...grpc.CallOption) (DessertService_GetDessertStreamClient, error) {
	stream, err := c.cc.NewStream(ctx, &DessertService_ServiceDesc.Streams[0], DessertService_GetDessertStream_FullMethodName, opts...)
	if err != nil {
		return nil, err
	}
	x := &dessertServiceGetDessertStreamClient{stream}
	if err := x.ClientStream.SendMsg(in); err != nil {
		return nil, err
	}
	if err := x.ClientStream.CloseSend(); err != nil {
		return nil, err
	}
	return x, nil
}
.
.
.

protoファイルをもとにコマンドで指定したようにファイルが生成されます。
ただコメントとかも受け継がれるみたいですね。
あと少し省略していますが、自分が定義していないmustEmbedUnimplementedDessertServiceServerとかもありますが、これはgRPC側が勝手に作ってるお作法みたいなものだと思っています。

次はそのサービスを使って別で動いてる想定のユースケースを実装します。

grpc/usecase.go
package grpc

import (
	"time"

	pb "project/pkg/grpc" // 自分のプロジェクトに変えて読んでください

	"google.golang.org/grpc"
)

type DessertStreamServer struct {
	pb.DessertServiceServer
}

// NewServer デザートを返却するgRPCサーバーを作成します。
func NewServer() *grpc.Server {
	s := grpc.NewServer()

	pb.RegisterDessertServiceServer(s, &DessertStreamServer{})
	return s
}

// GetDessertStream デザート情報をストリームで送信します。
func (s *DessertStreamServer) GetDessertStream(req *pb.DessertRequest, stream pb.DessertService_GetDessertStreamServer) error {
	desserts := []string{"チーズケーキ", "ティラミス", "マカロン", "エクレア", "カンノーリ", "パンナコッタ", "モンブラン", "クレープ", "シュークリーム", "フルーツタルト"}

	for _, dessertName := range desserts {
		time.Sleep(500 * time.Millisecond)

		err := stream.Send(&pb.DessertResponse{
			Description: "美味しい" + dessertName + "です",
			Name:        dessertName,
		})
		if err != nil {
			return err
		}
	}
	return nil
}

デザートは固定のデータを0.5秒ごとに返す設定としています。
gRPCに関連したところを何点かフォーカスします。

type DessertStreamServer struct {
	pb.DessertServiceServer
}

// NewServer デザートを返却するgRPCサーバーを作成します。
func NewServer() *grpc.Server {
	s := grpc.NewServer()

	pb.RegisterDessertServiceServer(s, &DessertStreamServer{})
	return s
}

こちらはこのgRPCを動かすためにサービスの設定を登録しています。
RegisterDessertServiceServerはgRPC側で自動生成されたコードを使っています。
Register + サービス名 + ServiceServerの構成で自動で作成されるようです。

// GetDessertStream デザート情報をストリームで送信します。
func (s *DessertStreamServer) GetDessertStream(req *pb.DessertRequest, stream pb.DessertService_GetDessertStreamServer) error {

ここではprotoファイルで定義したリクエスト/レスポンスを定義しています。DessertService_GetDessertStreamServerについてもgRPC側で自動生成された定義です。
こちらサンプルが悪いのですが、IDなどのリクエストパラメータは今回使用していません。

この後触れますが、ここは定義としてはerrorだけを返却しています。
処理の中でstreamを送るようにします。

		err := stream.Send(&pb.DessertResponse{
			Description: "美味しい" + dessertName + "です",
			Name:        dessertName,
		})

gRPCの処理では、stream.Sendを使うことで呼び出し元の関数にデータを送信することができます。この処理では、これがよばれるたび元の処理へデータを送り続けます。
通常の処理だと、この処理はループの終了時に全て送るということが多いと思いますが、ストリーム処理のおかげで都度データを送信することができます。

こちらでgRPCサーバ側の処理は完成です。

gRPCのサーバの起動設定

先ほど設定したサーバを動くようにします。
routeはmain.goでローカル環境を動かせるようにしていますが、そこは今回説明割愛します。

service/usage/http/route.go
package http

import (
	"fmt"
	"log"
	"net"
	"project/service/usage/usecase"

	dessertGrpc "project/grpc"

	"github.com/labstack/echo/v4"
)

func UsageRoutes(g *echo.Group, handler IUsageHandler) {
	g.GET("/sampleGrpc", handler.SampleGrpc)
}

func InitializeUsageRoutes(e *echo.Echo) {
	usageUsecase := usecase.NewUsageUsecase()
	usageHandler := NewUsageHandler(usageUsecase)

	usageGroup := e.Group("/usage")

	UsageRoutes(usageGroup, usageHandler)

	go startGrpcServer()
}

// gRPCサーバーを起動する関数
func startGrpcServer() {
	// gRPCサーバをlocalhost:10001で起動します
    lis, err := net.Listen("tcp", "localhost:10001")
	if err != nil {
		log.Fatalf("failed to listen: %v", err)
	}

	// grpc/usecase.goで定義したサーバを動かす処理を起動
    s := dessertGrpc.NewServer()

	fmt.Printf("server listening at %v", lis.Addr())
	if err := s.Serve(lis); err != nil {
		fmt.Printf("failed to serve: %v", err)
	}
}

startGrpcServerを呼び出すことで、grpcのサーバを起動させています。
今回はローカルで8080も起動してるので非同期でgrpcサーバは10001で動かしています。

service/usage/http/handler.go
package http

import (
	"net/http"

	"project/service/usage/usecase"

	"github.com/labstack/echo/v4"
)

type IUsageHandler interface {
	SampleGrpc(c echo.Context) error
}

type handler struct {
	uu usecase.IUsageUsecase
}

func NewUsageHandler(uu usecase.IUsageUsecase) IUsageHandler {
	return &handler{uu}
}

func (tc *handler) SampleGrpc(c echo.Context) error {
	// usecaseを呼び出す
	tc.uu.GetDessertStream()
	return c.JSON(http.StatusOK, "Success!")
}

ハンドラはこれから呼び出すためのユースケースを呼び出すだけです。
それではそのユースケースを実装します。

クライアント側のサーバ処理

usage/usecase/usage_usecase.go
package usecase

import (
	"context"
	"fmt"
	"io"
	"log"
	"math/rand"
	"sync"
	"time"

	pb "project/pkg/grpc"

	"google.golang.org/grpc"
)

type IUsageUsecase interface {
	GetDessertStream()
}

type usageUsecase struct{}

func NewUsageUsecase() IUsageUsecase {
	return &usageUsecase{}
}
func (uu *usageUsecase) GetDessertStream() {
	startTime := time.Now() // 関数実行開始時刻を記録

	fmt.Printf("---- Start GetDessertStream ----\n\n")
    port := "localhost:10001"
    conn, err := grpc.Dial(
    	port,
    	grpc.WithTransportCredentials(insecure.NewCredentials()),
    	grpc.WithBlock(),
    )
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewDessertServiceClient(conn)

	// 使用したいストリームを取得しておく
	stream, err := client.GetDessertStream(context.Background(), &pb.DessertRequest{
		Name: "アップルパイ", // このサンプルではリクエストの値は使わない
		Id:   1,
	})
	if err != nil {
		fmt.Printf("could not get dessert stream: %v", err)
	}

	// 非同期処理のための待機グループを作成
	var wg sync.WaitGroup

	// ディナーデータの処理(例:カレー)
	wg.Add(1)
	go func() {
		defer wg.Done()
		// ディナーデータの配列
		dinners := []string{"カレー", "スパゲッティ", "寿司", "ラーメン"}

		// ディナーデータの処理
		for _, dinner := range dinners {
			time.Sleep(1 * time.Second) // 時間待ちを1秒に変更
			fmt.Printf("晩御飯データ :: %s\n", dinner)
		}
	}()

	// デザートデータの受信
	wg.Add(1)
	go func() {
		defer wg.Done()
		for {
			dessert, err := stream.Recv()
			// ストリーム処理が終了した場合はエラーが「io.EOF」になる
			if err == io.EOF {
				fmt.Println("gRPCストリーム処理完了!")
				break
			}
			if err != nil {
				fmt.Printf("デザートの取得でエラー発生! : %v", err)
				return
			}
			// ランダムなランクを生成
			rank := rand.Intn(5) + 1
			fmt.Printf("gRPC通信で受け取ったデザート名: %s, 説明: %s, ランク: %d\n", dessert.Name, dessert.Description, rank)
		}
	}()

	// 待機グループの完了を待つ
	wg.Wait()

	defer fmt.Println("\n\n---- End GetDessertStream ----\n")

	endTime := time.Now()              // 関数実行終了時刻を記録
	duration := endTime.Sub(startTime) // 実行時間を計算
	fmt.Printf("実行時間: %v\n", duration)
}

解説していきます。

    port := "localhost:10001"
    conn, err := grpc.Dial(
    	port,
    	grpc.WithTransportCredentials(insecure.NewCredentials()),
    	grpc.WithBlock(),
    )
	if err != nil {
		log.Fatalf("did not connect: %v", err)
	}
	defer conn.Close()

	client := pb.NewDessertServiceClient(conn)

上述したgRPCサーバのコネクションを使って、gRPCのサービスのクライアントを受け取ります。
clientからgRPCサーバを使ってこのサービス(=デザート取得)を利用することができます。
またポート以外に渡しているオプションは以下の通りです。

  • grpc.WithTransportCredentials(insecure.NewCredentials())
    • コネクションでSSL/TLSを使用しない
  • grpc.WithBlock()
    • コネクションが確立されるまで待機する(同期処理をする)
	// 非同期処理のための待機グループを作成
	var wg sync.WaitGroup

今回はgRPCのストリーム処理を活かすために、非同期処理とします。
ディナーデータとデザートデータを非同期で取得して処理します。

		dinners := []string{"カレー", "スパゲッティ", "寿司", "ラーメン"}

		// ディナーデータの処理
		for _, dinner := range dinners {
			time.Sleep(1 * time.Second) // 時間待ちを1秒に変更
			fmt.Printf("晩御飯データ :: %s\n", dinner)
		}

ディナーデータを取得しています。
実際にDB処理などを想定して1秒Sleepしています。

	go func() {
		defer wg.Done()
		for {
			dessert, err := stream.Recv()
			// ストリーム処理が終了した場合はエラーが「io.EOF」になる
			if err == io.EOF {
				fmt.Println("gRPCストリーム処理完了!")
				break
			}
			if err != nil {
				fmt.Printf("デザートの取得でエラー発生! : %v", err)
				return
			}
			// ランダムなランクを生成
			rank := rand.Intn(5) + 1
			fmt.Printf("gRPC通信で受け取ったデザート名: %s, 説明: %s, ランク: %d\n", dessert.Name, dessert.Description, rank)
		}
	}()

ここが今回のメインのストリーム処理です。
デザートデータをストリーミングで取得しながらランクをつけてレスポンスを表示しています。

			dessert, err := stream.Recv()

ここがストリーミング処理を取得している箇所になります。
gRPCの処理側でSend()されたデータをここで受け取ることができます。
つまりこの例だとデザートのデータを10回受け取ることになると思います。

本来の関数だと成形されたデータを1度に受け取って処理をするため、
gRPCのストリーミング処理にすることでデータを待つことなくオーバーヘッドなしで
処理を進めることができるということです。

実行結果です。

$ go run main.go
server listening at 127.0.0.1:10001
   ____    __
  / __/___/ /  ___
 / _// __/ _ \/ _ \
/___/\__/_//_/\___/ v4.10.2
High performance, minimalist Go web framework
https://echo.labstack.com
____________________________________O/_______
                                    O\
⇨ http server started on [::]:8080

まずgRPCのサーバの10001が動いています。
次にクライアント側のサーバである8080も起動しています。
ではリクエストしてみます。その結果のログです。

2024/03/08 07:19:02 ---- Start GetDessertStream ----

2024/03/08 07:19:02 gRPC通信で受け取ったデザート名: チーズケーキ, 説明: 美味しいチーズケーキです, ランク: 2
2024/03/08 07:19:03 晩御飯データ :: カレー
2024/03/08 07:19:03 gRPC通信で受け取ったデザート名: ティラミス, 説明: 美味しいティラミスです, ランク: 3
2024/03/08 07:19:03 gRPC通信で受け取ったデザート名: マカロン, 説明: 美味しいマカロンです, ランク: 2
2024/03/08 07:19:04 晩御飯データ :: スパゲッティ
2024/03/08 07:19:04 gRPC通信で受け取ったデザート名: エクレア, 説明: 美味しいエクレアです, ランク: 2
2024/03/08 07:19:04 gRPC通信で受け取ったデザート名: カンノーリ, 説明: 美味しいカンノーリです, ランク: 2
2024/03/08 07:19:05 晩御飯データ :: 寿司
2024/03/08 07:19:05 gRPC通信で受け取ったデザート名: パンナコッタ, 説明: 美味しいパンナコッタです, ランク: 5
2024/03/08 07:19:05 gRPC通信で受け取ったデザート名: モンブラン, 説明: 美味しいモンブランです, ランク: 1
2024/03/08 07:19:06 晩御飯データ :: ラーメン
2024/03/08 07:19:06 gRPC通信で受け取ったデザート名: クレープ, 説明: 美味しいクレープです, ランク: 1
2024/03/08 07:19:06 gRPC通信で受け取ったデザート名: シュークリーム, 説明: 美味しいシュークリームです, ランク: 4
2024/03/08 07:19:07 gRPC通信で受け取ったデザート名: フルーツタルト, 説明: 美味しいフルーツタルトです, ランク: 3
2024/03/08 07:19:07 gRPCストリーム処理完了!
2024/03/08 07:19:07 実行時間: 5.011850709s
2024/03/08 07:19:07

---- End GetDessertStream ----

想定通りデータを取得し表示できました。
上記のログのようにストリーミングで取得しながら並列で処理を行うことができました。

終わりに

他の方が書いてる記事なんですが、すごくわかりやすく学ぶことができました。
また今回使わなかった他のストリーミングについてももっと勉強したいです。

3
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?