Spring BootでRabbit MQクラスタを利用する

More than 3 years have passed since last update.


概要

dependencyにspring-boot-starter-amqpを追加しapplication.ymlに接続先などを設定するだけで利用可能になる。

まだ検証途中だがいろいろと試してみたので一旦メモ。


RabbitMQのクラスタ環境で利用する

実運用ではRaabbitMQはほぼクラスタ構成にするはずなので、単純にクラスタ構成を組んだ場合以下のようになる。

スクリーンショット 2016-01-11 19.46.43.png

@Configuration

public class RabbitConfiguration {

@Bean
public Exchange testExchange() {
return new DirectExchange("test.exchange");
}

@Bean
public Queue queueX() {
return new Queue("queue.x");
}

@Bean
public Queue queueY() {
return new Queue("queue.y");
}

@Bean
@Autowired
public Binding queueXBinding(@Qualifier("queueX") Queue queue,Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("routing-key-x").noargs();
}

@Bean
@Autowired
public Binding queueXBinding(@Qualifier("queueY") Queue queue,Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("routing-key-y").noargs();
}

}

@Autowired

private RabbitMessagingTemplate template;

@Transactional
public void send() {
Message<String> message = new GenericMessage<>("data")
template.send("test.exchange", "routing-key-x",message);
}

@RabbitListener(queues="queue.X")

public class QueueXListener {
@RabbitHandler
public void onMessage(@Payload String data){
}
}

spring:

rabbitmq:
addresses: node1:5672,node2:5672
username: guest
password: guest


  • RabbitMessagingTemplateはspring.rabbitmq.addressesに指定したうちの一つのNodeに対してメッセージを送信

  • Exchangeは指定されたルーティングキーに従ってQueueにメッセージをルーティング

  • Consumer(@RabbitListener)は指定されたキューに接続しメッセージがQueueに入ったらメッセージを取り出し処理開始


キューをミラーリングして利用する

クラスタ構成だけでは、ノード間でキューを認識してルーティングが可能になるだけなので、上の図ではNode1、Node2のいずれにメッセージを送信しても対象のqueue Xにメッセージが到達するが、Node1に障害が発生するとqueue Xにメッセージを送信できなくなる。

そのため、実運用ではキューを各ノードでミラーリングすることになる。

ミラーリングしたキューの利用に伴うSpringの設定変更は不要。

スクリーンショット 2016-01-11 19.50.09.png

スクリーンショット 2016-01-11 20.01.09.png


  • Node2がダウンするとNode1のQueue Yがmasterに昇格する。

  • ConsumerはNode1に接続して処理を継続する。

  • Node1/Node2が稼働中はConsumerはどのノードに接続してもMasterからデキューして処理する。

  • Node2を停止しただけでNode1のqueue Yがmasterになるので、試してはいないがNode1とNode2の接続が切れるとスプリットブレイン症候群に陥り、切断中はmasterが二つ存在した状態になり、メッセージの2重配信が発生するはず。3ノードクラスタを構築してcluster_partition_handlingパラメータでいろいろできそうではある。


Consumerが処理中に接続が切れたりノードが停止した場合

Consumerがメッセージの処理を開始すると、メッセージの状態はReadyからUnackedになるが、メッセージを処理中にRabbitMQとの接続が切れたり、接続中のノードが停止した場合にはメッセージの状態がUnackedからReadyに戻る。そして同じキューに対するRabbitListenerがデキューし処理を再開することになる。

そのため、同じキューに対するRabbitListenerが複数存在したり、RabbitListenrの同時実行数が2以上になっている場合には、2重でメッセージが処理されることになる。

スクリーンショット 2016-01-12 19.34.16.png


リトライ処理

Consumer側でExceptionが発生した場合にリトライするにはStatelessRetryOperationsInterceptorを利用する。

StatefullRetryOperationsInterceptorもあるが、予期しないExceptionが発生した場合、内部でリトライできないようにマークしているため、機能しない。

@Bean

@Autowired
public SimpleRabbitListenerContainerFactory springRetryConnectionFactory(ConnectionFactory cf) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf);
//リトライ処理追加3秒インターバルで3回リトライする
factory.setAdviceChain(RetryInterceptorBuilder.stateless()
.maxAttempts(3).backOffOptions(3000, 1.0, 3000).build());
factory.setDefaultRequeueRejected(false);
return factory;
}

//containerFactoryに上で定義したBeanを指定

@Component
@RabbitListener(queues="default.queue",containerFactory="springRetryConnectionFactory")
public class DefaultQueueListener {
}

Exceptionが発生するとStatelessRetryOperationInterceptorがRabbitListenerの処理を再実行することになる。指定したリトライ回数を超過するとRabbitMQにエラーとして返却するため、DLQが指定されていればDLQに移動になる。

スクリーンショット 2016-01-12 20.23.50.png


リトライ処理②

たいていの場合上記のリトライでも十分だが、一回エラーになったら次のキューを優先してリトライ処理を後回しにしたい場合には、Rabbit MQ側の構成の工夫で対応することになる。

スクリーンショット 2016-01-12 20.33.42.png


  1. Producerがメッセージsendするとdefault.queueに入る。

  2. default.queueにConsumerが処理する。このときx-max-retryヘッダにリトライ回数を詰める

  3. エラーになるとretry.queue(DLQ)に入る。

  4. retry.queueをlistenするConsumerがx-max-retryヘッダとx-current-retryヘッダをつきあわせて、リトライ可能であれば、x-current-retryをインクリメントしてwait.queueにputする。リトライ不可能であればfailure.queueにputして終了。

  5. wait.queueに指定されたTTL(インターバル時間)を超過するとDLQとして指定されたdefault.queueに移動。

  6. 以降2から繰り返し。

//Spring Boot起動時に自動的にRabbitMQにQueue/Exchange/Bindingの定義が反映される

@Configuration
public class RabbitConfiguration {

@Bean
public Exchange defaultExchange() {
return new DirectExchange("default.exchange");
}

@Bean
public Exchange errorExchange() {
return new DirectExchange("error.exchange");
}

@Bean
public Queue defaultQueue() {
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","error.exchange");
arguments.put("x-dead-letter-routing-key","retry.routing-key");
return new Queue("default.queue", true, false, false, arguments);
}

@Bean
public Queue waitQueue() {
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","default.exchange");
arguments.put("x-message-ttl",2000); //interval 2000msec
return new Queue("wait.queue", true, false, false, arguments);
}

@Bean
public Queue failureQueue() {
return new Queue("failure.queue");
}

@Bean
public Queue retryQueue() {
return new Queue("retry.queue");
}

@Bean
@Autowired
public Binding defaultQueueBinding(@Qualifier("defaultQueue") Queue queue,
@Qualifier("defaultExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("routing-key").noargs();
}

@Bean
@Autowired
public Binding waitQueueBinding(@Qualifier("waitQueue") Queue queue,
@Qualifier("errorExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("routing-key").noargs();
}

@Bean
@Autowired
public Binding retryQueueBinding(@Qualifier("retryQueue") Queue queue,
@Qualifier("errorExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with("retry.routing-key").noargs();
}

@Bean
@Autowired
public Binding failureQueueBinding(@Qualifier("failureQueue") Queue queue,
@Qualifier("errorExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with("failure.routing-key").noargs();
}

@Bean
@Autowired
public SimpleRabbitListenerContainerFactory requeueRejectContainerFactory(ConnectionFactory cf) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf);
factory.setDefaultRequeueRejected(false);
factory.setMaxConcurrentConsumers(3);
factory.setConcurrentConsumers(3);
return factory;
}
}

@Component

@RabbitListener(queues="default.queue",containerFactory="requeueRejectContainerFactory")
public class DefaultQueueListener {

@RabbitHandler
public void process(@Payload String data){
}

}

@Component

@RabbitListener(queues="retry.queue")
public class RetryQueueListener {

@Autowired
private RabbitMessagingTemplate template;

@Transactional
@RabbitHandler
public void process(@Payload String data ,@Headers Map<String,Object> headers){
int maxRetry = (int)headers.getOrDefault("x-max-retry", 0);
int currentRetry = (int)headers.getOrDefault("x-current-retry", 0);

Map<String,Object> newHeaders = new HashMap<>();
headers.entrySet().stream().filter(e->e.getKey().startsWith("x-")).forEach(e->{
newHeaders.put(e.getKey(), e.getValue());
});
if(currentRetry < maxRetry){
currentRetry++;
newHeaders.put("x-current-retry", currentRetry);
GenericMessage<String> message = new GenericMessage<>(data,newHeaders);
template.send("error.exchange","routing-key",message);
}else{
GenericMessage<String> message = new GenericMessage<>(data,newHeaders);
template.send("error.exchange","failure.routing-key",message);
}
}

}


その他はまったところ


データベースとRabbitMQへのメッセージ送信の疑似同期

トランザクションについてはRabbitTransactionManagerを利用可能だが、JTAには参加できないため、DBのトランザクションとRabbitMQのトランザクションの同期を取ることはできない。

ベストエフォートでやるなら、RabbitTransactionManagerを利用せず、TransactionSynchronizationManagerを利用してDBコミット後にメッセージを送信するなどの方法が考えられる。この場合メッセージ送信でエラーが発生してもDBはコミットされた状態となる。

@Service

public class SampleService {

@Autowired
private SampleRepository repository;

@Autowired
private RabbitMessagingTemplate template;

@Transactional
public void execute() throws JsonProcessingException{

//DB登録
repository.insert(new Sample());

//メッセージ送信
send("default.exchange", "routing-key", "message to send");
}

private <T> void send(String destination, String routingKey, String payload ){
if( TransactionSynchronizationManager.isActualTransactionActive() ) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
public void afterCommit(){
template.convertAndSend(destination,routingKey,payload);
}
});
}
}
}


メッセージの永久受信

デフォルトのrabbitListenerContainerFactoryは、Consumer側で予期しない例外が発生すると、DLQにも移動されずキューに残り続けるので、場合によっては無限に再実行を繰り返す。SimpleRabbitListenerContainerFactoryを自作してdefaultRequeueRejected=falseを設定する必要がある。