最近よく見かけるgRPCですが、メリットとして挙げられる双方向通信はどうするのでしょうか?
Javaでの利用はあまりまとまった文献が見当たらなかったのでまとめてみます
このドキュメントについて
- gRPCをJavaで利用する場合にTCPコネクションを維持したまま双方向通信どうするのかを記載しているつもり
- gRPCの設定やProtocolBufferなどの説明は関連資料に譲ります
利用した環境
実行環境など | バージョン |
---|---|
JDK | 1.8.0_121 |
Kotlin | 1.1.2 |
gRPC | 1.5.0 |
いちおうgRPCとは
ざっくりとした特徴。
- Googleが開発したRPCフレームワーク(Javaを利用する公式チュートリアルはこちら)
- Protocol Bufferを利用して複数言語間で統一的な呼び出しが可能
- HTTP/2を用いて高速、双方向な通信が可能
双方向通信どうするのか?
とりあえずのポイントは protoファイルにstramを付ける です
その後出来上がった スタブクラスに紐づくメソッドにおいて、 StreamObservable
を適切に処理することで双方向通信が可能です
単純なRequest / Response
まずはよくある 1リクエスト、1レスポンスの例を見てみます
文字列message
を受け取って 反転した文字列 result
を返す単純なサービスを作成
イメージ
+----------+ +---------------+
| |--- Request -->| |
| Client | | Server |
| |<-- Response ---| |
+----------+ +---------------+
protoファイル
syntax = "proto3";
message StringRequest {
string message = 1;
}
message ReverseReply {
string result = 1;
}
service ReverseString {
rpc Exec (StringRequest) returns (ReverseReply) {}
}
protoより生成されるJavaコード(抜粋)
ここには手をいれる必要はなし、ProtocolBufferにて自動生成されるコードがこんな感じというのを示しておきます
public static abstract class ReverseStringImplBase implements io.grpc.BindableService {
public void exec(StringRequest request,
io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
}
...
}
Serverコード(Kotlin)
Server側のコード抜粋です。ここではgRPCデフォルトの6565番ポートで起動する想定です
サーバ側はStreamObserverを利用したレスポンスを記載するが、 onNext
の呼び出しは1回しか許容されない模様
(試しに2回呼んでみたけど、処理はブロックされてうまく動かず)
class ReverseStringService : ReverseStringGrpc.ReverseStringImplBase() {
override fun exec(request: StringRequest?, responseObserver: StreamObserver<ReverseReply>?) {
println("req : " + request?.message);
val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()
responseObserver?.onNext(reply)
responseObserver?.onCompleted()
}
}
Clientコード(Kotlin)
後続の説明と合わせるために非同期処理のClient実装
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build()
val stub = ReverseStringGrpc.newStub(channel)
val message = StringRequest.newBuilder().setMessage("morimori").build()
stub.exec(message, object : StreamObserver<ReverseReply> {
override fun onNext(reply: ReverseReply) {
println("res : " + reply.result)
}
override fun onError(t: Throwable) {}
override fun onCompleted() {
println("complete")
}
})
実行結果
Server
req : morimori
Client
res : iromirom
complete
双方向のRequest / Response
同一TCPコネクションで複数回のRequest/Responseをやり取り出来るようにしたい
イメージ
+----------+ +---------------+
| |--- Request -->| |
| | | |
| |<-- Response ---| |
| | | |
| |--- Request -->| |
| Client | | Server |
| |<-- Response ---| |
| | | |
| |--- Request -->| |
| | | |
| |<-- Response ---| |
+----------+ +---------------+
protoファイル(抜粋)
ExecStream
を追加
syntax = "proto3";
...
service ReverseString {
rpc Exec (StringRequest) returns (ReverseReply) {}
rpc ExecStream (stream StringRequest) returns (stream ReverseReply) {}
}
protoより生成されるJavaコード
引数も戻り値も StreamObject
を持つstreamExecメソッドが追加されます
public static abstract class ReverseStringImplBase implements io.grpc.BindableService {
public void exec(StringRequest request,
io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
}
public io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.StringRequest> execStream(
io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.ReverseReply> responseObserver) {
return asyncUnimplementedStreamingCall(METHOD_EXEC_STREAM, responseObserver);
}
...
}
Serverコード(Kotlin)
StreamObservableを返すようなコードを記載します
実行結果の違いをわかりやすくするために、 1 Requestに対して2 Responseを返してみます
override fun execStream(responseObserver: StreamObserver<ReverseReply>?): StreamObserver<StringRequest> {
return object : StreamObserver<StringRequest> {
override fun onNext(request: StringRequest) {
println("req : " + request?.message);
val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()
responseObserver?.onNext(reply)
responseObserver?.onNext(reply)
}
override fun onError(t: Throwable) {}
override fun onCompleted() {
println("complete")
responseObserver?.onCompleted()
}
}
}
Clientコード(Kotlin)
Client側の実装は変わっていて、StreamObserver#onNext
を利用してリクエストを投げるようにstubの型が変更されてます
val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build()
val message_1 = StringRequest.newBuilder().setMessage("morimori1").build()
val message_2 = StringRequest.newBuilder().setMessage("morimori2").build()
val message_3 = StringRequest.newBuilder().setMessage("morimori3").build()
val stub = ReverseStringGrpc.newStub(channel)
val observer = stub.execStream(object : StreamObserver<ReverseReply> {
override fun onNext(reply: ReverseReply) {
println("res : " + reply.result)
}
override fun onError(t: Throwable?) {}
override fun onCompleted() {
println("complete")
}
})
observer.onNext(message_1)
observer.onNext(message_2)
observer.onNext(message_3)
observer.onComplete()
実行結果
Server
req : morimori1
req : morimori2
req : morimori3
complete
Client
res : 1iromirom
res : 1iromirom
res : 2iromirom
res : 2iromirom
res : 3iromirom
res : 3iromirom
complete
サンプルでの結果でもわかるように、以下のような偏った通信も可能です
Request:1、Response:N
+----------+ +---------------+
| |--- Request -->| |
| | | |
| | | |
| Client |<-- Response ---| Server |
| |<-- Response ---| |
| |<-- Response ---| |
| | | |
+----------+ +---------------+
Request:N、Response:1
+----------+ +---------------+
| |--- Request -->| |
| |--- Request -->| |
| |--- Request -->| |
| Client | | Server |
| | | |
| |<-- Response ---| |
| | | |
+----------+ +---------------+
もちろん Request:N, Response:N みたいなことも可能だと考えます
まとめ
双方向通信については、ProtocolBufferの定義に stream
を付けることで対応するスタブクラスが生成されました。
実装については、RxJavaのObservableに似たStreamObservableを用いて、連続的なイベント処理を意識すればそこまで戸惑うことは無いと感じました。
所感としては、かなり簡単にハイパフォーマンスな双方向かつ連続的なRPCが記述出来るなと
モバイルアプリでgRPCの利用も可能なためインクリメンタルサーチなどが効率的に実現可能になるかな、、など想いはめぐります。