諸事情でgRPCのヘッダを使って任意の値を連携したいので、連携方法を調べてみました。
結論!?
ClientInterceptor
とServerInterceptor
を利用すれば実現できる!けどサーバ側はちょっと一工夫する必要がありそう。
やりたいことは・・・
ざっくりだが・・やりたいことを絵にしてみると・・・こんな感じ。
Spring Bootで検証APを作ってみる
変更履歴
2021/10/14:
ヘッダ情報をスレッドローカル変数で保持する実装はやめ、リクエストオブジェクトをキーにキャッシュするような実装に変更。理由は、ServerCall.Listener
の全てのメソッド(コールバックメソッド)が同一スレッドで実行される保証がない模様のため。
検証コード
GrpcServer
サーバ機能の簡易実装クラス。
package com.example.demo;
import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
public class GrpcServer {
private static final int DEFAULT_PORT = 50051;
private final BindableService[] services;
private ServerInterceptor[] serverInterceptors;
private Server server;
public GrpcServer(BindableService... services) {
this.services = services;
}
public GrpcServer withInterceptors(ServerInterceptor... serverInterceptors) {
this.serverInterceptors = serverInterceptors;
return this;
}
public void start() {
ServerBuilder<?> builder = ServerBuilder.forPort(DEFAULT_PORT);
Stream.of(services)
.forEach(service -> builder.addService(
ServerInterceptors.intercept(service, serverInterceptors))); // サーバ向けのインタセプタを適用したサービスを追加
try {
server = builder.build().start();
}
catch (IOException e) {
throw new UncheckedIOException(e);
}
Runtime.getRuntime().addShutdownHook(new Thread(GrpcServer.this::stop));
}
public void stop() {
if (server != null) {
try {
server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
}
catch (InterruptedException e) {
e.printStackTrace(System.err);
}
server = null;
}
}
}
サーバAP(gRPCサービス)の実装
まずは簡単なprotoファイルを作る。
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.example.demo.proto";
package com.example.demo.proto;
service Transaction {
rpc Create (TransactionRequest) returns (TransactionReply) {}
rpc Refer (TransactionReferRequest) returns (TransactionReferReply) {}
}
message TransactionRequest {
string name = 1;
string vendor = 2;
int64 amount = 3;
}
message TransactionReply {
string id = 1;
}
message TransactionReferRequest {
string id = 1;
}
message TransactionReferReply {
enum Status {
ACCEPT = 0;
COMPLETED = 1;
REJECTED = 2;
}
string id = 1;
string name = 2;
string vendor = 3;
int64 amount = 4;
Status status = 5;
}
次にprotoからjavaコードを自動生成して、自動生成されたクラスを継承するサーバAPの実装を行う。
package com.example.demo;
import com.example.demo.proto.TransactionGrpc;
import com.example.demo.proto.TransactionReferReply;
import com.example.demo.proto.TransactionReferRequest;
import com.example.demo.proto.TransactionReply;
import com.example.demo.proto.TransactionRequest;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;
import java.util.UUID;
public class TransactionStub extends TransactionGrpc.TransactionImplBase {
private final MetadataHolder metadataHolder;
public TransactionStub(MetadataHolder metadataHolder) {
// リクエストヘッダを受け取る魔法の器をインジェクションしておく
this.metadataHolder = metadataHolder;
}
@Override
public void create(TransactionRequest request, StreamObserver<TransactionReply> responseObserver) {
// 魔法の器経由でリクエストヘッダの値を取得
String envId = metadataHolder.get(request).get(Metadata.Key.of("envId", Metadata.ASCII_STRING_MARSHALLER));
TransactionReply reply = TransactionReply.newBuilder().setId(envId + ":" + UUID.randomUUID()).build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
@Override
public void refer(TransactionReferRequest request, StreamObserver<TransactionReferReply> responseObserver) {
// 魔法の器経由でリクエストヘッダの値を取得
String envId = metadataHolder.get(request).get(Metadata.Key.of("envId", Metadata.ASCII_STRING_MARSHALLER));
TransactionReferReply reply = TransactionReferReply.newBuilder()
.setId(request.getId())
.setName(envId + ":" + "Name").setAmount(100)
.setStatus(TransactionReferReply.Status.ACCEPT)
.build();
responseObserver.onNext(reply);
responseObserver.onCompleted();
}
}
Bean定義とクライアントAPの実装
サーバ&クライアントのBean定義とクライアントAPの実装を行う。
package com.example.demo;
import com.example.demo.proto.TransactionGrpc;
import com.example.demo.proto.TransactionReferReply;
import com.example.demo.proto.TransactionReferRequest;
import com.example.demo.proto.TransactionReply;
import com.example.demo.proto.TransactionRequest;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingServerCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.MetadataUtils;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;
import java.util.UUID;
@SpringBootApplication
public class GrpcInterceptorDemoApplication {
public static void main(String[] args) {
SpringApplication.run(GrpcInterceptorDemoApplication.class, args);
}
// =================================
// gRPCサーバ関連のBean定義
// =================================
// リクエストヘッダの情報をAP層へ連携するための魔法の器の定義
@Bean
public MetadataHolder metadataHolder() {
return new MetadataHolder();
}
@Bean
ServerInterceptor serverHeadersInterceptor(MetadataHolder metadataHolder) {
return new ServerInterceptor() {
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata headers, ServerCallHandler<ReqT, RespT> next) {
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
next.startCall(call, headers)) {
@Override public void onMessage(ReqT message) {
metadataHolder.save(message, headers); // リクエストされたヘッダ情報を保存
super.onMessage(message);
}
@Override
public void onComplete() {
runWithClearHolder(super::onComplete); // サーバ処理の終了時にお掃除
}
@Override
public void onCancel() {
runWithClearHolder(super::onCancel); // サーバ処理の終了時にお掃除
}
private void runWithClearHolder(Runnable runnable) {
try {
runnable.run();
}
finally {
metadataHolder.clear(headers);
}
}
};
}
};
}
// gRPCサーバの定義
@Bean(initMethod = "start", destroyMethod = "stop")
public GrpcServer grpcServer(ServerInterceptor serverHeadersInterceptor,
MetadataHolder metadataHolder) {
return new GrpcServer(new TransactionStub(metadataHolder))
.withInterceptors(serverHeadersInterceptor); // サーバ向けのインタセプタを設定
}
// =================================
// gRPCクライアント関連のBean定義
// =================================
// gRPCサーバへ接続するためのチャネルを定義
@Bean
ManagedChannel grpcManagedChannel() {
return ManagedChannelBuilder
.forAddress("localhost", 50051)
.usePlaintext()
.build();
}
// リクエストヘッダに任意の値を設定するインタセプタの定義
@Bean
ClientInterceptor clientHeadersInterceptor(Environment environment) {
// 任意のヘッダ値を設定してくれるインタセプタを定義
Metadata headers = new Metadata();
headers.put(Metadata.Key.of("envId", Metadata.ASCII_STRING_MARSHALLER),
environment.getProperty("envId", "dev"));
return MetadataUtils.newAttachHeadersInterceptor(headers);
}
// クライアントAP側の実装
@Bean
public ApplicationRunner clientRunner(ManagedChannel managedChannel, ClientInterceptor clientHeadersInterceptor) {
return args -> {
TransactionGrpc.TransactionBlockingStub blockingStub =
TransactionGrpc.newBlockingStub(managedChannel)
.withInterceptors(clientHeadersInterceptor); // クライアント向けのインタセプタを設定
{
TransactionRequest request = TransactionRequest.newBuilder()
.setName("Name")
.setAmount(100)
.setVendor("TestPay")
.build();
TransactionReply reply = blockingStub.create(request);
System.out.println(reply);
}
{
TransactionReferRequest request = TransactionReferRequest.newBuilder()
.setId(UUID.randomUUID().toString())
.build();
TransactionReferReply reply = blockingStub.refer(request);
System.out.println(reply);
}
};
}
}
実行してみる
実行すると以下のような情報がコンソール(標準出力)へ出力される。
. ____ _ __ _ _
/\\ / ___'_ __ _ _(_)_ __ __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
\\/ ___)| |_)| | | | | || (_| | ) ) ) )
' |____| .__|_| |_|_| |_\__, | / / / /
=========|_|==============|___/=/_/_/_/
:: Spring Boot :: (v2.5.5)
2021-10-12 02:18:13.747 INFO 95389 --- [ main] c.e.demo.GrpcInterceptorDemoApplication : Starting GrpcInterceptorDemoApplication using Java 1.8.0_252 on yyyyy with PID 95389 (/Users/xxx/git-me/grpc-interceptor-demo/target/classes started by shimizukazuki in /Users/xxx/git-me/grpc-interceptor-demo)
2021-10-12 02:18:13.750 INFO 95389 --- [ main] c.e.demo.GrpcInterceptorDemoApplication : No active profile set, falling back to default profiles: default
2021-10-12 02:18:14.546 INFO 95389 --- [ main] c.e.demo.GrpcInterceptorDemoApplication : Started GrpcInterceptorDemoApplication in 1.145 seconds (JVM running for 1.853)
id: "dev:ccebe320-5bce-4556-99ac-e7c15ec997b6"
id: "11c23d5d-cd7a-4c06-9360-f2b10267cf89"
name: "dev:Name"
amount: 100
まとめ
もっとスマートな方法があるのかもしれませんが、私がさっと調べた感じだとスマートな方法を見つけることができませんでした・・・魔法の器はあまり使いたくないのですが、とりあえず実現したいことはできることはわかったのでヨシとしよう