Spring Cloud StreamはSpring Cloudファミリーのフレームワークで、イベント駆動型のマイクロサービスやETLパイプラインの一部となるメッセージングアプリを構築するために利用します。
Spring Cloud Streamを利用すると非常に簡単にメッセージングアプリを構築できますが、複数のSpringプロジェクトを統合して構築された非常に複雑なフレームワークで、理解して使うのはちょっと難しいです。
この記事では、Spring Cloud Stream v3に準拠して、フレームワークの構成を簡単に解説します。
バージョン
- Spring Boot v2.7.3
- Spring Cloud v2021.0.4
- Spring Cloud Stream v3.2.5
この記事では、汎用的なメッセージングに適したRabbitMQバインダを利用します。
Spring Cloud Streamとは
v3ではjava.util.function
に準拠しており、関数をBeanを定義するだけで簡単にメッセージを処理するハンドラを実装できます。ハンドラ以外のほとんどすべては、フレームワークで隠蔽されます。
java.util.function.Supplier
:送信(Source, Producer)
@Bean
public Supplier<Payload> supply() {
return () -> payload;
}
メッセージペイロードとなるオブジェクトを返却します。
java.util.function.Function
:送信&受信(Processor)
@Bean
public Function<InPayload, OutPayload> func() {
return inPayload -> outPayload;
}
受信メッセージのペイロードを受け取り、送信メッセージのペイロードを返却します。
java.util.function.Consumer
:受信(Sink, Consumer)
@Bean
public Consumer<Payload> consume() {
return payload -> xxx;
}
受信メッセージのペイロードを受け取り、ロジックを実行します。
実装のポイント
-
java.util.function
のインターフェイスは@FunctionalInterface
で、ラムダ式で関数を実装することができます。 - ジェネリクス型にはペイロード(任意のクラス)と、
Message<Payload>
型を指定できます。ヘッダなどを制御したいときはMessage<Payload>
を利用します。
フレームワークの構成
Spring Cloud Streamは大きくSupplierとそれ以外で異なる構成を持っています。
それぞれを簡単な図にして解説します。(簡略化しており、一部正確でない部分があります。)
Supplier
Supplierは以下のようなコンポーネントで構成されます。
Element | Framework | Description |
---|---|---|
Supplier | Spring Cloud Function | 送信するメッセージ(またはペイロード)を生成します。 |
MessageConverter | Spring Messaging | メッセージをシリアライズ/デシリアライズします。 |
IntegrationFlow | Spring Integration | Supplierをポーリングし、生成されたメッセージをBindingChannelに伝達します。 |
Binding Channel | Spring Cloud Stream / Spring Integration | destination(宛先)の情報を保持し、メッセージ送信のためMessageChannelBinderを呼び出します。 |
MessageChannelBinder | Spring AMQP(RabbitMQ) | MQとの通信方法を決定します。 |
RabbitTemplate | Spring AMQP(RabbitMQ) | RabbitMQにメッセージを送信します。 |
RetryTemplate | Spring Retry | RabbitTemplateでのメッセージ送信をリトライします。 |
Default ErrorChannel | Spring Cloud Stream / Spring Integration | アプリケーションに1つだけ定義でき、Consumerで発生したエラーを処理します。 |
Function,Consumer
Consumerは以下のようなコンポーネントで構成されます。
FunctionはConsumerと同様の構成で、Functionが返却したメッセージ(またはペイロード)がSupplierと同様に送信Binding Channelに伝達されます。
Element | Framework | Description |
---|---|---|
MessageChannelBinder | Spring AMQP(RabbitMQ) | MQとの通信方法を決定します。 |
MessageListenerContainer | Spring AMQP(RabbitMQ) | MQとのコネクションを確立し、メッセージを受信します。 |
Binding Channel | Spring Cloud Stream / Spring Integration | destination(宛先)の情報を保持し、MessageChannelBinderからメッセージを受け取ります。 |
MessageConverter | Spring Messaging | メッセージをシリアライズ/デシリアライズします。 |
Consumer | Spring Cloud Function | 受信したメッセージ(またはペイロード)を処理します。 |
RetryTemplate | Spring Retry | Consumerでのメッセージ処理をリトライします。 |
Binding ErrorChannel | Spring Cloud Stream / Spring Integration | destination単位に定義でき、Consumerで発生したエラーを処理します。また、エラーとなったメッセージをDLQに送信するため、MessageChannelBinderを呼び出します。 |
Default ErrorChannel | Spring Cloud Stream / Spring Integration | アプリケーションに1つだけ定義でき、Consumerで発生したエラーを処理します。 |
ErrorHandler | Spring Cloud Stream / Spring Cloud Function | エラーチャネルが送信するエラーメッセージをサブスクライブして、追加の処理を行います。 |
RabbitTemplate | Spring AMQP(RabbitMQ) | RabbitMQ(DLQ)にメッセージを送信します。 |
コンポーネントの解説
メッセージハンドラとバインディング
メッセージハンドラ(関数)
Spring Cloud Functionのコンポーネントで、Supplier,Function,Consumerのいずれかで実装します。
各関数は前述の通り、ラムダにより実装し@Bean
で定義します。
関数Beanは自動検出されますが、1アプリケーションに複数定義する場合はプロパティで有効にするBeanを選択する必要があります。
spring:
cloud:
function:
definition: supply
関数を複数有効にする場合は;
で区切ります。
関数のチェイン
1メッセージに対して複数の関数をチェイン実行することができます。
@Bean
public Function<InPayload, OutPayload> func() {
return inPayload -> outPayload;
}
@Bean
public Consumer<OutPayload> consume() {
return outPayload -> xxx;
}
関数を|
で繋ぎます。
spring:
cloud:
function:
definition: func|consume
単純な処理を関数として部品化し、組み合わせることで複雑な処理を実現します。
ビジネスロジックよりはデータエンジニアリングに向いています。
Binding Channel
Spring Cloud Streamのコンポーネントで、Spring Intgrationで実現されます。
Binding ChannelはSupplier,Functionの送信先、Function,Consumerの受信元の接続情報(バインディング)を管理します。
Binding Channelはプロパティで定義します。
spring:
cloud:
stream:
bindings:
supply-out-0:
destination: testQueue
バインディング名は関数名から自動的に付与され、[functionName]-[in|out]-[index]
になります。
バインディング名に別名を付与することもできますが、関数とバインディングの対応が分かりにくくなるためお勧めしません。
バインディングに共通の設定
複数のバインディングに共通する設定は、デフォルトバインディングを利用します。
spring:
cloud:
stream:
default:
xxx: yyy
MessageConverter
Spring Messagingのコンポーネントです。(Spring Cloud Streamで部分的に拡張されています。)
メッセージヘッダやバインディングのコンテンツタイプに応じて、メッセージをシリアライズ/デシリアライズします。デフォルトコンテンツタイプはapplication/json
で、Jacksonが利用されます。
- バインディング単位の設定
spring:
cloud:
stream:
default:
content-type: "application/json"
- メッセージ単位の設定
@Bean
public Supplier<Message<Payload>> supply() {
return () -> MessageBuilder.withPayload(payload)
.setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
.build();
}
MessageConverterの追加
カスタムMessageConverterを利用するには、Beanを追加します。
@Bean
public MessageConverter customConverter() {
return new XxxMessageConverter();
}
ただし、Spring Messagingで用意されているクラスを追加しようとすると、無視されます。
デフォルトapplication/json
向けのクラスもSpring Cloud Streamで拡張しているようなので、既存クラス流用では動作しない場合があることを把握しているのでしょう。
ポーリングとリトライ
IntegrationFlow
Spring Integrationのコンポーネントです。
IntegrationFlowが行うSupplierポーリングは、Spring Integrationのプロパティで調整できます。
spring:
integration:
poller:
fixed-delay: 2000
RetryTemplate
Spring Retryのコンポーネントです。
RetryTemplateはRabbitTemplateでのメッセージ送信、Consumerでのメッセージ処理に適用されます。
それぞれプロパティで調整することができ、Consumerではリトライ対象の例外を設定できます。
- RabbitTemplateのリトライ設定
spring:
rabbitmq:
template:
retry:
enabled: true
- Consumerのリトライ設定
spring:
cloud:
stream:
default:
consumer:
max-attempts: 10
エラーハンドリング
Default ErrorChannel
Spring Integrationのコンポーネントです。
Spring Cloud Streamの基幹フレームワークであるSpring Integrationには、エラーハンドリングの最終防衛ラインとしてDefault ErrorChannelがセットアップされます。
ErrorChannelを含むMessageChannelは、チャネルが受信したメッセージMessageHandlerにより処理します。Default ErrorChannelには2つのMessageHandlerがセットされています。
- LoggingHandler:スタックトレースをエラーログとして出力します。
- ServiceActivatingHandler:
@ServiceActivator
でメッセージを処理できます。
これにより、@ServiceActivator
で任意のエラー処理を挟み込めます。
主にユースケースの要素を含まないフレームワークレベルのエラー処理を行うことをお勧めします。
@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public void errorChannel(MessagingException e) {
Message<?> failedMessage = e.getFailedMessage():
...
}
スローされた例外は必ずMessagingException
にラップされています。
Default ErrorChannelのカスタマイズ
Default ErrorChannelのLoggingHandlerは、スタックトレースをエラーログとして出力します。
例外の内容によっては異なるレベルのログを出力したくなると思いますが、そのような場合はDefault ErrorChannelのBeanを置き換える必要があります。
DefaultConfiguringBeanFactoryPostProcessor#registerErrorChannelを参考に、以下のBeanを定義すればDefault ErrorChannelを置き換えることができます。
@Bean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public PublishSubscribeChannel errorChannel(IntegrationProperties integrationProperties) {
Error errorProperties = integrationProperties.getError();
PublishSubscribeChannel errorChannel = new PublishSubscribeChannel(errorProperties.isRequireSubscribers());
errorChannel.setIgnoreFailures(errorProperties.isIgnoreFailures());
return errorChannel;
}
@Bean(IntegrationContextUtils.ERROR_LOGGER_BEAN_NAME + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX)
public LoggingHandler loggingHandler() {
return new MyLoggingHandler();
}
@Bean
public ConsumerEndpointFactoryBean consumerEndopointFactory(PublishSubscribeChannel errorChannel, LoggingHandler loggingHandler) {
ConsumerEndpointFactoryBean factoryBean = new ConsumerEndpointFactoryBean();
factoryBean.setInputChannel(errorChannel);
factoryBean.setHandler(loggingHandler);
return factoryBean;
}
Binding ErrorChannel
Spring Cloud Streamのコンポーネントで、Spring Integrationで実現されます。
バインディング単位(正確にはdestination単位)のエラーチャネルが、[destination].[group].errors
の名前で登録されます。
Default ErrorChannelと同様に、@ServiceActivator
で任意のエラー処理を挟み込めます。
@ServiceActivator
でエラー処理を行なうと、(outputChannel
に出力しない限り)エラー処理が完了し、以降のエラー処理には伝達されません。
主にユースケースレベルのエラー処理を行うことをお勧めしたいところですが、、、。迂闊にエラー処理するとDLQ交換が実行されない等の問題が生じることから、挙動をよく確認して利用することをお勧めします。
@ServiceActivator(inputChannel = "testQueue.testGroup.errors")
public void errorChannel(MessagingException e) {
Message<?> failedMessage = e.getFailedMessage():
...
}
なお、Supplierでも同様の仕組みがありますが、仕様が異なり基本的にフレームワーク側が利用します。
ErrorHandler
Spring Cloud Streamのコンポーネントで、Spring Cloud Functionで実現されます。
Binding ErrorChannelから伝達されたエラーメッセージをサブスクライブして、追加処理を行います。
ErrorChannelのメッセージングとは独立してサブスクライブするため、ここでエラー処理してもDefault ErrorChannelに伝達されます。
主にユースケースレベルのエラー処理を行うことをお勧めします。
@Bean
public Consumer<MessagingException> errorHandler() {
return e -> {
Message<?> failedMessage = e.getFailedMessage():
...
};
}
※リファレンスの実装例ではConsumer<ErrorMessage>
ですが、Spring Cloud FunctionとMessageConverterの仕様から、ErrorMessage
ジェネリクスだと変換エラーになります。
spring:
cloud:
function:
definition: consume
stream:
default:
error-handler-definition: errorHandler
ErrorHandlerはプロパティでバインディングに紐づけます。
ErrorHandlerも関数の一種なので、メッセージハンドラの関数を選択する必要が生じます。
ChannelInterceptor
構成図にはありませんが、任意のMessageChannelにインターセプトできます。
例えば以下のように、送信バインディングにインターセプトできます。
@Bean
@GlobalChannelInterceptor(patterns = "*-out-*")
public ChannelInterceptor intercept() {
return new ChannelInterceptor() {
@Override public Message<?> preSend(Message<?> message, MessageChannel channel) { return message; }
@Override public void postSend(Message<?> message, MessageChannel channel, boolean sent) {}
@Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {}
};
}
sendとreceiveの前・後・完了時にインターセプトできますが、Spring Cloud Streamではsendしか呼んでいないようです。
このうちafterSendCompletion
を利用して、送信失敗時のエラー処理をすることもできます。(あまり利用することはないと思いますが、、、)
RabbitMQバインダ
RabbitTemplate / MessageListenerContainer
Spirng AMQP(RabbitMQ)のコンポーネントです。
ここからはSpring Cloud Streamの解説から外れるので解説しませんが、以下の点は理解しておく必要があります。
- 送受信やDLQ交換はバインダ内で自動的に行われ、実装を意識する必要がない。
- RabbitTemplateやMessageListenerContainer自体を置き換えることはできない。
- Spring Cloud Streamのプロパティによりカスタマイズできる部分も限られている。
このため、Spring AMQPを使い慣れた人が思い通りに拡張しようとすると、使いにくさを感じるかもしれません。RabbitMessageChannelBinderのセッティングをよく理解する必要がありそうです。
まとめ
今回はSpring Cloud Streamの構成を整理してみました。
簡単に実装できてとっつきやすい印象ですが、理解してくると難しく感じるフレームワークでした。
フレームワークで隠蔽している部分が多いため実装の自由度には不十分さを感じますが、開発中のv4ではさらに使いやすくなると良いですね。