LoginSignup
3
3

More than 1 year has passed since last update.

gRPC(Java)でヘッダを使って情報を連携する方法

Last updated at Posted at 2021-10-11

諸事情でgRPCのヘッダを使って任意の値を連携したいので、連携方法を調べてみました。

結論!?

ClientInterceptorServerInterceptorを利用すれば実現できる!けどサーバ側はちょっと一工夫する必要がありそう。

やりたいことは・・・

ざっくりだが・・やりたいことを絵にしてみると・・・こんな感じ。

image.png

Spring Bootで検証APを作ってみる

変更履歴

2021/10/14:
ヘッダ情報をスレッドローカル変数で保持する実装はやめ、リクエストオブジェクトをキーにキャッシュするような実装に変更。理由は、ServerCall.Listenerの全てのメソッド(コールバックメソッド)が同一スレッドで実行される保証がない模様のため。

検証コード

GrpcServer

サーバ機能の簡易実装クラス。

package com.example.demo;

import io.grpc.BindableService;
import io.grpc.Server;
import io.grpc.ServerBuilder;
import io.grpc.ServerInterceptor;
import io.grpc.ServerInterceptors;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

public class GrpcServer {

    private static final int DEFAULT_PORT = 50051;

    private final BindableService[] services;

    private ServerInterceptor[] serverInterceptors;

    private Server server;

    public GrpcServer(BindableService... services) {
        this.services = services;
    }

    public GrpcServer withInterceptors(ServerInterceptor... serverInterceptors) {
        this.serverInterceptors = serverInterceptors;
        return this;
    }

    public void start() {
        ServerBuilder<?> builder = ServerBuilder.forPort(DEFAULT_PORT);
        Stream.of(services)
                .forEach(service -> builder.addService(
                        ServerInterceptors.intercept(service, serverInterceptors))); // サーバ向けのインタセプタを適用したサービスを追加
        try {
            server = builder.build().start();
        }
        catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Runtime.getRuntime().addShutdownHook(new Thread(GrpcServer.this::stop));
    }

    public void stop() {
        if (server != null) {
            try {
                server.shutdown().awaitTermination(30, TimeUnit.SECONDS);
            }
            catch (InterruptedException e) {
                e.printStackTrace(System.err);
            }
            server = null;
        }
    }

}

サーバAP(gRPCサービス)の実装

まずは簡単なprotoファイルを作る。

syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.demo.proto";

package com.example.demo.proto;

service Transaction {
  rpc Create (TransactionRequest) returns (TransactionReply) {}
  rpc Refer (TransactionReferRequest) returns (TransactionReferReply) {}
}

message TransactionRequest {
  string name = 1;
  string vendor = 2;
  int64 amount = 3;
}

message TransactionReply {
  string id = 1;
}

message TransactionReferRequest {
  string id = 1;
}
message TransactionReferReply {
  enum Status {
    ACCEPT = 0;
    COMPLETED = 1;
    REJECTED = 2;
  }
  string id = 1;
  string name = 2;
  string vendor = 3;
  int64 amount = 4;
  Status status = 5;
}

次にprotoからjavaコードを自動生成して、自動生成されたクラスを継承するサーバAPの実装を行う。

package com.example.demo;

import com.example.demo.proto.TransactionGrpc;
import com.example.demo.proto.TransactionReferReply;
import com.example.demo.proto.TransactionReferRequest;
import com.example.demo.proto.TransactionReply;
import com.example.demo.proto.TransactionRequest;
import io.grpc.Metadata;
import io.grpc.stub.StreamObserver;

import java.util.UUID;

public class TransactionStub extends TransactionGrpc.TransactionImplBase {

    private final MetadataHolder metadataHolder;

    public TransactionStub(MetadataHolder metadataHolder) {
        // リクエストヘッダを受け取る魔法の器をインジェクションしておく
        this.metadataHolder = metadataHolder;
    }

    @Override
    public void create(TransactionRequest request, StreamObserver<TransactionReply> responseObserver) {
        // 魔法の器経由でリクエストヘッダの値を取得
        String envId = metadataHolder.get(request).get(Metadata.Key.of("envId", Metadata.ASCII_STRING_MARSHALLER));
        TransactionReply reply = TransactionReply.newBuilder().setId(envId + ":" + UUID.randomUUID()).build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

    @Override
    public void refer(TransactionReferRequest request, StreamObserver<TransactionReferReply> responseObserver) {
        // 魔法の器経由でリクエストヘッダの値を取得
        String envId = metadataHolder.get(request).get(Metadata.Key.of("envId", Metadata.ASCII_STRING_MARSHALLER));
        TransactionReferReply reply = TransactionReferReply.newBuilder()
                .setId(request.getId())
                .setName(envId + ":" + "Name").setAmount(100)
                .setStatus(TransactionReferReply.Status.ACCEPT)
                .build();
        responseObserver.onNext(reply);
        responseObserver.onCompleted();
    }

}

Bean定義とクライアントAPの実装

サーバ&クライアントのBean定義とクライアントAPの実装を行う。

package com.example.demo;

import com.example.demo.proto.TransactionGrpc;
import com.example.demo.proto.TransactionReferReply;
import com.example.demo.proto.TransactionReferRequest;
import com.example.demo.proto.TransactionReply;
import com.example.demo.proto.TransactionRequest;
import io.grpc.ClientInterceptor;
import io.grpc.ForwardingServerCallListener;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.stub.MetadataUtils;
import org.springframework.boot.ApplicationRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.env.Environment;

import java.util.UUID;

@SpringBootApplication
public class GrpcInterceptorDemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(GrpcInterceptorDemoApplication.class, args);
    }

    // =================================
    // gRPCサーバ関連のBean定義
    // =================================

    // リクエストヘッダの情報をAP層へ連携するための魔法の器の定義
    @Bean
    public MetadataHolder metadataHolder() {
        return new MetadataHolder();
    }

    @Bean
    ServerInterceptor serverHeadersInterceptor(MetadataHolder metadataHolder) {
        return new ServerInterceptor() {
            @Override
            public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
                    Metadata headers, ServerCallHandler<ReqT, RespT> next) {
                return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(
                        next.startCall(call, headers)) {

                    @Override public void onMessage(ReqT message) {
                        metadataHolder.save(message, headers); // リクエストされたヘッダ情報を保存
                        super.onMessage(message);
                    }

                    @Override
                    public void onComplete() {
                        runWithClearHolder(super::onComplete); // サーバ処理の終了時にお掃除
                    }

                    @Override
                    public void onCancel() {
                        runWithClearHolder(super::onCancel); // サーバ処理の終了時にお掃除
                    }

                    private void runWithClearHolder(Runnable runnable) {
                        try {
                            runnable.run();
                        }
                        finally {
                            metadataHolder.clear(headers);
                        }
                    }
                };
            }
        };
    }

    // gRPCサーバの定義
    @Bean(initMethod = "start", destroyMethod = "stop")
    public GrpcServer grpcServer(ServerInterceptor serverHeadersInterceptor,
            MetadataHolder metadataHolder) {
        return new GrpcServer(new TransactionStub(metadataHolder))
                .withInterceptors(serverHeadersInterceptor); // サーバ向けのインタセプタを設定
    }

    // =================================
    // gRPCクライアント関連のBean定義
    // =================================

    // gRPCサーバへ接続するためのチャネルを定義
    @Bean
    ManagedChannel grpcManagedChannel() {
        return ManagedChannelBuilder
                .forAddress("localhost", 50051)
                .usePlaintext()
                .build();
    }

    // リクエストヘッダに任意の値を設定するインタセプタの定義
    @Bean
    ClientInterceptor clientHeadersInterceptor(Environment environment) {
        // 任意のヘッダ値を設定してくれるインタセプタを定義
        Metadata headers = new Metadata();
        headers.put(Metadata.Key.of("envId", Metadata.ASCII_STRING_MARSHALLER),
                environment.getProperty("envId", "dev"));
        return MetadataUtils.newAttachHeadersInterceptor(headers);
    }

    // クライアントAP側の実装
    @Bean
    public ApplicationRunner clientRunner(ManagedChannel managedChannel, ClientInterceptor clientHeadersInterceptor) {
        return args -> {
            TransactionGrpc.TransactionBlockingStub blockingStub =
                    TransactionGrpc.newBlockingStub(managedChannel)
                            .withInterceptors(clientHeadersInterceptor); // クライアント向けのインタセプタを設定
            {
                TransactionRequest request = TransactionRequest.newBuilder()
                        .setName("Name")
                        .setAmount(100)
                        .setVendor("TestPay")
                        .build();
                TransactionReply reply = blockingStub.create(request);
                System.out.println(reply);
            }
            {
                TransactionReferRequest request = TransactionReferRequest.newBuilder()
                        .setId(UUID.randomUUID().toString())
                        .build();
                TransactionReferReply reply = blockingStub.refer(request);
                System.out.println(reply);
            }
        };
    }

}

実行してみる

実行すると以下のような情報がコンソール(標準出力)へ出力される。

  .   ____          _            __ _ _
 /\\ / ___'_ __ _ _(_)_ __  __ _ \ \ \ \
( ( )\___ | '_ | '_| | '_ \/ _` | \ \ \ \
 \\/  ___)| |_)| | | | | || (_| |  ) ) ) )
  '  |____| .__|_| |_|_| |_\__, | / / / /
 =========|_|==============|___/=/_/_/_/
 :: Spring Boot ::                (v2.5.5)

2021-10-12 02:18:13.747  INFO 95389 --- [           main] c.e.demo.GrpcInterceptorDemoApplication  : Starting GrpcInterceptorDemoApplication using Java 1.8.0_252 on yyyyy with PID 95389 (/Users/xxx/git-me/grpc-interceptor-demo/target/classes started by shimizukazuki in /Users/xxx/git-me/grpc-interceptor-demo)
2021-10-12 02:18:13.750  INFO 95389 --- [           main] c.e.demo.GrpcInterceptorDemoApplication  : No active profile set, falling back to default profiles: default
2021-10-12 02:18:14.546  INFO 95389 --- [           main] c.e.demo.GrpcInterceptorDemoApplication  : Started GrpcInterceptorDemoApplication in 1.145 seconds (JVM running for 1.853)
id: "dev:ccebe320-5bce-4556-99ac-e7c15ec997b6"

id: "11c23d5d-cd7a-4c06-9360-f2b10267cf89"
name: "dev:Name"
amount: 100

 まとめ

もっとスマートな方法があるのかもしれませんが、私がさっと調べた感じだとスマートな方法を見つけることができませんでした・・・魔法の器はあまり使いたくないのですが、とりあえず実現したいことはできることはわかったのでヨシとしよう :sweat_smile:

参考サイト

3
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
3