なぜSpring Cloud StreamとProtocol Buffersなのか
すごく簡単に言えば
メッセージングを楽にしたい
からです。
最近のシステムは同期的なAPIコールの他に、スケーラビリティなどを考慮した非同期メッセージングを用いるケースも増えてきました。
ここで問題になるのが、複数システムを跨ぐため、そのスキーマをどのように管理するかです。
この場合REST API定義のように、共有可能で、シリアライズ形式に後方互換性あるものが望ましいです。
メッセージングによく使われるJSONのスキーマ定義としてメジャーなのがJSON Schemaですが、その定義は決して扱いやすいとはいいにくいものです。
そこで候補に上がるのがgRPCにも採用されているProtocol Buffersです。
Protocol BuffersはIDL(インタフェース定義言語)の為、JSON Schemaの用に、従来の仕様に縛られず、簡潔にそのスキーマを定義することができます。
ということで、Javaでよく使われるSpringのメッセージングのフレームワークであるSpring Cloud Stream上で、Protocol Buffersによるメッセージングを行ってみます。
基本的な仕組み
Spring Cloud Streamではデフォルトの変換フォーマットとして、Jsonがサポートされていますが、他のフォーマットにも対応するためにMessageConverter
クラスが用意されいてます。
なので、やることは単純で
- Protocol BuffersのMessageConverter実装を作成
- 使用するアプリケーション内でBean登録
- application.yaml(propeties)のstreamのcontent-typeにProtocol BuffersのMIMEを指定
の3点です。
実装方法
順に具体的な実装を行っていきます。
1. Protocol BuffersのMessageConverter実装を作成
サンプルなので細かなエラー処理などは省いていますが、以下のようなMessageConverterの実装クラスを用意します。
public class ProtobufMessageConverter extends AbstractMessageConverter {
public ProtobufMessageConverter() {
// Protocol Buffers用のMIMEを用意
super(new MimeType("application", "x-protobuf"));
}
@Override
protected boolean supports(Class<?> clazz) {
// Protocol Buffersのメッセージに使用されるスーパークラスをサポートクラスに指定
return AbstractMessageLite.class.isAssignableFrom(clazz);
}
@Override
protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
// Protocol Buffersの機能でバイト配列への変換
return ((AbstractMessageLite) payload).toByteArray();
}
@Override
@lombok.SneakyThrows
protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
Object payload = message.getPayload();
// conversionHintのMethodParameterから、受け取り側の期待する定義へ変換
if (conversionHint instanceof MethodParameter) {
MethodParameter param = (MethodParameter) conversionHint;
param = param.nestedIfOptional();
if (Message.class.isAssignableFrom(param.getParameterType())) {
param = param.nested();
}
Class<?> clazz = param.getNestedParameterType();
Method parseFrom = clazz.getMethod("parseFrom", byte[].class);
return parseFrom.invoke(null, payload);
}
return payload;
}
}
convertFromInternalメソッドに関しては、メッセージを受け取る側の設計によるので他の方法はあると思いますが、今回は簡単な実装にしてます。
2. 使用するアプリケーション内でBean登録
作成したMessageConverterを使用する各アプリケーション内でBean定義します。
@Configuration
class Config {
@Bean
@StreamMessageConverter
public MessageConverter messageConverter() {
return new ProtobufMessageConverter();
}
}
3. application.yaml(propeties)のstreamのcontent-typeにProtocol BuffersのMIMEを指定
使用するstream内のcontent-typeを認識させるために設定します。
送信側
spring:
cloud:
stream:
bindings:
output:
destination: tasks
content-type: application/x-protobuf # 作成したProtocol BuffersのMIMEを指定
producer:
partitionKeyExpression: "1"
受信側
spring:
cloud:
stream:
bindings:
input:
destination: tasks
content-type: application/x-protobuf # 作成したProtocol BuffersのMIMEを指定
以上でProtocol Buffers用の対応は完了です。
使い方
後は通常のSpring Cloud StreamとProtocol Buffersの使い方と同じになります。
スキーマ定義
syntax = "proto3";
option java_multiple_files = true;
option java_package = "com.example.task";
message TaskCreated {
string id = 1;
string name = 2;
}
message TaskStarted {
string id = 1;
}
message TaskDone {
string id = 1;
}
送信処理
@EnableBinding(Source.class)
@AllArgsConstructor
class Publisher {
Source source;
void create() {
TaskCreated task = // ...
Message<TaskCreated> message = new GenericMessage<>(task);
source.output().send(message);
}
void start() {
TaskStarted task = // ...
Message<TaskStarted> message = new GenericMessage<>(task);
source.output().send(message);
}
void done() {
TaskDone task = // ...
Message<TaskDone> message = new GenericMessage<>(task);
source.output().send(message);
}
}
受信処理
@EnableBinding(Sink.class)
class Subscriber {
@StreamListener(Sink.INPUT)
void handle(TaskCreated message) {
System.out.println(message);
}
@StreamListener(Sink.INPUT)
void handle(TaskStarted message) {
System.out.println(message);
}
@StreamListener(Sink.INPUT)
void handle(TaskDone message) {
System.out.println(message);
}
}
出力
> id: "xxx-1"
name: "Task 1"
> id: "xxx-1"
> id: "xxx-1"
送信側はProtocol BuffersのクラスをMessageとして送信、受信側は@StreamListenerの使用して、受け取りたいクラスを指定することで、メッセージングの実装の仕組みを意識することなく使用できます。
Spring Cloud StreamはProtocol Buffersに限らず、MessageConverterによって任意のフォーマットを使用することができるので、必要に応じアプリケーションのコードに依存しないメッセージングが可能になっていて便利ですね!
ソースコード
今回使用したソースコードは以下にあります。