はじめに
マイクロサービス方式の開発で選定されることが多いgRPCを用いてクライアントからのファイルアップロード処理を実装してみました。
gRPCの通信方式
- Unary RPCs
シンプルな通信方式。1つのrequestに対して1つのresponseが返ってくる方式になります。gRPCの通信方式の中で一番REST方式に近いものになると思います。 - Server streaming RPCs
クライアントからの1つのrequestに対して、サーバーから複数のresponseを返すことができる通信方式になります。サーバーからクライアントに向けてサイズの大きいファイルなどを送信したい場合に使うことが多いと思います。 - Client streaming RPCs
クライアントからの複数のrequestに対して、サーバーから1つのresponseを返す通信方式になります。クライアントからサーバーに向けてサイズの大きいファイルなどをアップロードする際に使うことが多いと思います。 - Bidirectional streaming RPCs
クライアントからの複数のrequestに対して、サーバーから複数のresposeを返すことができる通信方式になります。私自身、こちらの通信方式を用いての実装を未だ行ったことがありません。今後ユースケースを探っていきたいと思います。
実装方針
今回はクライアントからPDFファイルやテキストファイルをサーバーに送信する形式で実装を進めたいと思います。
ファイルアップロード処理の大まかな流れは以下のようなイメージになります。
- gRPCのstream方式を用いてクライアント側からアップロードするファイルをバイナリデータに変換したうえで送信する。
- クライアント側から送信されてきたバイナリデータをサーバー側で復元し、ファイルを作成する。
クライアントからアップロードされるファイルのサイズが大きなものになるケースを想定して、 今回は、Client streaming RPCs
方式で実装を進めます。
protocol buffersの定義
Client streaming RPCs
を使用する場合には、protoのrpc関数定義箇所にて、以下のように stream
という設定を追加する必要があります。
クライアントから送信するrequestのフィールド定義には、byte型を指定してupload_file
を追加し、バイナリ形式のデータを送信できる様にします。
service ArchiveService {
rpc Update(stream UploadArchiveRequest) returns (UploadArchiveResponse) {}
rpc Create(stream UploadArchiveRequest) returns (UploadArchiveResponse) {}
}
message UploadArchiveRequest {
Archive archive = 1;
bytes upload_file = 2;
}
message UploadArchiveResponse {
Archive archive = 1;
}
client側の実装
先ほどprotocol buffers
にてstrema方式で Create
rpc関数を定義したと思います。
stream方式で定義されたrpc関数を呼び出すとサーバーとクライアント間で通信のコネクションが発生します。
クラインと側で stream.Send(&genproto.UploadArchiveRequest{UploadFile:convertFileToBinary})
を1回呼び出すと、サーバー側に1つのリクエストが送信されます。
stream.Send
を複数回実行すると1回のコネクションでクライアントから複数のrequestを送信することができます。
この特徴を活かして、容量の大きなデータをいくつかに分割し、複数回のrequestとして送信するということが実現可能となります。
package services
import (
"backend-go/genproto"
"context"
"io"
"log"
"os"
"testing"
_ "github.com/go-sql-driver/mysql"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)
func TestArchiveServiceCreate(t *testing.T) {
if err := fixtures.Load(); err != nil {
log.Fatalf("failed load fixtures: %v", err)
}
ctx := context.Background()
conn, err := grpc.DialContext(ctx, "bufnet", grpc.WithContextDialer(bufDialer), grpc.WithTransportCredentials(insecure.NewCredentials()))
if err != nil {
t.Fatalf("failed connect grpc: %v", err)
}
defer conn.Close()
fileName := "dummyFile.xls"
client := genproto.NewArchiveServiceClient(conn)
stream, err := client.Create(ctx)
if err != nil {
t.Fatalf("failed start to connecting stream : %v", err)
}
stream.Send(&genproto.UploadArchiveRequest{
Archive: &genproto.Archive{
OperatorId: "1",
OperatorCoId: 1,
FileType: genproto.Archive_ARCHIVE_FILE_TYPE_INTERNAL,
FileName: fileName,
ServiceNo: false,
FileTitle: "dummyFileTitle",
FileVersion: 1.0,
FileExplanation: "dummyFileExplanation",
FileConfidential: true,
FileDeleteFlg: false,
},
})
file, _ := os.Open("../testdata/" + fileName)
defer file.Close()
for {
convertFileToBinary, err := convertFileToBinary(t, file)
if err == io.EOF {
break
}
if err != nil {
t.Fatalf("failed upload file: %v", err)
}
stream.Send(&genproto.UploadArchiveRequest{
UploadFile: convertFileToBinary,
})
}
create, err := stream.CloseAndRecv()
if err != nil {
t.Fatalf("failed upload file: %v", err)
}
assert.Nil(t, err)
assert.Equal(t, "dummyCoName1", create.Archive.GetOperatorCoName())
assert.Equal(t, genproto.Archive_ArchiveFileType(genproto.Archive_ARCHIVE_FILE_TYPE_INTERNAL), create.Archive.GetFileType())
assert.Equal(t, fileName, create.Archive.GetFileName())
assert.Equal(t, false, create.Archive.GetServiceNo())
assert.Equal(t, "dummyFileTitle", create.Archive.GetFileTitle())
assert.Equal(t, ".xls", create.Archive.GetFileExtension())
assert.Equal(t, 1.0, create.Archive.GetFileVersion())
assert.Equal(t, "dummyFileExplanation", create.Archive.GetFileExplanation())
assert.Equal(t, true, create.Archive.GetFileConfidential())
assert.Equal(t, false, create.Archive.GetFileDeleteFlg())
}
func convertFileToBinary(t *testing.T, file *os.File) (fileBinary []byte, err error) {
uploadFileBinary := make([]byte, 1024)
count, err := file.Read(uploadFileBinary)
t.Log("file successfully loaded", count)
if err != nil {
return nil, err
}
return uploadFileBinary, nil
}
server側の実装
サーバー側でfor文を回して複数のリクエストを受け取る処理を書かなければいけないので、サーバー側の実装については少々面倒な印象を持ちました。
複数のリクエストを受け取る処理に関しては、Server Streaming RPCS
方式を用いてCSV出力処理を実装した時と同様に、ヘルパー関数を定義して、抽象化したいところではあります。
package services
import (
"backend-go/ent"
"backend-go/genproto"
"context"
"io"
)
// ArchiveService implements ArchiveServiceServer
type ArchiveService struct {
client *ent.Client
genproto.UnimplementedArchiveServiceServer
}
// NewArchiveService returns a new ArchiveService
func NewArchiveService(client *ent.Client) *ArchiveService {
return &ArchiveService{
client: client,
}
}
// Create implements ArchiveServiceServer.Create
func (svc *ArchiveService) Create(stream genproto.ArchiveService_CreateServer) error {
var (
archive *genproto.Archive
)
for {
res, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
if res.Archive != nil {
archive = res.GetArchive()
}
}
// TODO: implement logic of Uploading file to s3
ext := getFileExtension(archive.FileName)
m := svc.client.ArchiveFile.Create()
m.SetOperatorCoID(uint32(1))
m.SetArchiveFileType(int32(archive.FileType))
m.SetArchiveFileName(archive.FileName)
m.SetServiceNo(archive.ServiceNo)
m.SetArchiveFileTitle(archive.FileTitle)
m.SetArchiveFileExtension(ext)
m.SetArchiveFileVersion(archive.FileVersion)
m.SetArchiveFileExplanation(archive.FileExplanation)
m.SetArchiveFileConfidential(archive.FileConfidential)
m.SetArchiveFileDeleteFlg(archive.FileDeleteFlg)
res, err := m.Save(stream.Context())
if err != nil {
return err
}
ocm, err := res.QueryOrganization().Only(stream.Context())
if err != nil {
return err
}
err = stream.SendAndClose(&genproto.UploadArchiveResponse{
Archive: toProtoArchive(res, ocm),
})
if err != nil {
return err
}
return nil
}
終わりに
今回は、ファイルアップロード処理をgRPCのClient streaming
方式を用いて実装してみました。
アップロードするファイルのサイズが大きくなってくると、どうしてもアップロード処理が重たくなってきてしまうので、サーバー側にストリーミング方式でrequestを並列で送信でできるのは非常に良いかなと思いました。