7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Spring Cloud StreamにProtocol Buffersを対応させる

Last updated at Posted at 2018-05-25

Kobito.D5Bp80.png

なぜSpring Cloud StreamとProtocol Buffersなのか

すごく簡単に言えば

メッセージングを楽にしたい

からです。

最近のシステムは同期的なAPIコールの他に、スケーラビリティなどを考慮した非同期メッセージングを用いるケースも増えてきました。

ここで問題になるのが、複数システムを跨ぐため、そのスキーマをどのように管理するかです。

この場合REST API定義のように、共有可能で、シリアライズ形式に後方互換性あるものが望ましいです。

メッセージングによく使われるJSONのスキーマ定義としてメジャーなのがJSON Schemaですが、その定義は決して扱いやすいとはいいにくいものです。

そこで候補に上がるのがgRPCにも採用されているProtocol Buffersです。

Protocol BuffersはIDL(インタフェース定義言語)の為、JSON Schemaの用に、従来の仕様に縛られず、簡潔にそのスキーマを定義することができます。

ということで、Javaでよく使われるSpringのメッセージングのフレームワークであるSpring Cloud Stream上で、Protocol Buffersによるメッセージングを行ってみます。

基本的な仕組み

Spring Cloud Streamではデフォルトの変換フォーマットとして、Jsonがサポートされていますが、他のフォーマットにも対応するためにMessageConverterクラスが用意されいてます。

なので、やることは単純で

  1. Protocol BuffersのMessageConverter実装を作成
  2. 使用するアプリケーション内でBean登録
  3. application.yaml(propeties)のstreamのcontent-typeにProtocol BuffersのMIMEを指定
    の3点です。

実装方法

順に具体的な実装を行っていきます。

1. Protocol BuffersのMessageConverter実装を作成

サンプルなので細かなエラー処理などは省いていますが、以下のようなMessageConverterの実装クラスを用意します。

public class ProtobufMessageConverter extends AbstractMessageConverter {

	public ProtobufMessageConverter() {
	    // Protocol Buffers用のMIMEを用意
		super(new MimeType("application", "x-protobuf"));
	}

	@Override
	protected boolean supports(Class<?> clazz) {
	    // Protocol Buffersのメッセージに使用されるスーパークラスをサポートクラスに指定
		return AbstractMessageLite.class.isAssignableFrom(clazz);
	}

	@Override
	protected Object convertToInternal(Object payload, MessageHeaders headers, Object conversionHint) {
	    // Protocol Buffersの機能でバイト配列への変換
		return ((AbstractMessageLite) payload).toByteArray();
	}

	@Override
	@lombok.SneakyThrows
	protected Object convertFromInternal(Message<?> message, Class<?> targetClass, Object conversionHint) {
		Object payload = message.getPayload();

		// conversionHintのMethodParameterから、受け取り側の期待する定義へ変換
		if (conversionHint instanceof MethodParameter) {
			MethodParameter param = (MethodParameter) conversionHint;
			param = param.nestedIfOptional();
			if (Message.class.isAssignableFrom(param.getParameterType())) {
				param = param.nested();
			}
			Class<?> clazz = param.getNestedParameterType();
			Method parseFrom = clazz.getMethod("parseFrom", byte[].class);
			return parseFrom.invoke(null, payload);
		}
		return payload;
	}
}

convertFromInternalメソッドに関しては、メッセージを受け取る側の設計によるので他の方法はあると思いますが、今回は簡単な実装にしてます。

2. 使用するアプリケーション内でBean登録

作成したMessageConverterを使用する各アプリケーション内でBean定義します。

@Configuration
class Config {
	@Bean
	@StreamMessageConverter
	public MessageConverter messageConverter() {
		return new ProtobufMessageConverter();
	}
}

3. application.yaml(propeties)のstreamのcontent-typeにProtocol BuffersのMIMEを指定

使用するstream内のcontent-typeを認識させるために設定します。

送信側

application.yaml
spring:
  cloud:
    stream:
      bindings:
        output:
          destination: tasks
          content-type: application/x-protobuf # 作成したProtocol BuffersのMIMEを指定
          producer:
            partitionKeyExpression: "1"

受信側

application.yaml
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: tasks
          content-type: application/x-protobuf # 作成したProtocol BuffersのMIMEを指定

以上でProtocol Buffers用の対応は完了です。

使い方

後は通常のSpring Cloud StreamとProtocol Buffersの使い方と同じになります。

スキーマ定義

task.proto
syntax = "proto3";

option java_multiple_files = true;
option java_package = "com.example.task";

message TaskCreated {
    string id = 1;
    string name = 2;
}

message TaskStarted {
    string id = 1;
}

message TaskDone {
    string id = 1;
}

送信処理

@EnableBinding(Source.class)
@AllArgsConstructor
class Publisher {

	Source source;

	void create() {
		TaskCreated task = // ...
		Message<TaskCreated> message = new GenericMessage<>(task);
		source.output().send(message);
	}

	void start() {
		TaskStarted task = // ...
		Message<TaskStarted> message = new GenericMessage<>(task);
		source.output().send(message);
	}

	void done() {
		TaskDone task = // ...
		Message<TaskDone> message = new GenericMessage<>(task);
		source.output().send(message);
	}
}

受信処理

@EnableBinding(Sink.class)
class Subscriber {

	@StreamListener(Sink.INPUT)
	void handle(TaskCreated message) {
		System.out.println(message);
	}

	@StreamListener(Sink.INPUT)
	void handle(TaskStarted message) {
		System.out.println(message);
	}

	@StreamListener(Sink.INPUT)
	void handle(TaskDone message) {
		System.out.println(message);
	}
}

出力

> id: "xxx-1"
name: "Task 1"

> id: "xxx-1"

> id: "xxx-1"

送信側はProtocol BuffersのクラスをMessageとして送信、受信側は@StreamListenerの使用して、受け取りたいクラスを指定することで、メッセージングの実装の仕組みを意識することなく使用できます。

Spring Cloud StreamはProtocol Buffersに限らず、MessageConverterによって任意のフォーマットを使用することができるので、必要に応じアプリケーションのコードに依存しないメッセージングが可能になっていて便利ですね!

ソースコード

今回使用したソースコードは以下にあります。

7
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?