はじめに
こんにちは!本記事では、Micronautフレームワークを使用したJavaアプリケーションで、gRPCチャンネルを動的に生成する方法について解説します。
この記事の対象者
- Micronautを使ったマイクロサービス開発をしている方
この記事で学べること
- gRPCチャンネルを動的に生成する方法
注意事項
- テスト環境での動作を確認したものであり、本番ワークロードで実用に耐えうるかは未検証です。
開発環境
- Java: 17
- Micronaut: 3.7.3
課題と解決したいこと
課題
user-apiというgrpcチャンネルを想定します。
1つの場合は、gRPCチャンネルの設定をapplication.yml
に記述することで、 @GrpcChannel
アノテーションでInjectすることができます。
grpc:
channels:
user-api:
target: "user-api.example.com:50051"
plaintext: true
今回APIを複数台にして負荷分散することとなりました。
インフラの増設はできるだけ控えたいという背景があり、gRPCチャンネルの振り分けをアプリケーション上で行う必要が出てきました。
これにあたって、gRPCチャンネルの設定を application.yml
に記述する通常の方法では、以下の問題がありました。
- application.ymlの肥大化: API数ぶんchannelsに設定を追加する必要がある
-
Javaコードの肥大化:
@GrpcChannel
によるInjectをAPI数ぶん書く必要がある。 -
チャンネルのターゲットのみを変更したgRPCチャンネルインスタンスを作成できない
- application.ymlに共通設定だけ書く方法が使えない
grpc:
channels:
user-api-1:
target: "user-api-1.example.com:50051"
plaintext: true
user-api-2:
target: "user-api-2.example.com:50051"
plaintext: true
公式には動的に生成する方法が紹介されていなかったので、どうにか出来ないか調査したところ、
Javaで設定を生成する方法で解決できたと思われるので、以下で紹介します。
実装アプローチ
設計方針
- アプリケーションの起動時にgRPCチャンネル設定を動的に生成する。
- GrpcChannelConfigurer
- gRPCチャンネルインスタンスをInjectionを用いずに、動的チャンネルファクトリから取得する。
- GrpcDynamicManagedChannelFactory
- スタブをキャッシュする
- UserApiStubFactory
実装詳細
1. アプリケーションの起動時にgRPCチャンネル設定を動的に生成
@ContextConfigurer
を付与することで、アプリケーションコンテキストを上書きできるようです。
https://docs.micronaut.io/latest/guide/#defaultEnvironment
これにより、 application.yml
の grpc.channels
を上書きします。
@ContextConfigurer
public class GrpcChannelConfigurer implements ApplicationContextConfigurer {
@Override
public void configure(ApplicationContextBuilder builder) {
// 環境変数からAPI数を取得
var apiCount = Integer.parseInt(System.getenv("API-COUNT"));
Map<String, Object> overrideProperties = new HashMap<>();
var channelNameBase = "user-api-";
var targetPlaceHolder = "user-api-%s.example.com:80";
for (int i = 1; i <= apiCount; i++) {
var channelConfig = new HashMap<String, Object>();
// application.yml で設定していたgrpcチャンネルのプロパティを設定
var target = targetPlaceHolder.formatted(i);
channelConfig.put("target", target);
channelConfig.put("plaintext", true);
// application.ymlで指定していたproperty名と同値
var propertyName = "grpc.channels." + channelNameBase + i;
overrideProperties.put(propertyName, channelConfig);
}
// 生成した設定をMicronautのプロパティソースに追加
var propertySource = new MapPropertySource("dynamic-grpc-channels", overrideProperties);
builder.propertySources(propertySource);
}
}
2. 動的チャンネルファクトリの実装
チャンネルの動的生成を行うファクトリクラスです。
io.micronaut.grpc.channels
の GrpcManagedChannelFactory.class
の managedChannel
メソッドのみを変更したものです。
krickert氏がgithub issuesに投稿していたコード: Having trouble getting dynamic service clients from Micronaut をベースにしています。
@Singleton
public class GrpcDynamicManagedChannelFactory implements AutoCloseable {
private final ApplicationContext beanContext;
private final Map<ChannelKey, ManagedChannel> channels = new ConcurrentHashMap<>();
@Inject
public GrpcDynamicManagedChannelFactory(ApplicationContext beanContext) {
this.beanContext = beanContext;
}
/**
* チャンネル名からManagedChannelを生成
*/
public ManagedChannel managedChannel(String channelName) {
Argument<String> argument = Argument.STRING;
return this.channels.computeIfAbsent(new ChannelKey(argument, channelName), channelKey -> {
// NettyChannelBuilderを動的に作成
final NettyChannelBuilder nettyChannelBuilder =
this.beanContext.createBean(NettyChannelBuilder.class, channelName);
ManagedChannel channel = nettyChannelBuilder.build();
LOG.debug("Created new gRPC channel: {}", channelName);
return channel;
});
}
@PreDestroy
public void close() {
LOG.info("Shutting down {} gRPC channels", channels.size());
channels.values().forEach(channel -> {
if (!channel.isShutdown()) {
try {
channel.shutdown().awaitTermination(1L, TimeUnit.SECONDS);
} catch (Exception e) {
Thread.currentThread().interrupt();
LOG.warn("Error shutting down GRPC channel: {}", e.getMessage(), e);
}
}
});
channels.clear();
}
/**
* チャンネルキーを表すクラス
*/
private static final class ChannelKey {
final Argument<?> identifier;
final String value;
public ChannelKey(Argument<?> identifier, String value) {
this.identifier = identifier;
this.value = value;
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
ChannelKey clientKey = (ChannelKey) o;
return Objects.equals(identifier, clientKey.identifier) &&
Objects.equals(value, clientKey.value);
}
@Override
public int hashCode() {
return Objects.hash(identifier, value);
}
}
}
3. スタブファクトリの実装
@Singleton
public class UserApiStubFactory {
@Inject
private GrpcDynamicManagedChannelFactory grpcDynamicManagedChannelFactory;
/**
* スタブのキャッシュ(API番号でキーを生成)
*/
private final Map<String, UserApiServiceGrpc.UserApiServiceFutureStub> stubCache = new ConcurrentHashMap<>();
/**
* スタブを取得(キャッシュから取得、存在しない場合は新規作成)
*/
public UserApiServiceGrpc.UserApiServiceFutureStub futureStub(int apiNumber) {
return stubCache.computeIfAbsent(getCacheKey(apiNumber),
key -> this.createFutureStub(key));
}
/**
* キャッシュキーを生成
*/
private String getCacheKey(int apiNumber) {
return apiNumber.toString();
}
/**
* スタブを作成
*/
private UserApiServiceGrpc.UserApiServiceFutureStub createFutureStub(String cacheKey) {
// チャンネルの動的生成
var channelName = "user-api-" + cacheKey;
var channel = grpcDynamicManagedChannelFactory.managedChannel(channelName);
return UserApiServiceGrpc.newFutureStub(channel);
}
}
クライアント実装例:
@Singleton
public class UserApiClientImpl implements UserApiClient {
@Inject
private UserApiStubFactory userApiStubFactory;
@Override
public Mono<GetUserReply> getUserAsync(@NotNull GetUserRequest request) {
// ファクトリから動的にスタブを取得
var futureStub = userApiStubFactory.futureStub(request.getApiNumber());
...
}
}
おわりに
今回実装した動的gRPCチャンネル生成により、アプリケーション側でのgRPCターゲット振り分けを実現する事ができました。
この実装パターンが、同様の課題を抱える方の参考になれば幸いです!