はじめに
gRPCのstreamingについて、クライアント側からキャンセルする方法のサンプルが見つからなかったため調査しました。
(bi-directional streamingについては公式にサンプルがあるのでそちらを参照)
プロトコル定義ファイル
サンプル実装用に以下のプロトコル定義ファイルを用意しました。
syntax = "proto3";
option go_package = "github.com/Aniokrait/goGrpcServer2/helloworld";
package helloworld;
service Greeter {
// レスポンスをstreamにしています。
rpc SayHello (HelloRequest) returns (stream HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
クライアント側実装
grpc-kotlinを使用している場合、Protocol bufferでレスポンスの型をstreamにすると戻り値がFlowになります。クライアント側でこのFlowをキャンセルすることで、streamingを停止させることができます。
ここでは以下のようにデータレイヤのクラスから、streamingを開始しました。
class GrpcDatasource {
// 簡易的にManagedChannelをdataSourceクラス内に定義
private val channel = let {
val builder = ManagedChannelBuilder.forAddress("10.0.2.2", 50051)
builder.usePlaintext()
builder.executor(Dispatchers.IO.asExecutor()).build()
}
// サーバからstreamを受け取るメソッド
fun sayHello(): Flow<HelloReply> {
val greeter = GreeterGrpcKt.GreeterCoroutineStub(channel)
val response = greeter.sayHello(HelloRequest.newBuilder().setName("John").build())
Log.d("GrpcDatasource", response.toString())
return response
}
}
上記実装によって、name = "John"というリクエストをサーバに最初に送信し、それに対して複数回レスポンスを受け取ることができるようになりました。
次にViewModel経由でUIでこのFlowを監視し、Stateを監視しているコンポーネントが非表示になったら、streamingがキャンセルされるようにします。
// サンプル実装なのでGrpcDatasourceおよびMainViewModelは直接インスタンスを生成しています。
class MainViewModel: ViewModel() {
private val dataSource = GrpcDatasource()
val myState = dataSource.sayHello()
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5_000),
initialValue = HelloReply.getDefaultInstance())
}
@Composable
fun MainScreen() {
val vm = MainViewModel()
val myState by vm.myState.collectAsState()
}
myState変数はStateFlowなので、監視者がいなくなるとFlowがキャンセルされます。
したがってMainScreenから別の画面に遷移したり、アプリをバックグラウンドに移行させた際に、自動的にストリーミングがキャンセルされるようになっています。
サーバ側実装
ではサーバ側を実装していきます。ここではGoとKotlinの例を紹介します。
基本的にはサーバ側でループし、ループのたびにレスポンスを返すというのは変わらないのですが、Kotlinの場合はクライアントによってstreamingがキャンセルされたかを都度判定する必要があります。
(grpc-kotlinは内部的にgrpc-javaに依存していたはずなのでKotlinも自動的にキャンセルされるかと思ったのですが。。ピュアなgrpc-javaではなくLogNetのSpringBootを使用しているのも関係しているかもしれません。要調査)
Goの場合
package main
import (
"log"
"net"
"time"
"strconv"
pb "github.com/Aniokrait/goGrpcServer2/helloworld"
"google.golang.org/grpc"
"google.golang.org/grpc/reflection"
)
type server struct {
pb.UnimplementedGreeterServer
savedFeatures []*pb.HelloReply // read-only after initialized
}
func (s *server) SayHello(in *pb.HelloRequest, stream pb.Greeter_SayHelloServer) error {
log.Printf("Received: %v", in.GetName())
counter := 0
for {
counter++
log.Printf("Sending: %v", in.Name)
// "Hello, $リクエストで設定したname + $レスポンス回数"を返す。
// 例: Hello, John1
if err := stream.Send(&pb.HelloReply{Message: "Hello, " + in.Name + strconv.Itoa(counter)}); err != nil {
log.Printf("failed to listen: %v", err)
break
}
// 3秒おきにレスポンスを返す
time.Sleep(3 * time.Second)
}
return nil
}
func main() {
//localhost:50051でリッスンする
lis, err := net.Listen("tcp", ":50051")
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
s := grpc.NewServer()
pb.RegisterGreeterServer(s, &server{})
reflection.Register(s)
if err := s.Serve(lis); err != nil {
log.Fatalf("failed to serve: %v", err)
}
}
Kotlinの場合
@GRpcService
class MainController: GreeterGrpc.GreeterImplBase() {
override fun sayHello(request: HelloRequest, responseObserver: StreamObserver<HelloReply>) {
var counter = 0
while (true) {
counter++
// Contextから通信がキャンセルされているかどうかを取得できる。
val isCanceled = Context.current().isCancelled
println("canceld: $isCanceled")
// キャンセルされてたらループを抜けて終了。
if(isCanceled) {
break
}
// "Hello, $リクエストで設定したname + $レスポンス回数"を返す。
// 例: Hello, John1
responseObserver.onNext(HelloReply.newBuilder()
.setMessage("Hello, ${request.getName()}$counter")
.build())
println("reply: ${request.getName()}")
// 3秒おきにレスポンスを返す
Thread.sleep(3000)
}
println("end")
responseObserver.onCompleted();
}
}
さいごに
Kotlinによるサーバを実装した際に自動的にキャンセルにならなかったのでGoでサーバを構築しましたが、やはりGo言語の方がgRPCとは相性がよいですね。
メタデータの取り扱いやインターセプターの実装なんかもドキュメントを読む限りGoの方が扱いやすそうなので、gRPCを使用する際はサーバサイドはGo言語を選択したほうが良さそうです。