0
Help us understand the problem. What are the problem?

posted at

gRPCのstream方式でファイルアップロード処理を実装する

はじめに

マイクロサービス方式の開発で選定されることが多いgRPCを用いてクライアントからのファイルアップロード処理を実装してみました。

gRPCの通信方式

  • Unary RPCs
    シンプルな通信方式。1つのrequestに対して1つのresponseが返ってくる方式になります。gRPCの通信方式の中で一番REST方式に近いものになると思います。
  • Server streaming RPCs
    クライアントからの1つのrequestに対して、サーバーから複数のresponseを返すことができる通信方式になります。サーバーからクライアントに向けてサイズの大きいファイルなどを送信したい場合に使うことが多いと思います。
  • Client streaming RPCs
    クライアントからの複数のrequestに対して、サーバーから1つのresponseを返す通信方式になります。クライアントからサーバーに向けてサイズの大きいファイルなどをアップロードする際に使うことが多いと思います。
  • Bidirectional streaming RPCs
    クライアントからの複数のrequestに対して、サーバーから複数のresposeを返すことができる通信方式になります。私自身、こちらの通信方式を用いての実装を未だ行ったことがありません。今後ユースケースを探っていきたいと思います。

実装方針

今回はクライアントからPDFファイルやテキストファイルをサーバーに送信する形式で実装を進めたいと思います。
ファイルアップロード処理の大まかな流れは以下のようなイメージになります。

  • gRPCのstream方式を用いてクライアント側からアップロードするファイルをバイナリデータに変換したうえで送信する。
  • クライアント側から送信されてきたバイナリデータをサーバー側で復元し、ファイルを作成する。

クライアントからアップロードされるファイルのサイズが大きなものになるケースを想定して、 今回は、Client streaming RPCs 方式で実装を進めます。

protocol buffersの定義

Client streaming RPCs を使用する場合には、protoのrpc関数定義箇所にて、以下のように stream という設定を追加する必要があります。
クライアントから送信するrequestのフィールド定義には、byte型を指定してupload_fileを追加し、バイナリ形式のデータを送信できる様にします。

sample.proto
service ArchiveService {
  rpc Update(stream UploadArchiveRequest) returns (UploadArchiveResponse) {}
  rpc Create(stream UploadArchiveRequest) returns (UploadArchiveResponse) {}
}

message UploadArchiveRequest {
  Archive archive = 1;
  bytes upload_file = 2;
}

message UploadArchiveResponse {
  Archive archive = 1;
}

client側の実装

先ほどprotocol buffersにてstrema方式で Create rpc関数を定義したと思います。
stream方式で定義されたrpc関数を呼び出すとサーバーとクライアント間で通信のコネクションが発生します。
クラインと側で stream.Send(&genproto.UploadArchiveRequest{UploadFile:convertFileToBinary}) を1回呼び出すと、サーバー側に1つのリクエストが送信されます。
stream.Sendを複数回実行すると1回のコネクションでクライアントから複数のrequestを送信することができます。
この特徴を活かして、容量の大きなデータをいくつかに分割し、複数回のrequestとして送信するということが実現可能となります。

archive_service_test.go
package services

import (
	"backend-go/genproto"
	"context"
	"io"
	"log"
	"os"
	"testing"

	_ "github.com/go-sql-driver/mysql"
	"github.com/stretchr/testify/assert"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func TestArchiveServiceCreate(t *testing.T) {
	if err := fixtures.Load(); err != nil {
		log.Fatalf("failed load fixtures: %v", err)
	}
	ctx := context.Background()
	conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("failed connect grpc: %v", err)
	}
	defer conn.Close()

	fileName := "dummyFile.xls"
	client := genproto.NewArchiveServiceClient(conn)
	stream, err := client.Create(ctx)

	if err != nil {
		t.Fatalf("failed start to connecting stream : %v", err)
	}

	stream.Send(&genproto.UploadArchiveRequest{
		Archive: &genproto.Archive{
			OperatorId:       "1",
			OperatorCoId:     1,
			FileType:         genproto.Archive_ARCHIVE_FILE_TYPE_INTERNAL,
			FileName:         fileName,
			ServiceNo:        false,
			FileTitle:        "dummyFileTitle",
			FileVersion:      1.0,
			FileExplanation:  "dummyFileExplanation",
			FileConfidential: true,
			FileDeleteFlg:    false,
		},
	})
	file, _ := os.Open("../testdata/" + fileName)
	defer file.Close()

	for {
		convertFileToBinary, err := convertFileToBinary(t, file)
		if err == io.EOF {
			break
		}
		if err != nil {
			t.Fatalf("failed upload file: %v", err)
		}

		stream.Send(&genproto.UploadArchiveRequest{
			UploadFile: convertFileToBinary,
		})
	}

	create, err := stream.CloseAndRecv()
	if err != nil {
		t.Fatalf("failed upload file: %v", err)
	}

	assert.Nil(t, err)
	assert.Equal(t, "dummyCoName1", create.Archive.GetOperatorCoName())
	assert.Equal(t, genproto.Archive_ArchiveFileType(genproto.Archive_ARCHIVE_FILE_TYPE_INTERNAL), create.Archive.GetFileType())
	assert.Equal(t, fileName, create.Archive.GetFileName())
	assert.Equal(t, false, create.Archive.GetServiceNo())
	assert.Equal(t, "dummyFileTitle", create.Archive.GetFileTitle())
	assert.Equal(t, ".xls", create.Archive.GetFileExtension())
	assert.Equal(t, 1.0, create.Archive.GetFileVersion())
	assert.Equal(t, "dummyFileExplanation", create.Archive.GetFileExplanation())
	assert.Equal(t, true, create.Archive.GetFileConfidential())
	assert.Equal(t, false, create.Archive.GetFileDeleteFlg())
}

func convertFileToBinary(t *testing.T, file *os.File) (fileBinary []byte, err error) {
	uploadFileBinary := make([]byte, 1024)
	count, err := file.Read(uploadFileBinary)

	t.Log("file successfully loaded", count)

	if err != nil {
		return nil, err
	}

	return uploadFileBinary, nil
}

server側の実装

サーバー側でfor文を回して複数のリクエストを受け取る処理を書かなければいけないので、サーバー側の実装については少々面倒な印象を持ちました。
複数のリクエストを受け取る処理に関しては、Server Streaming RPCS方式を用いてCSV出力処理を実装した時と同様に、ヘルパー関数を定義して、抽象化したいところではあります。

archive_service.go
package services

import (
	"backend-go/ent"
	"backend-go/genproto"
	"context"
	"io"
)

// ArchiveService implements ArchiveServiceServer
type ArchiveService struct {
	client *ent.Client
	genproto.UnimplementedArchiveServiceServer
}

// NewArchiveService returns a new ArchiveService
func NewArchiveService(client *ent.Client) *ArchiveService {
	return &ArchiveService{
		client: client,
	}
}

// Create implements ArchiveServiceServer.Create
func (svc *ArchiveService) Create(stream genproto.ArchiveService_CreateServer) error {
	var (
		archive *genproto.Archive
	)
	for {
		res, err := stream.Recv()

		if err == io.EOF {
			break
		}

		if err != nil {
			return err
		}

		if res.Archive != nil {
			archive = res.GetArchive()
		}
	}
	// TODO: implement logic of Uploading file to s3

	ext := getFileExtension(archive.FileName)

	m := svc.client.ArchiveFile.Create()
	m.SetOperatorCoID(uint32(1))
	m.SetArchiveFileType(int32(archive.FileType))
	m.SetArchiveFileName(archive.FileName)
	m.SetServiceNo(archive.ServiceNo)
	m.SetArchiveFileTitle(archive.FileTitle)
	m.SetArchiveFileExtension(ext)
	m.SetArchiveFileVersion(archive.FileVersion)
	m.SetArchiveFileExplanation(archive.FileExplanation)
	m.SetArchiveFileConfidential(archive.FileConfidential)
	m.SetArchiveFileDeleteFlg(archive.FileDeleteFlg)
	res, err := m.Save(stream.Context())
	if err != nil {
		return err
	}
	ocm, err := res.QueryOrganization().Only(stream.Context())
	if err != nil {
		return err
	}

	err = stream.SendAndClose(&genproto.UploadArchiveResponse{
		Archive: toProtoArchive(res, ocm),
	})

	if err != nil {
		return err
	}

	return nil
}

終わりに

今回は、ファイルアップロード処理をgRPCのClient streaming方式を用いて実装してみました。
アップロードするファイルのサイズが大きくなってくると、どうしてもアップロード処理が重たくなってきてしまうので、サーバー側にストリーミング方式でrequestを並列で送信でできるのは非常に良いかなと思いました。

Register as a new user and use Qiita more conveniently

  1. You can follow users and tags
  2. you can stock useful information
  3. You can make editorial suggestions for articles
What you can do with signing up
0
Help us understand the problem. What are the problem?