gRPCとは
gRPCの概要については、こちらのエントリで記載しています。
このエントリでは、gRPCの運用で気になるポイントや、Javaで実装する場合を中心にまとめていこうと思います。
開発フロー
現代的なシステムではシステム間をAPIを通じて通信することが多くなってきています。
この場合、各システムのインターフェイスは以下のいずれかのパターンで開発が進む場合が多いです。
パターン1: 手動でインターフェイスのドキュメント(仕様書等)を記述し、サーバ、クライアントの開発者でその要件を満たす実装を行う
パターン2: サーバ側のコードから、インターフェイスドキュメントを自動生成し、クライアントがその要件を満たす実装を行う
パターン3: 手動でインターフェイスドキュメントを記述し、各システム用にサーバとクライアントのコードを自動生成する
gRPCを用いた開発では上記の3のパターンで行います。
この方法は、インターフェイスドキュメントを事前に定義しそれに対する実装も自動生成されるため、
- ドキュメントとの差異が発生することがない
- インターフェイスの合意が事前に取りやすい
- 実装コストを減らせる
などのメリットがあります。
JavaでgRPC
最もシンプルなgRPCの実装をJavaで行う場合です。
プロジェクトの設定
buildscript {
repositories {
mavenCentral()
}
dependencies {
classpath 'com.google.protobuf:protobuf-gradle-plugin:0.8.8'
}
}
apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'com.google.protobuf'
repositories {
jcenter()
}
def grpcVersion = '1.21.0'
dependencies {
compile "io.grpc:grpc-netty:${grpcVersion}"
compile "io.grpc:grpc-protobuf:${grpcVersion}"
compile "io.grpc:grpc-stub:${grpcVersion}"
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:3.8.0"
}
plugins {
grpc {
artifact = 'io.grpc:protoc-gen-grpc-java:1.21.0'
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}
// for IDEA
sourceSets {
main {
java {
srcDirs 'build/generated/source/proto/main/grpc'
srcDirs 'build/generated/source/proto/main/java'
}
}
}
.protoでインターフェイスを定義
syntax = "proto3";
option java_multiple_files = true;
option java_package = "io.grpc.examples.helloworld";
package helloworld;
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {}
}
message HelloRequest {
string name = 1;
}
message HelloReply {
string message = 1;
}
gradleから、使用するコード生成
> ./gradlew generateProto
サーバ側のコードを実装
*Grpc.*ImplBase
を継承したクラスに、.proto
で定義した処理を実装し、ServerBuilder
のaddService
で登録します。
class DemoServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(6565)
.addService(new GreeterImpl())
.build();
server.start();
server.awaitTermination();
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
クライアント側のコードを実装
ManagedChannelBuilder
により、サーバのChannelを生成し、自動生成されたStubクラスを使用しリクエストを実行します。
class DemoClient {
public static void main(String[] args) {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.build();
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
HelloRequest request = HelloRequest.newBuilder()
.setName("Tom")
.build();
HelloReply reply = stub.sayHello(request);
System.out.println("Reply: " + reply);
}
}
DemoServerを起動を、DemoClientを実行すると以下のような出力がされます。
> Reply: message: "Hello Tom"
Spring BootでgRPC
Spring Bootの場合は、starterがあるので、それを使うと便利です。
dependenciesの変更
//...
dependencies {
// compile "io.grpc:grpc-netty:${grpcVersion}"
// compile "io.grpc:grpc-protobuf:${grpcVersion}"
// compile "io.grpc:grpc-stub:${grpcVersion}"
compile('org.lognet:grpc-spring-boot-starter:${grpcStarterVersion}')
}
//...
Spring BootのServer実装
*Grpc.*ImplBase
を継承したクラスに、@GRpcService
を付加し、.proto
で定義した処理を実装します。
@SpringBootApplication
class DemoServerApplication {
public static void main(String[] args) {
SpringApplication.run(DemoServerApplication.class, args);
}
@GRpcService
public static class GreeterService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
}
gRPCの通信方式
gRPCでは、従来通りの単体の通信から、HTTP/2を利用した双方向通信も利用できます。
以下は、それぞれの実装例です。
protoファイル
// ...
service Greeter {
rpc SayHelloUnary (HelloRequest) returns (HelloReply) {}
rpc SayHelloServerStreaming (HelloRequest) returns (stream HelloReply) {}
rpc SayHelloClientStreaming (stream HelloRequest) returns (HelloReply) {}
rpc SayHelloBidirectionalStreaming (stream HelloRequest) returns (stream HelloReply) {}
}
// ...
Serverの実装
class DemoServer {
public static void main(String[] args) throws Exception {
// ...
}
public static class GreeterService extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHelloUnary(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void sayHelloServerStreaming(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onNext(reply);
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public StreamObserver<HelloRequest> sayHelloClientStreaming(StreamObserver<HelloReply> responseObserver) {
List<String> requests = new ArrayList<>();
return new StreamObserver<HelloRequest>() {
@Override
public void onNext(HelloRequest request) {
requests.add(request.getName());
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + requests.toString()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
};
}
@Override
public StreamObserver<HelloRequest> sayHelloBidirectionalStreaming(StreamObserver<HelloReply> responseObserver) {
return new StreamObserver<HelloRequest>() {
@Override
public void onNext(HelloRequest request) {
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + request.getName()).build();
responseObserver.onNext(reply);
responseObserver.onNext(reply);
responseObserver.onNext(reply);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
responseObserver.onCompleted();
}
};
}
}
}
クライアントの実装
class DemoClient {
public static void main(String[] args) {
// ...
}
String unary() {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
return blockingStub().sayHelloUnary(request).toString(); // message: "Hello Tom"
}
String serverStreaming() {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
Iterator<HelloReply> replies = blockingStub().sayHelloServerStreaming(request);
List<HelloReply> response = new ArrayList<>();
while (replies.hasNext()) {
response.add(replies.next());
}
return response.toString(); // [message: "Hello Tom", message: "Hello Tom", message: "Hello Tom"]
}
String clientStreaming() throws Exception {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
CountDownLatch finishLatch = new CountDownLatch(1);
List<HelloReply> response = new ArrayList<>();
StreamObserver<HelloRequest> streamObserver = stub().sayHelloClientStreaming(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
response.add(reply);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
});
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onCompleted();
finishLatch.await(10, TimeUnit.SECONDS);
return response.toString(); // message: "Hello [Tom, Tom, Tom]"
}
String bidirectionalStreaming() throws Exception {
HelloRequest request = HelloRequest.newBuilder().setName("Tom").build();
CountDownLatch finishLatch = new CountDownLatch(1);
List<HelloReply> response = new ArrayList<>();
StreamObserver<HelloRequest> streamObserver = stub().sayHelloBidirectionalStreaming(new StreamObserver<HelloReply>() {
@Override
public void onNext(HelloReply reply) {
response.add(reply);
}
@Override
public void onError(Throwable t) {
// ...
}
@Override
public void onCompleted() {
finishLatch.countDown();
}
});
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onNext(request);
streamObserver.onCompleted();
finishLatch.await(10, TimeUnit.SECONDS);
return response.toString(); // [message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" , message: "Hello Tom" ]
}
private GreeterGrpc.GreeterBlockingStub blockingStub() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build();
return GreeterGrpc.newBlockingStub(channel);
}
private GreeterGrpc.GreeterStub stub() {
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext(true)
.build();
return GreeterGrpc.newStub(channel);
}
}
Observerパターンを利用しているので、慣れていないと少し戸惑う部分もあるかもしれません。
また、この手のインターフェイスはRxJavaやReactorでも採用されているので、それぞれと統合を行うためのreactive-grpcなども存在していたりします。
ロードバランシング
gRPCの通信は双方向通信などを可能にするため、TCPコネクションを利用した永続接続となります。
これはHTTP/1.1のような、通信ごとに接続を行う方式と比べ効率的な一方で、適切なロードバランシングが必要になります。
以下は、その方式と特徴です。
種類 | 説明 | メリット | デメリット |
---|---|---|---|
Proxy | クライアント側の実装がシンプル 信頼性されないクライアントもアクセス可能 |
レイテンシが高い LBによってのスループットに左右される |
|
Client Side | 中間層が無いためハイパフォーマンス | クライアントの実装が複雑で、ヘルスチェックや負荷分散の仕組みも用意が必要 言語ごとに実装が必要 クライアントの信頼性を担保する仕組みが必要 |
Eurekaによるクライアントロードバランシング
Eurekaを使用することで、クライアントロードバランシングを実現できます。
実装方法はいくつかありまが、Serviceの実装と合わせてgrpc-spring-boot-starterを利用するのが便利です。
Spring Boot + EurekaのServer実装
サーバ側は通常のEurekaを使用する場合と同様、依存を追加しアプリケーション名や使用するポート、eurekaサーバの設定、Eurekaの有効化を行います。
//...
dependencies {
// ...
compile('org.springframework.cloud:spring-cloud-starter-eureka')
}
//...
spring:
application:
name: demo-server // アプリケーション名の設定
grpc:
port: 6565 // gRPCのポート設定
eureka:
instance:
nonSecurePort: ${grpc.port} // gRPCポートとEurekaに設定
@SpringBootApplication
@EnableEurekaClient // Eurekaの有効化
class DemoServerApplication {
public static void main(String[] args) {
SpringApplication.run(DemoServerApplication.class, args);
}
// ...
}
Spring Boot + EurekaのClient実装
クライアントの実装は単純で、依存を追加して、Eurekaの有効化、EurekaClientを使用してサーバのIPとポートを取得します。
ロードバランシングはEureka側で行うため、クライアントで特別な実装は不要です。
//...
dependencies {
// ...
compile('org.springframework.cloud:spring-cloud-starter-eureka')
}
//...
@SpringBootApplication
@EnableEurekaClient // Eurekaの有効化
class DemoClientApplication {
@Autowired
EurekaClient client;
void sayHello(HelloRequest request) {
InstanceInfo instanceInfo = client.getNextServerFromEureka("backend-service", false); // EurekaClientからサーバ情報を取得
ManagedChannel channel = ManagedChannelBuilder.forAddress(instanceInfo.getIPAddr(), instanceInfo.getPort()) // InstanceInfoからIPやポートを設定
.usePlaintext(true)
.build();
GreeterGrpc.GreeterBlockingStub stub = GreeterGrpc.newBlockingStub(channel);
stub.sayHello(request);
}
// ...
}
エラー処理
onErrorによるエラー処理
基本的なエラーはStreamObserver
のonError
メソッドを利用します。
class DemoServer {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
try {
// ...
} catch (Exception e) {
StatusRuntimeException exception = Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException();
responseObserver.onError(exception);
}
}
}
class DemoClient {
public static void main(String[] args) {
try {
// ...
} catch (StatusRuntimeException e) {
e.printStackTrace(); // io.grpc.StatusRuntimeException: INTERNAL: error message...
}
}
}
Metadataによるエラー情報の付加
実運用の場合、エラーに関してより詳細な情報を扱いたい場合も多いと思います。
その場合に利用するのがMetadata
です。
Metadata
には様々な情報を付加することができますが、protoファイルを利用するのが便利です。
message Error {
string message = 1;
string detail = 2;
}
class DemoServer {
@Override
public void sayHello(HelloRequest request, StreamObserver<HelloReply> responseObserver) {
try {
// ...
} catch (Exception e) {
Metadata metadata = new Metadata(); // Metadataを生成
Error error = Error.newBuilder()
.setMessage("my error")
.setDetail("error detail")
.build();
Metadata.Key<Error> key = ProtoUtils.keyForProto(error);
metadata.put(key, error); // エラー情報を追加
StatusRuntimeException exception = Status.INTERNAL
.withDescription(e.getMessage())
.withCause(e)
.asRuntimeException(metadata); // Metadataを付加
responseObserver.onError(exception);
}
}
}
class DemoClient {
public static void main(String[] args) {
try {
// ...
} catch (StatusRuntimeException e) {
e.printStackTrace();
Status status = Status.fromThrowable(e); // Status{code=INTERNAL, description=Invalid parameter, cause=null}
Metadata metadata = Status.trailersFromThrowable(e); // Metadata(content-type=application/grpc,helloworld.error-bin=CghteSBlcnJvchIMZXJyb3IgZGV0YWls)
Error error = metadata.get(ProtoUtils.keyForProto(Error.getDefaultInstance())); // error=message: "my error detail: "error detail"
}
}
}
Interceptorによる透過的なエラー処理
onErrorによるエラーハンドリン行わずサーバで例外が発生した場合、クライアント側ではStatusRuntimeException(Status=Unknown)が発生します。
これはサーバ側で例外が発生したことは検知できますが、どのような例外が発生してるかは不明です。
通常のアプリケーションではRuntimeExceptionが発生する可能性が常にあるので、サーバ側の各エンドポイントでtry-catchが必要になります。
ただし、このような処理は非常に冗長になるため、透過的に処理を埋め込むのがInterceptorです。
io.grpc.ServerInterceptor
の実装をすることで独自のエラー処理を実現できますが、一般的なエラー処理でよければ標準のio.grpc.util.TransmitStatusRuntimeExceptionInterceptor
を利用するのが便利です。
class DemoServer {
public static void main(String[] args) throws Exception {
Server server = ServerBuilder.forPort(6565)
.intercept(TransmitStatusRuntimeExceptionInterceptor.instance()) // Interceptorの登録
.addService(new GreeterImpl())
.build();
// ...
}
static class GreeterImpl extends GreeterGrpc.GreeterImplBase {
@Override
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver) {
// metadataの生成など...
// onErrorの代わりに、StatusRuntimeExceptionをthrowする
throw Status.INTERNAL
.withDescription("server error")
.asRuntimeException(metadata);
}
}
}
リトライ処理
サーバが何かしらの原因でエラーが発生する場合には、クライアント側でリトライを行うことができます。
class DemoClient {
public static void main(String[] args) {
// リトライポリシーの設定
Map<String, Object> retryPolicy = new HashMap<>();
retryPolicy.put("maxAttempts", 3D);
retryPolicy.put("initialBackoff", "0.5s");
retryPolicy.put("maxBackoff", "1s");
retryPolicy.put("backoffMultiplier", 2D);
retryPolicy.put("retryableStatusCodes", Arrays.asList("UNAVAILABLE"));
Map<String, Object> methodConfig = new HashMap<>();
Map<String, Object> name = new HashMap<>();
name.put("service", "helloworld.Greeter");
methodConfig.put("name", Collections.singletonList(name));
methodConfig.put("retryPolicy", retryPolicy);
Map<String, Object> serviceConfig = new HashMap<>();
serviceConfig.put("methodConfig", Collections.singletonList(methodConfig));
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 6565)
.usePlaintext()
.enableRetry() // リトライの有効化
.defaultServiceConfig(serviceConfig) // 設定の適用
.build();
// ...
// UNAVAILABLEが発生した場合、自動的にリトライポリシーに従いリトライを行う
HelloReply reply = stub.sayHello(request);
// ...
}
}
バリデーション
gRPCのバリデーション専用の仕様はないため、通常のエラーと同様に処理します。
また、不正な入力のためのStatus.INVALID_ARGUMENT
も用意されています。
セキュリティ
gRPCではSSL/TLSを用いた認証や、Googleのトークンを使用した認証の仕組みを提供しています。
Spring Securityによる認可
Spring SecurityのPreAuthorize
のような機能が欲しくなるときもあると思います。
現状公式でそのような機能はサポートされていないですが、インターセプターを利用することで実装は可能です。
以下のエントリが参考になります。
@GRpcService
public class DemoGrpcService extends DemoServiceGrpc.DemoServiceImplBase {
@Override
@PreAuthorize("hasRole('USER')")
public void list(ListRequest request, StreamObserver<ListResponse> responseObserver) {
// ...
}
@Override
@PreAuthorize("hasRole('ADMIN')")
public void buy(BuyRequest request, StreamObserver<BuyResponse> responseObserver) {
// ...
}
}
また、grpc-spring-boot-starterのissueでも検討されています。
ただし、現状のgRPCの用途として、広く公開された環境というより、プラットフォーム内の通信として利用する場合も多いので、どこまで制御が必要かは検討したほうがいいと思います。
テスト
gRPCのテストを行う場合には、公式で提供されているライブラリを使用して実行します。
サーバーのテスト
//...
dependencies {
// ...
testCompile "io.grpc:grpc-testing:${grpcVersion}"
}
//...
class DemoServerTest {
// gRPCライブラリから提供されるRuleクラスを利用する
@Rule
public GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Test
public void test() {
grpcServerRule.getServiceRegistry().addService(new GreeterService());
GreeterGrpc.GreeterBlockingStub blockingStub = GreeterGrpc.newBlockingStub(grpcServerRule.getChannel());
String testName = "test name";
HelloReply reply = blockingStub.sayHello(HelloRequest.newBuilder().setName(testName).build());
assertEquals("Hello " + testName, reply.getMessage());
}
}
クライアントのテスト
class DemoClientTest {
// gRPCライブラリから提供されるRuleクラスを利用する
@Rule
public GrpcServerRule grpcServerRule = new GrpcServerRule().directExecutor();
@Test
public void test() {
GreeterGrpc.GreeterImplBase serviceImpl = Mockito.spy(new GreeterGrpc.GreeterImplBase() {}); // Server実装のモック化
grpcServerRule.getServiceRegistry().addService(serviceImpl);
ArgumentCaptor<HelloRequest> requestCaptor = ArgumentCaptor.forClass(HelloRequest.class);
String testName = "test name";
DemoClient client = new DemoClient(grpcServerRule.getChannel());;
client.hello(testName);
Mockito.verify(serviceImpl)
.sayHello(requestCaptor.capture(), Matchers.any()); // 実行内容のキャプチャ
assertEquals(testName, requestCaptor.getValue().getName());
}
}
.protoの管理
gRPCでProtocol Buffersを利用する上で必要なのが、.protoファイルの管理です。
この方法に関しては公式でサポートされた機能はないため、代表的な2種類を記載します。
専用のリポジトリ
メルカリなどで採用しているのが.protoの専用リポジトリを作成する方法です。(Protoファイルの管理)
この場合CIと連携させて、クライアント言語ごとのライブラリを生成しやすくなります。
また、そこまでしなくてもGitのsubmoduleなどを使用して、クライアント側に.protoファイルを取り込むのという手もあります。
サーバ側のリポジトリに同胞
専用リポジトリで管理は、サーバ側のコードと.protoファイルのリポジトリが分離しまいますが、実装を行うサーバ側に.protoを含めたほうが扱いやすいという場合も多いと思います。
その場合、protodepを利用することで、クライアント側で各リポジトリ内の.protoファイルを集約することができます。
これにより、サーバ側のアプリケーションで.protoファイルを管理し、クライアント側で必要となる.protoファイルを使用するという運用が可能です。
ブラウザ対応
gRPC-Web
gRPCをブラウザから利用するためには、gRPC-Webがあります。
現時点ではnginxやenvoyをproxyとして利用し、ブラウザとgRPCサーバの通信を実現します。
REST API
gRPCで構築したアプリケーションをREST API経由でアクセスしたいという場合も多いと思います。
この場合に便利なのが、grpc-gatewayです。
こちらは、少し長くなりそうなので、また別エントリとしてまとめようと思います。
まとめ
gRPCは複数アプリケーションを経由したシステムを構築する上で非常に有効な仕組みですが、実際に運用を行う上で不明点も多いと思いうのでまとめめした。
また、更に知見が溜まったらこちらのエントリもアップデートしていこうと思います。