Java
Kotlin
gRPC

gRPC Javaで双方向通信をやってみる

最近よく見かけるgRPCですが、メリットとして挙げられる双方向通信はどうするのでしょうか?
Javaでの利用はあまりまとまった文献が見当たらなかったのでまとめてみます

このドキュメントについて

  • gRPCをJavaで利用する場合にTCPコネクションを維持したまま双方向通信どうするのかを記載しているつもり
  • gRPCの設定やProtocolBufferなどの説明は関連資料に譲ります

利用した環境

実行環境など バージョン
JDK 1.8.0_121
Kotlin 1.1.2
gRPC 1.5.0

いちおうgRPCとは

ざっくりとした特徴。

  • Googleが開発したRPCフレームワーク(Javaを利用する公式チュートリアルはこちら)
  • Protocol Bufferを利用して複数言語間で統一的な呼び出しが可能
  • HTTP/2を用いて高速、双方向な通信が可能

双方向通信どうするのか?

とりあえずのポイントは protoファイルにstramを付ける です

その後出来上がった スタブクラスに紐づくメソッドにおいて、 StreamObservable を適切に処理することで双方向通信が可能です

単純なRequest / Response

まずはよくある 1リクエスト、1レスポンスの例を見てみます
文字列messageを受け取って 反転した文字列 result を返す単純なサービスを作成

イメージ

 +----------+                +---------------+
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |<-- Response ---|               |
 +----------+                +---------------+

protoファイル

syntax = "proto3";

message StringRequest {
    string message = 1;
}

message ReverseReply {
    string result = 1;
}

service ReverseString {
    rpc Exec (StringRequest) returns (ReverseReply) {}
}

protoより生成されるJavaコード(抜粋)

ここには手をいれる必要はなし、ProtocolBufferにて自動生成されるコードがこんな感じというのを示しておきます

public static abstract class ReverseStringImplBase implements io.grpc.BindableService {

    public void exec(StringRequest request,
        io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
    }

    ...
}

Serverコード(Kotlin)

Server側のコード抜粋です。ここではgRPCデフォルトの6565番ポートで起動する想定です
サーバ側はStreamObserverを利用したレスポンスを記載するが、 onNext の呼び出しは1回しか許容されない模様
(試しに2回呼んでみたけど、処理はブロックされてうまく動かず)

class ReverseStringService : ReverseStringGrpc.ReverseStringImplBase() {
    override fun exec(request: StringRequest?, responseObserver: StreamObserver<ReverseReply>?) {
        println("req : " + request?.message);
        val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()

        responseObserver?.onNext(reply)
        responseObserver?.onCompleted()
    }
}

Clientコード(Kotlin)

後続の説明と合わせるために非同期処理のClient実装

val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build()

val stub = ReverseStringGrpc.newStub(channel)
val message = StringRequest.newBuilder().setMessage("morimori").build()

stub.exec(message, object : StreamObserver<ReverseReply> {
    override fun onNext(reply: ReverseReply) {
        println("res : " + reply.result)
    }

    override fun onError(t: Throwable) {}

    override fun onCompleted() {
        println("complete")
    }
})

実行結果

Server

req : morimori

Client

res : iromirom
complete

双方向のRequest / Response

同一TCPコネクションで複数回のRequest/Responseをやり取り出来るようにしたい

イメージ

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |                |               |
 |          |<-- Response ---|               |
 |          |                |               |
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |<-- Response ---|               |
 |          |                |               |
 |          |---  Request -->|               |
 |          |                |               |
 |          |<-- Response ---|               |
 +----------+                +---------------+

protoファイル(抜粋)

ExecStream を追加

syntax = "proto3";

...

service ReverseString {
    rpc Exec (StringRequest) returns (ReverseReply) {}
    rpc ExecStream (stream StringRequest) returns (stream ReverseReply) {}
}

protoより生成されるJavaコード

引数も戻り値も StreamObject を持つstreamExecメソッドが追加されます

public static abstract class ReverseStringImplBase implements io.grpc.BindableService {

    public void exec(StringRequest request,
        io.grpc.stub.StreamObserver<ReverseReply> responseObserver) {
      asyncUnimplementedUnaryCall(METHOD_EXEC, responseObserver);
    }

    public io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.StringRequest> execStream(
        io.grpc.stub.StreamObserver<jp.a2kaido.helloworld.ReverseReply> responseObserver) {
      return asyncUnimplementedStreamingCall(METHOD_EXEC_STREAM, responseObserver);
    }

    ...
}


Serverコード(Kotlin)

StreamObservableを返すようなコードを記載します
実行結果の違いをわかりやすくするために、 1 Requestに対して2 Responseを返してみます

override fun execStream(responseObserver: StreamObserver<ReverseReply>?): StreamObserver<StringRequest> {
    return object : StreamObserver<StringRequest> {
        override fun onNext(request: StringRequest) {
            println("req : " + request?.message);
            val reply = ReverseReply.newBuilder().setResult(request?.message?.reversed()).build()

            responseObserver?.onNext(reply)
            responseObserver?.onNext(reply)
        }

        override fun onError(t: Throwable) {}

        override fun onCompleted() {
            println("complete")
            responseObserver?.onCompleted()
        }
    }
}

Clientコード(Kotlin)

Client側の実装は変わっていて、StreamObserver#onNext を利用してリクエストを投げるようにstubの型が変更されてます

val channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build()

val message_1 = StringRequest.newBuilder().setMessage("morimori1").build()
val message_2 = StringRequest.newBuilder().setMessage("morimori2").build()
val message_3 = StringRequest.newBuilder().setMessage("morimori3").build()

val stub = ReverseStringGrpc.newStub(channel)

val observer = stub.execStream(object : StreamObserver<ReverseReply> {
    override fun onNext(reply: ReverseReply) {
        println("res : " + reply.result)
    }

    override fun onError(t: Throwable?) {}

    override fun onCompleted() {
        println("complete")
    }
})

observer.onNext(message_1)
observer.onNext(message_2)
observer.onNext(message_3)
observer.onComplete()

実行結果

Server

req : morimori1
req : morimori2
req : morimori3
complete

Client

res : 1iromirom
res : 1iromirom
res : 2iromirom
res : 2iromirom
res : 3iromirom
res : 3iromirom
complete

サンプルでの結果でもわかるように、以下のような偏った通信も可能です

Request:1、Response:N

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |                |               |
 |          |                |               |
 |  Client  |<-- Response ---|     Server    |
 |          |<-- Response ---|               |
 |          |<-- Response ---|               |
 |          |                |               |
 +----------+                +---------------+

Request:N、Response:1

 +----------+                +---------------+
 |          |---  Request -->|               |
 |          |---  Request -->|               |
 |          |---  Request -->|               |
 |  Client  |                |     Server    |
 |          |                |               |
 |          |<-- Response ---|               |
 |          |                |               |
 +----------+                +---------------+

もちろん Request:N, Response:N みたいなことも可能だと考えます

まとめ

双方向通信については、ProtocolBufferの定義に stream を付けることで対応するスタブクラスが生成されました。
実装については、RxJavaのObservableに似たStreamObservableを用いて、連続的なイベント処理を意識すればそこまで戸惑うことは無いと感じました。

所感としては、かなり簡単にハイパフォーマンスな双方向かつ連続的なRPCが記述出来るなと
モバイルアプリでgRPCの利用も可能なためインクリメンタルサーチなどが効率的に実現可能になるかな、、など想いはめぐります。

関連資料