Java
gRPC

JavaでgRPC導入のためのポイント

gRPCとは

gRPCの概要については、こちらのエントリで記載しています。

このエントリでは、gRPCの運用で気になるポイントや、Javaで実装する場合を中心にまとめていこうと思います。

開発フロー

現代的なシステムではシステム間をAPIを通じて通信することが多くなってきています。
この場合、各システムのインターフェイスは以下のいずれかのパターンで開発が進む場合が多いです。

パターン1: 手動でインターフェイスのドキュメント(仕様書等)を記述し、サーバ、クライアントの開発者でその要件を満たす実装を行う
パターン2: サーバ側のコードから、インターフェイスドキュメントを自動生成し、クライアントがその要件を満たす実装を行う
パターン3: 手動でインターフェイスドキュメントを記述し、各システム用にサーバとクライアントのコードを自動生成する

gRPCを用いた開発では上記の3のパターンで行います。
この方法は、インターフェイスドキュメントを事前に定義しそれに対する実装も自動生成されるため、

  • ドキュメントとの差異が発生することがない
  • インターフェイスの合意が事前に取りやすい
  • 実装コストを減らせる

などのメリットがあります。

JavaでgRPC

最もシンプルなgRPCの実装をJavaで行う場合です。

プロジェクトの設定

build.gradle
buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.3'
    }
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'com.google.protobuf'

repositories {
    jcenter()
}

def grpcVersion = '1.8.0'

dependencies {
    compile "io.grpc:grpc-netty:${grpcVersion}"
    compile "io.grpc:grpc-protobuf:${grpcVersion}"
    compile "io.grpc:grpc-stub:${grpcVersion}"
}

protobuf {
    protoc {
        artifact = "com.google.protobuf:protoc:3.2.0"
    }
    plugins {
        grpc {
            artifact = 'io.grpc:protoc-gen-grpc-java:1.4.0'
        }
    }
    generateProtoTasks {
        all()*.plugins {
            grpc {}
        }
    }
}

// for IDEA
sourceSets {
    main {
        java {
            srcDirs 'build/generated/source/proto/main/grpc'
            srcDirs 'build/generated/source/proto/main/java'
        }
    }
}

.protoでインターフェイスを定義

src/main/proto/helloworld.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";

package helloworld;

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {}
}

message HelloRequest {
  string name = 1;
}

message HelloReply {
  string message = 1;
}

gradleから、使用するコード生成

> ./gradlew generatePtoto

サーバ側のコードを実装

*Grpc.*ImplBaseを継承したクラスに、.protoで定義した処理を実装し、ServerBuilderaddServiceで登録します。

DemoServer.java
class DemoServer {

    public static void main(String[] args) throws Exception {

        Server server = ServerBuilder.forPort(6565)
                .addService(new GreeterImpl())
                .build();

        server.start();

        server.awaitTermination();
    }

    static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
        @Override
        public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
            HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
        }
    }
}

クライアント側のコードを実装

ManagedChannelBuilderにより、サーバのChannelを生成し、自動生成されたStubクラスを使用しリクエストを実行します。

DemoClient.java
class DemoClient {
    public static void main(String[] args) {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build();

        GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);

        HelloRequest request = HelloRequest.newBuilder()
                .setName("Tom")
                .build();

        HelloReply reply = stub.sayHello(request);

        System.out.println("Reply: " + reply);
    }
}

DemoServerを起動を、DemoClientを実行すると以下のような出力がされます。

output
> Reply: message: "Hello Tom"

Spring BootでgRPC

Spring Bootの場合は、starterがあるので、それを使うと便利です。

dependenciesの変更

build.gradle
//...

dependencies {
//    compile "io.grpc:grpc-netty:${grpcVersion}"
//    compile "io.grpc:grpc-protobuf:${grpcVersion}"
//    compile "io.grpc:grpc-stub:${grpcVersion}"
    compile('org.lognet:grpc-spring-boot-starter:${grpcStarterVersion}')
}

//...

Spring BootのServer実装

*Grpc.*ImplBaseを継承したクラスに、@GRpcServiceを付加し、.protoで定義した処理を実装します。

DemoServerApplication.java
@SpringBootApplication
class DemoServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoServerApplication.class, args);
    }

    @GRpcService
    public static class GreeterService extends GreeterGrpc.GreeterImplBase {
        @Override
        public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
            HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();

        }
    }
}

gRPCの通信方式

gRPCでは、従来通りの単体の通信から、HTTP/2を利用した双方向通信も利用できます。

通信方式 イメージ 概要 用途
Unary RPC Kobito.wC31i8.png 1つのリクエストに対して1つのレスポンスを返す方式 一般的なAPI、アプリケーション間通信
Server streaming RPC Kobito.62GIoA.png 1つのリクエストに対して、複数のレスポンスを返す方式 サーバサイドプッシュ、タイムライン、フィード配信などサーバから複数のデータをプッシュする場合
Client streaming RPC Kobito.3xi5Yv.png 複数のリクエストに対して、1つのレスポンスを返す方式 大量のデータアップロードなど
Bidirectional streaming RPC Kobito.sAGiM5.png 複数のリクエストに対して、複数のレスポンスを返す方式 チャットなど双方向通信を行う場合

以下は、それぞれの実装例です。

protoファイル

src/main/proto/helloworld.proto
// ...

service Greeter {
  rpc SayHelloUnary (HelloRequest) returns (HelloReply) {}
  rpc SayHelloServerStreaming (HelloRequest) returns (stream HelloReply) {}
  rpc SayHelloClientStreaming (stream HelloRequest) returns (HelloReply) {}
  rpc SayHelloBidirectionalStreaming (stream HelloRequest) returns (stream HelloReply) {}
}

// ...

Serverの実装

DemoServer.java
class DemoServer {

    public static void main(String[] args) throws Exception {
        // ...
    }

    public static class GreeterService extends GreeterGrpc.GreeterImplBase {

        @Override
        public void sayHelloUnary(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
            HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
        }

        @Override
        public void sayHelloServerStreaming(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
            HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
            responseObserver.onNext(reply);
            responseObserver.onNext(reply);
            responseObserver.onNext(reply);
            responseObserver.onCompleted();
        }

        @Override
        public StreamObserver<HelloRequest> sayHelloClientStreaming(StreamObserver<HelloReply> responseObserver) {
            List<String> requests = new ArrayList<>();
            return new StreamObserver<HelloRequest>() {
                @Override
                public void onNext(HelloRequest request) {
                    requests.add(request.getName());
                }
                @Override
                public void onError(Throwable t) {
                    // ...
                }
                @Override
                public void onCompleted() {
                    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + requests.toString()).build();
                    responseObserver.onNext(reply);
                    responseObserver.onCompleted();
                }
            };
        }

        @Override
        public StreamObserver<HelloRequest> sayHelloBidirectionalStreaming(StreamObserver<HelloReply> responseObserver) {
            return new StreamObserver<HelloRequest>() {
                @Override
                public void onNext(HelloRequest request) {
                    HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
                    responseObserver.onNext(reply);
                    responseObserver.onNext(reply);
                    responseObserver.onNext(reply);
                }
                @Override
                public void onError(Throwable t) {
                    // ...
                }
                @Override
                public void onCompleted() {
                    responseObserver.onCompleted();
                }
            };
        }
    }
}

クライアントの実装

DemoClient.java
class DemoClient {
    public static void main(String[] args) {
        // ...
    }

    String unary() {
        HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
        return blockingStub().sayHelloUnary(request).toString(); // message: "Hello Tom"
    }

    String serverStreaming() {
        HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
        Iterator<HelloReply> replies = blockingStub().sayHelloServerStreaming(request);
        List<HelloReply> response = new ArrayList<>();
        while (replies.hasNext()) {
            response.add(replies.next());
        }
        return response.toString(); // [message: "Hello Tom", message: "Hello Tom", message: "Hello Tom"]
    }

    String clientStreaming() throws Exception {
        HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
        CountDownLatch finishLatch = new CountDownLatch(1);
        List<HelloReply> response = new ArrayList<>();
        StreamObserver<HelloRequest> streamObserver = stub().sayHelloClientStreaming(new StreamObserver<HelloReply>() {
            @Override
            public void onNext(HelloReply reply) {
                response.add(reply);
            }
            @Override
            public void onError(Throwable t) {
                // ...
            }
            @Override
            public void onCompleted() {
                finishLatch.countDown();
            }
        });
        streamObserver.onNext(request);
        streamObserver.onNext(request);
        streamObserver.onNext(request);
        streamObserver.onCompleted();
        finishLatch.await(10, TimeUnit.SECONDS);
        return response.toString(); // message: "Hello [Tom, Tom, Tom]"
    }

    String bidirectionalStreaming() throws Exception {
        HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
        CountDownLatch finishLatch = new CountDownLatch(1);
        List<HelloReply> response = new ArrayList<>();
        StreamObserver<HelloRequest> streamObserver = stub().sayHelloBidirectionalStreaming(new StreamObserver<HelloReply>() {
            @Override
            public void onNext(HelloReply reply) {
                response.add(reply);
            }
            @Override
            public void onError(Throwable t) {
                // ...
            }
            @Override
            public void onCompleted() {
                finishLatch.countDown();
            }
        });
        streamObserver.onNext(request);
        streamObserver.onNext(request);
        streamObserver.onNext(request);
        streamObserver.onCompleted();
        finishLatch.await(10, TimeUnit.SECONDS);
        return response.toString(); // [message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" ]
    }

    private GreeterGrpc.GreeterBlockingStub blockingStub() {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build();
        return GreeterGrpc.newBlockingStub(channel);
    }

    private GreeterGrpc.GreeterStub stub() {
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
                .usePlaintext(true)
                .build();
        return GreeterGrpc.newStub(channel);
    }
}

Observerパターンを利用しているので、慣れていないと少し戸惑う部分もあるかもしれません。
また、この手のインターフェイスはRxJavaReactorでも採用されているので、それぞれと統合を行うためのreactive-grpcなども存在していたりします。

ロードバランシング

gRPCの通信は双方向通信などを可能にするため、TCPコネクションを利用した永続接続となります。
これはHTTP/1.1のような、通信ごとに接続を行う方式と比べ効率的な一方で、適切なロードバランシングが必要になります。
以下は、その方式と特徴です。

種類 説明 メリット デメリット
Proxy Proxy クライアント側の実装がシンプル
信頼性されないクライアントもアクセス可能
レイテンシが高い
LBによってのスループットに左右される
Client Side Client Side 中間層が無いためハイパフォーマンス クライアントの実装が複雑で、ヘルスチェックや負荷分散の仕組みも容易が必要
言語ごとに実装が必要
クライアントの信頼性を担保する仕組みが必要

Eurekaによるクライアントロードバランシング

Eurekaを使用することで、クライアントロードバランシングを実現できます。
実装方法はいくつかありまが、Serviceの実装と合わせてgrpc-spring-boot-starterを利用するのが便利です。

Spring Boot + EurekaのServer実装

サーバ側は通常のEurekaを使用する場合と同様、依存を追加しアプリケーション名や使用するポート、eurekaサーバの設定、Eurekaの有効化を行います。

build.gradle
//...

dependencies {
    // ...
    compile('org.springframework.cloud:spring-cloud-starter-eureka')
}

//...
bootstrap.yml
spring:
    application:
        name: demo-server // アプリケーション名の設定
application.yml
grpc:
    port: 6565 // gRPCのポート設定
eureka:
    instance:
        nonSecurePort: ${grpc.port} // gRPCポートとEurekaに設定
DemoServerApplication.java
@SpringBootApplication
@EnableEurekaClient // Eurekaの有効化
class DemoServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoServerApplication.class, args);
    }

    // ...
}

Spring Boot + EurekaのClient実装

クライアントの実装は単純で、依存を追加して、Eurekaの有効化、EurekaClientを使用してサーバのIPとポートを取得します。
ロードバランシングはEureka側で行うため、クライアントで特別な実装は不要です。

build.gradle
//...

dependencies {
    // ...
    compile('org.springframework.cloud:spring-cloud-starter-eureka')
}

//...
DemoClientApplication.java
@SpringBootApplication
@EnableEurekaClient // Eurekaの有効化
class DemoClientApplication {

    @Autowired
    EurekaClient client;

    void sayHello(HelloRequest request) {
        InstanceInfo instanceInfo = client.getNextServerFromEureka("backend-service", false); // EurekaClientからサーバ情報を取得
        ManagedChannel channel = ManagedChannelBuilder.forAddress(instanceInfo.getIPAddr(), instanceInfo.getPort()) // InstanceInfoからIPやポートを設定
                .usePlaintext(true)
                .build();

        GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
        stub.sayHello(request);
    }

    // ...
}

エラー処理

onErrorによるエラー処理

基本的なエラーはStreamObserveronErrorメソッドを利用します。

class DemoServer {
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
        try {

            // ...  

        } catch (Exception e) {

            StatusRuntimeException exception = Status.INTERNAL
                    .withDescription(e.getMessage())
                    .withCause(e)
                    .asRuntimeException();
            responseObserver.onError(exception);
        }
    }
}
DemoClient.java
class DemoClient {
    public static void main(String[] args) {        
        try {

            // ...  

        } catch (StatusRuntimeException e) {
            e.printStackTrace(); // io.grpc.StatusRuntimeException: INTERNAL: error message...
        }
    }
}

Metadataによるエラー情報の付加

実運用の場合、エラーに関してより詳細な情報を扱いたい場合も多いと思います。

その場合に利用するのがMetadataです。
Metadataには様々な情報を付加することができますが、protoファイルを利用するのが便利です。

message Error {
  string message = 1;
  string detail = 2;
}
class DemoServer {
    @Override
    public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
        try {

            // ...  

        } catch (Exception e) {
            Metadata metadata = new Metadata(); // Metadataを生成
            Error error = Error.newBuilder()
                    .setMessage("my error")
                    .setDetail("error detail")
                    .build();
            Metadata.Key<Error> key = ProtoUtils.keyForProto(error);
            metadata.put(key, error); // エラー情報を追加

            StatusRuntimeException exception = Status.INTERNAL
                    .withDescription(e.getMessage())
                    .withCause(e)
                    .asRuntimeException(metadata); // Metadataを付加
            responseObserver.onError(exception);
        }
    }
}
DemoClient.java
class DemoClient {
    public static void main(String[] args) {        
        try {

            // ...  

        } catch (StatusRuntimeException e) {
            e.printStackTrace();
            Status status = Status.fromThrowable(e); // Status{code=INTERNAL, description=Invalid parameter, cause=null}
            Metadata metadata = Status.trailersFromThrowable(e); // Metadata(content-type=application/grpc,helloworld.error-bin=CghteSBlcnJvchIMZXJyb3IgZGV0YWls)
            Error error = metadata.get(ProtoUtils.keyForProto(Error.getDefaultInstance())); // error=message: "my error detail: "error detail"
        }
    }
}

Interceptorによる透過的なエラー処理

onErrorによるエラーハンドリン行わずサーバで例外が発生した場合、クライアント側ではStatusRuntimeException(Status=Unknown)が発生します。
これはサーバ側で例外が発生したことは検知できますが、どのような例外が発生してるかは不明です。
通常のアプリケーションではRuntimeExceptionが発生する可能性が常にあるので、サーバ側の各エンドポイントでtry-catchが必要になります。
ただし、このような処理は非常に冗長になるため、透過的に処理を埋め込むのがInterceptorです。
io.grpc.ServerInterceptorの実装をすることで独自のエラー処理を実現できますが、一般的なエラー処理でよければ標準のio.grpc.util.TransmitStatusRuntimeExceptionInterceptorを利用するのが便利です。

DemoServer.java
class DemoServer {

    public static void main(String[] args) throws Exception {

        Server server = ServerBuilder.forPort(6565)
                .intercept(TransmitStatusRuntimeExceptionInterceptor.instance())  // Interceptorの登録
                .addService(new GreeterImpl())
                .build();

        // ...          
    }

    static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
        @Override
        public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {

            // metadataの生成など...

            // onErrorの代わりに、StatusRuntimeExceptionをthrowする
            throw Status.INTERNAL
                .withDescription("server error")
                .asRuntimeException(metadata);
        }
    }
}

バリデーション

gRPCのバリデーション専用の仕様はないため、通常のエラーと同様に処理します。
また、不正な入力のためのStatus.INVALID_ARGUMENTも用意されています。

セキュリティ

gRPCではSSL/TLSを用いた認証や、Googleのトークンを使用した認証の仕組みを提供しています。

Spring Securityによる認可

Spring SecurityのPreAuthorizeのような機能が欲しくなるときもあると思います。
現状公式でそのような機能はサポートされていないですが、インターセプターを利用することで実装は可能です。
以下のエントリが参考になります。

DemoGrpcService.java
@GRpcService
public class DemoGrpcService extends DemoServiceGrpc.DemoServiceImplBase {

    @Override
    @PreAuthorize("hasRole('USER')")
    public void list(ListRequest request, StreamObserver<ListResponse> responseObserver) {
        // ...
    }

    @Override
    @PreAuthorize("hasRole('ADMIN')")
    public void buy(BuyRequest request, StreamObserver<BuyResponse> responseObserver) {
        // ...
    }
}

また、grpc-spring-boot-starterのissueでも検討されています。

ただし、現状のgRPCの用途として、広く公開された環境というより、プラットフォーム内の通信として利用する場合も多いので、どこまで制御が必要かは検討したほうがいいと思います。

テスト

gRPCのテストを行う場合には、公式で提供されているライブラリを使用して実行します。

サーバーのテスト

build.gradle
//...

dependencies {
    // ...
    testCompile "io.grpc:grpc-testing:${grpcVersion}"
}

//...
DemoServerTest.java
class DemoServerTest {

    // gRPCライブラリから提供されるRuleクラスを利用する
    @Rule
    public GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();

    @Test
    public void test() {
        grpcServerRule.getServiceRegistry().addService(new GreeterService());
        GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(grpcServerRule.getChannel());

        String testName = "test name";
        HelloReply reply = blockingStub.sayHello(HelloRequest.newBuilder().setName(testName).build());

        assertEquals("Hello " + testName, reply.getMessage());
    }
}

クライアントのテスト

DemoClientTest.java
class DemoClientTest {

    // gRPCライブラリから提供されるRuleクラスを利用する
    @Rule
    public GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();

    @Test
    public void test() {
        GreeterGrpc.GreeterImplBase serviceImpl = Mockito.spy(new GreeterGrpc.GreeterImplBase() {}); // Server実装のモック化
        grpcServerRule.getServiceRegistry().addService(serviceImpl);

        ArgumentCaptor<HelloRequest> requestCaptor = ArgumentCaptor.forClass(HelloRequest.class);
        String testName = "test name";

        DemoClient client = new DemoClient(grpcServerRule.getChannel());;
        client.hello(testName);

        Mockito.verify(serviceImpl)
                .sayHello(requestCaptor.capture(), Matchers.any()); // 実行内容のキャプチャ

        assertEquals(testName, requestCaptor.getValue().getName());
    }

}

.protoの管理

gRPCでProtocol Buffersを利用する上で必要なのが、.protoファイルの管理です。
この方法に関しては公式でサポートされた機能はないため、代表的な2種類を記載します。

専用のリポジトリ

メルカリなどで採用しているのが.protoの専用リポジトリを作成する方法です。(Protoファイルの管理)
この場合CIと連携させて、クライアント言語ごとのライブラリを生成しやすくなります。
また、そこまでしなくてもGitのsubmoduleなどを使用して、クライアント側に.protoファイルを取り込むのという手もあります。

サーバ側のリポジトリに同胞

専用リポジトリで管理は、サーバ側のコードと.protoファイルのリポジトリが分離しまいますが、実装を行うサーバ側に.protoを含めたほうが扱いやすいという場合も多いと思います。
その場合、protodepを利用することで、クライアント側で各リポジトリ内の.protoファイルを集約することができます。
これにより、サーバ側のアプリケーションで.protoファイルを管理し、クライアント側で必要となる.protoファイルを使用するという運用が可能です。

REST API

gRPCで構築したアプリケーションをREST API経由でアクセスしたいという場合も多いと思います。
この場合に便利なのが、grpc-gatewayです。
こちらは、少し長くなりそうなので、また別エントリとしてまとめようと思います。

まとめ

gRPCは複数アプリケーションを経由したシステムを構築する上で非常に有効な仕組みですが、実際に運用を行う上で不明点も多いと思いうのでまとめめした。
また、更に知見が溜まったらこちらのエントリもアップデートしていこうと思います。

参考