1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Spring Cloud Stream フレームワークを理解する

Last updated at Posted at 2022-10-11

Spring Cloud StreamはSpring Cloudファミリーのフレームワークで、イベント駆動型のマイクロサービスやETLパイプラインの一部となるメッセージングアプリを構築するために利用します。

Spring Cloud Streamを利用すると非常に簡単にメッセージングアプリを構築できますが、複数のSpringプロジェクトを統合して構築された非常に複雑なフレームワークで、理解して使うのはちょっと難しいです。

この記事では、Spring Cloud Stream v3に準拠して、フレームワークの構成を簡単に解説します。

バージョン

  • Spring Boot v2.7.3
  • Spring Cloud v2021.0.4
  • Spring Cloud Stream v3.2.5

この記事では、汎用的なメッセージングに適したRabbitMQバインダを利用します。

Spring Cloud Streamとは

v3ではjava.util.functionに準拠しており、関数をBeanを定義するだけで簡単にメッセージを処理するハンドラを実装できます。ハンドラ以外のほとんどすべては、フレームワークで隠蔽されます。

java.util.function.Supplier:送信(Source, Producer)

DemoApplication.java
@Bean
public Supplier<Payload> supply() {
  return () -> payload;
}

メッセージペイロードとなるオブジェクトを返却します。

java.util.function.Function:送信&受信(Processor)

DemoApplication.java
@Bean
public Function<InPayload, OutPayload> func() {
  return inPayload -> outPayload;
}

受信メッセージのペイロードを受け取り、送信メッセージのペイロードを返却します。

java.util.function.Consumer:受信(Sink, Consumer)

DemoApplication.java
@Bean
public Consumer<Payload> consume() {
  return payload -> xxx;
}

受信メッセージのペイロードを受け取り、ロジックを実行します。

実装のポイント

  • java.util.functionのインターフェイスは@FunctionalInterfaceで、ラムダ式で関数を実装することができます。
  • ジェネリクス型にはペイロード(任意のクラス)と、Message<Payload>型を指定できます。ヘッダなどを制御したいときはMessage<Payload>を利用します。

フレームワークの構成

Spring Cloud Streamは大きくSupplierとそれ以外で異なる構成を持っています。
それぞれを簡単な図にして解説します。(簡略化しており、一部正確でない部分があります。)

Supplier

Supplierは以下のようなコンポーネントで構成されます。

Supplier

Element Framework Description
Supplier Spring Cloud Function 送信するメッセージ(またはペイロード)を生成します。
MessageConverter Spring Messaging メッセージをシリアライズ/デシリアライズします。
IntegrationFlow Spring Integration Supplierをポーリングし、生成されたメッセージをBindingChannelに伝達します。
Binding Channel Spring Cloud Stream / Spring Integration destination(宛先)の情報を保持し、メッセージ送信のためMessageChannelBinderを呼び出します。
MessageChannelBinder Spring AMQP(RabbitMQ) MQとの通信方法を決定します。
RabbitTemplate Spring AMQP(RabbitMQ) RabbitMQにメッセージを送信します。
RetryTemplate Spring Retry RabbitTemplateでのメッセージ送信をリトライします。
Default ErrorChannel Spring Cloud Stream / Spring Integration アプリケーションに1つだけ定義でき、Consumerで発生したエラーを処理します。

Function,Consumer

Consumerは以下のようなコンポーネントで構成されます。
FunctionはConsumerと同様の構成で、Functionが返却したメッセージ(またはペイロード)がSupplierと同様に送信Binding Channelに伝達されます。

Consumer

Element Framework Description
MessageChannelBinder Spring AMQP(RabbitMQ) MQとの通信方法を決定します。
MessageListenerContainer Spring AMQP(RabbitMQ) MQとのコネクションを確立し、メッセージを受信します。
Binding Channel Spring Cloud Stream / Spring Integration destination(宛先)の情報を保持し、MessageChannelBinderからメッセージを受け取ります。
MessageConverter Spring Messaging メッセージをシリアライズ/デシリアライズします。
Consumer Spring Cloud Function 受信したメッセージ(またはペイロード)を処理します。
RetryTemplate Spring Retry Consumerでのメッセージ処理をリトライします。
Binding ErrorChannel Spring Cloud Stream / Spring Integration destination単位に定義でき、Consumerで発生したエラーを処理します。また、エラーとなったメッセージをDLQに送信するため、MessageChannelBinderを呼び出します。
Default ErrorChannel Spring Cloud Stream / Spring Integration アプリケーションに1つだけ定義でき、Consumerで発生したエラーを処理します。
ErrorHandler Spring Cloud Stream / Spring Cloud Function エラーチャネルが送信するエラーメッセージをサブスクライブして、追加の処理を行います。
RabbitTemplate Spring AMQP(RabbitMQ) RabbitMQ(DLQ)にメッセージを送信します。

コンポーネントの解説

メッセージハンドラとバインディング

メッセージハンドラ(関数)

Spring Cloud Functionのコンポーネントで、Supplier,Function,Consumerのいずれかで実装します。
各関数は前述の通り、ラムダにより実装し@Beanで定義します。

関数Beanは自動検出されますが、1アプリケーションに複数定義する場合はプロパティで有効にするBeanを選択する必要があります。

application.yml
spring:
  cloud:
    function:
      definition: supply

関数を複数有効にする場合は;で区切ります。

関数のチェイン

1メッセージに対して複数の関数をチェイン実行することができます。

DemoApplication.java
@Bean
public Function<InPayload, OutPayload> func() {
  return inPayload -> outPayload;
}
@Bean
public Consumer<OutPayload> consume() {
  return outPayload -> xxx;
}

関数を|で繋ぎます。

application.yml
spring:
  cloud:
    function:
      definition: func|consume

単純な処理を関数として部品化し、組み合わせることで複雑な処理を実現します。
ビジネスロジックよりはデータエンジニアリングに向いています。

Binding Channel

Spring Cloud Streamのコンポーネントで、Spring Intgrationで実現されます。

Binding ChannelはSupplier,Functionの送信先、Function,Consumerの受信元の接続情報(バインディング)を管理します。

Binding Channelはプロパティで定義します。

application.yml
spring:
  cloud:
    stream:
      bindings:
        supply-out-0:
          destination: testQueue

バインディング名は関数名から自動的に付与され、[functionName]-[in|out]-[index]になります。
バインディング名に別名を付与することもできますが、関数とバインディングの対応が分かりにくくなるためお勧めしません。

バインディングに共通の設定

複数のバインディングに共通する設定は、デフォルトバインディングを利用します。

application.yml
spring:
  cloud:
    stream:
      default:
        xxx: yyy

MessageConverter

Spring Messagingのコンポーネントです。(Spring Cloud Streamで部分的に拡張されています。)

メッセージヘッダやバインディングのコンテンツタイプに応じて、メッセージをシリアライズ/デシリアライズします。デフォルトコンテンツタイプはapplication/jsonで、Jacksonが利用されます。

  • バインディング単位の設定
application.yml
spring:
  cloud:
    stream:
      default:
        content-type: "application/json"
  • メッセージ単位の設定
DemoApplication.java
@Bean
public Supplier<Message<Payload>> supply() {
  return () -> MessageBuilder.withPayload(payload)
    .setHeader(MessageHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
    .build();
}

MessageConverterの追加

カスタムMessageConverterを利用するには、Beanを追加します。

DemoApplication.java
@Bean
public MessageConverter customConverter() {
  return new XxxMessageConverter();
}

ただし、Spring Messagingで用意されているクラスを追加しようとすると、無視されます。
デフォルトapplication/json向けのクラスもSpring Cloud Streamで拡張しているようなので、既存クラス流用では動作しない場合があることを把握しているのでしょう。

ポーリングとリトライ

IntegrationFlow

Spring Integrationのコンポーネントです。

IntegrationFlowが行うSupplierポーリングは、Spring Integrationのプロパティで調整できます。

application.yml
spring:
  integration:
    poller:
      fixed-delay: 2000

RetryTemplate

Spring Retryのコンポーネントです。

RetryTemplateはRabbitTemplateでのメッセージ送信、Consumerでのメッセージ処理に適用されます。
それぞれプロパティで調整することができ、Consumerではリトライ対象の例外を設定できます。

  • RabbitTemplateのリトライ設定
application.yml
spring:
  rabbitmq:
    template:
      retry:
        enabled: true
  • Consumerのリトライ設定
application.yml
spring:
  cloud:
    stream:
      default:
        consumer:
          max-attempts: 10

エラーハンドリング

Default ErrorChannel

Spring Integrationのコンポーネントです。

Spring Cloud Streamの基幹フレームワークであるSpring Integrationには、エラーハンドリングの最終防衛ラインとしてDefault ErrorChannelがセットアップされます。

ErrorChannelを含むMessageChannelは、チャネルが受信したメッセージMessageHandlerにより処理します。Default ErrorChannelには2つのMessageHandlerがセットされています。

  • LoggingHandler:スタックトレースをエラーログとして出力します。
  • ServiceActivatingHandler:@ServiceActivatorでメッセージを処理できます。

これにより、@ServiceActivatorで任意のエラー処理を挟み込めます。
主にユースケースの要素を含まないフレームワークレベルのエラー処理を行うことをお勧めします。

DemoApplication.java
@ServiceActivator(inputChannel = IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public void errorChannel(MessagingException e) {
  Message<?> failedMessage = e.getFailedMessage():
  ...
}

スローされた例外は必ずMessagingExceptionにラップされています。

Default ErrorChannelのカスタマイズ

Default ErrorChannelのLoggingHandlerは、スタックトレースをエラーログとして出力します。
例外の内容によっては異なるレベルのログを出力したくなると思いますが、そのような場合はDefault ErrorChannelのBeanを置き換える必要があります。

DefaultConfiguringBeanFactoryPostProcessor#registerErrorChannelを参考に、以下のBeanを定義すればDefault ErrorChannelを置き換えることができます。

DemoApplication.java
@Bean(IntegrationContextUtils.ERROR_CHANNEL_BEAN_NAME)
public PublishSubscribeChannel errorChannel(IntegrationProperties integrationProperties) {
  Error errorProperties = integrationProperties.getError();
  PublishSubscribeChannel errorChannel = new PublishSubscribeChannel(errorProperties.isRequireSubscribers());
  errorChannel.setIgnoreFailures(errorProperties.isIgnoreFailures());
  return errorChannel;  
}
@Bean(IntegrationContextUtils.ERROR_LOGGER_BEAN_NAME + IntegrationConfigUtils.HANDLER_ALIAS_SUFFIX)
public LoggingHandler loggingHandler() {
  return new MyLoggingHandler();
}
@Bean
public ConsumerEndpointFactoryBean consumerEndopointFactory(PublishSubscribeChannel errorChannel, LoggingHandler loggingHandler) {
  ConsumerEndpointFactoryBean factoryBean = new ConsumerEndpointFactoryBean();
  factoryBean.setInputChannel(errorChannel);
  factoryBean.setHandler(loggingHandler);
  return factoryBean;
}

Binding ErrorChannel

Spring Cloud Streamのコンポーネントで、Spring Integrationで実現されます。

バインディング単位(正確にはdestination単位)のエラーチャネルが、[destination].[group].errorsの名前で登録されます。

Default ErrorChannelと同様に、@ServiceActivatorで任意のエラー処理を挟み込めます。
@ServiceActivatorでエラー処理を行なうと、(outputChannelに出力しない限り)エラー処理が完了し、以降のエラー処理には伝達されません。

主にユースケースレベルのエラー処理を行うことをお勧めしたいところですが、、、。迂闊にエラー処理するとDLQ交換が実行されない等の問題が生じることから、挙動をよく確認して利用することをお勧めします。

DemoApplication.java
@ServiceActivator(inputChannel = "testQueue.testGroup.errors")
public void errorChannel(MessagingException e) {
  Message<?> failedMessage = e.getFailedMessage():
  ...
}

なお、Supplierでも同様の仕組みがありますが、仕様が異なり基本的にフレームワーク側が利用します。

ErrorHandler

Spring Cloud Streamのコンポーネントで、Spring Cloud Functionで実現されます。

Binding ErrorChannelから伝達されたエラーメッセージをサブスクライブして、追加処理を行います。
ErrorChannelのメッセージングとは独立してサブスクライブするため、ここでエラー処理してもDefault ErrorChannelに伝達されます。

主にユースケースレベルのエラー処理を行うことをお勧めします。

DemoApplication.java
@Bean
public Consumer<MessagingException> errorHandler() {
  return e -> {
    Message<?> failedMessage = e.getFailedMessage():
    ...
  };
}

※リファレンスの実装例ではConsumer<ErrorMessage>ですが、Spring Cloud FunctionとMessageConverterの仕様から、ErrorMessageジェネリクスだと変換エラーになります。

application.yml
spring:
  cloud:
    function:
      definition: consume
    stream:
      default:
        error-handler-definition: errorHandler

ErrorHandlerはプロパティでバインディングに紐づけます。
ErrorHandlerも関数の一種なので、メッセージハンドラの関数を選択する必要が生じます。

ChannelInterceptor

構成図にはありませんが、任意のMessageChannelにインターセプトできます。

例えば以下のように、送信バインディングにインターセプトできます。

DemoApplication.java
@Bean
@GlobalChannelInterceptor(patterns = "*-out-*")
public ChannelInterceptor intercept() {
  return new ChannelInterceptor() {
    @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { return message; }
    @Override public void postSend(Message<?> message, MessageChannel channel, boolean sent) {}
    @Override public void afterSendCompletion(Message<?> message, MessageChannel channel, boolean sent, @Nullable Exception ex) {}
  };
}

sendとreceiveの前・後・完了時にインターセプトできますが、Spring Cloud Streamではsendしか呼んでいないようです。

このうちafterSendCompletionを利用して、送信失敗時のエラー処理をすることもできます。(あまり利用することはないと思いますが、、、)

RabbitMQバインダ

RabbitTemplate / MessageListenerContainer

Spirng AMQP(RabbitMQ)のコンポーネントです。

ここからはSpring Cloud Streamの解説から外れるので解説しませんが、以下の点は理解しておく必要があります。

  • 送受信やDLQ交換はバインダ内で自動的に行われ、実装を意識する必要がない。
  • RabbitTemplateやMessageListenerContainer自体を置き換えることはできない。
  • Spring Cloud Streamのプロパティによりカスタマイズできる部分も限られている。

このため、Spring AMQPを使い慣れた人が思い通りに拡張しようとすると、使いにくさを感じるかもしれません。RabbitMessageChannelBinderのセッティングをよく理解する必要がありそうです。

まとめ

今回はSpring Cloud Streamの構成を整理してみました。
簡単に実装できてとっつきやすい印象ですが、理解してくると難しく感じるフレームワークでした。

フレームワークで隠蔽している部分が多いため実装の自由度には不十分さを感じますが、開発中のv4ではさらに使いやすくなると良いですね。

参考

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?