Help us understand the problem. What is going on with this article?

Spring BootでRabbit MQクラスタを利用する

More than 3 years have passed since last update.

概要

dependencyにspring-boot-starter-amqpを追加しapplication.ymlに接続先などを設定するだけで利用可能になる。
まだ検証途中だがいろいろと試してみたので一旦メモ。

RabbitMQのクラスタ環境で利用する

実運用ではRaabbitMQはほぼクラスタ構成にするはずなので、単純にクラスタ構成を組んだ場合以下のようになる。

スクリーンショット 2016-01-11 19.46.43.png

@Configuration
public class RabbitConfiguration {

    @Bean
    public Exchange testExchange() {
        return new DirectExchange("test.exchange");
    }

    @Bean
    public Queue queueX() {
        return new Queue("queue.x");
    }

    @Bean
    public Queue queueY() {
        return new Queue("queue.y");
    }

    @Bean
    @Autowired
    public Binding queueXBinding(@Qualifier("queueX") Queue queue,Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("routing-key-x").noargs();
    }

    @Bean
    @Autowired
    public Binding queueXBinding(@Qualifier("queueY") Queue queue,Exchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("routing-key-y").noargs();
    }


}

@Autowired
private RabbitMessagingTemplate template;   

@Transactional
public void send() {        
    Message<String> message = new GenericMessage<>("data")
    template.send("test.exchange", "routing-key-x",message);
}
@RabbitListener(queues="queue.X")
public class QueueXListener {   
    @RabbitHandler
    public void onMessage(@Payload String data){
    }
}
spring:
    rabbitmq:
        addresses: node1:5672,node2:5672
        username: guest
        password: guest
  • RabbitMessagingTemplateはspring.rabbitmq.addressesに指定したうちの一つのNodeに対してメッセージを送信
  • Exchangeは指定されたルーティングキーに従ってQueueにメッセージをルーティング
  • Consumer(@RabbitListener)は指定されたキューに接続しメッセージがQueueに入ったらメッセージを取り出し処理開始

キューをミラーリングして利用する

クラスタ構成だけでは、ノード間でキューを認識してルーティングが可能になるだけなので、上の図ではNode1、Node2のいずれにメッセージを送信しても対象のqueue Xにメッセージが到達するが、Node1に障害が発生するとqueue Xにメッセージを送信できなくなる。
そのため、実運用ではキューを各ノードでミラーリングすることになる。
ミラーリングしたキューの利用に伴うSpringの設定変更は不要。

スクリーンショット 2016-01-11 19.50.09.png

スクリーンショット 2016-01-11 20.01.09.png

  • Node2がダウンするとNode1のQueue Yがmasterに昇格する。
  • ConsumerはNode1に接続して処理を継続する。
  • Node1/Node2が稼働中はConsumerはどのノードに接続してもMasterからデキューして処理する。
  • Node2を停止しただけでNode1のqueue Yがmasterになるので、試してはいないがNode1とNode2の接続が切れるとスプリットブレイン症候群に陥り、切断中はmasterが二つ存在した状態になり、メッセージの2重配信が発生するはず。3ノードクラスタを構築してcluster_partition_handlingパラメータでいろいろできそうではある。

Consumerが処理中に接続が切れたりノードが停止した場合

Consumerがメッセージの処理を開始すると、メッセージの状態はReadyからUnackedになるが、メッセージを処理中にRabbitMQとの接続が切れたり、接続中のノードが停止した場合にはメッセージの状態がUnackedからReadyに戻る。そして同じキューに対するRabbitListenerがデキューし処理を再開することになる。
そのため、同じキューに対するRabbitListenerが複数存在したり、RabbitListenrの同時実行数が2以上になっている場合には、2重でメッセージが処理されることになる。

スクリーンショット 2016-01-12 19.34.16.png

リトライ処理

Consumer側でExceptionが発生した場合にリトライするにはStatelessRetryOperationsInterceptorを利用する。
StatefullRetryOperationsInterceptorもあるが、予期しないExceptionが発生した場合、内部でリトライできないようにマークしているため、機能しない。

@Bean
@Autowired  
public SimpleRabbitListenerContainerFactory springRetryConnectionFactory(ConnectionFactory cf) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(cf);
    //リトライ処理追加3秒インターバルで3回リトライする
    factory.setAdviceChain(RetryInterceptorBuilder.stateless()
        .maxAttempts(3).backOffOptions(3000, 1.0, 3000).build());
    factory.setDefaultRequeueRejected(false);
    return factory;
}
//containerFactoryに上で定義したBeanを指定
@Component
@RabbitListener(queues="default.queue",containerFactory="springRetryConnectionFactory")
public class DefaultQueueListener {
}

Exceptionが発生するとStatelessRetryOperationInterceptorがRabbitListenerの処理を再実行することになる。指定したリトライ回数を超過するとRabbitMQにエラーとして返却するため、DLQが指定されていればDLQに移動になる。

スクリーンショット 2016-01-12 20.23.50.png

リトライ処理②

たいていの場合上記のリトライでも十分だが、一回エラーになったら次のキューを優先してリトライ処理を後回しにしたい場合には、Rabbit MQ側の構成の工夫で対応することになる。

スクリーンショット 2016-01-12 20.33.42.png

  1. Producerがメッセージsendするとdefault.queueに入る。
  2. default.queueにConsumerが処理する。このときx-max-retryヘッダにリトライ回数を詰める
  3. エラーになるとretry.queue(DLQ)に入る。
  4. retry.queueをlistenするConsumerがx-max-retryヘッダとx-current-retryヘッダをつきあわせて、リトライ可能であれば、x-current-retryをインクリメントしてwait.queueにputする。リトライ不可能であればfailure.queueにputして終了。
  5. wait.queueに指定されたTTL(インターバル時間)を超過するとDLQとして指定されたdefault.queueに移動。
  6. 以降2から繰り返し。
//Spring Boot起動時に自動的にRabbitMQにQueue/Exchange/Bindingの定義が反映される
@Configuration
public class RabbitConfiguration {

    @Bean
    public Exchange defaultExchange() {
        return new DirectExchange("default.exchange");
    }

    @Bean
    public Exchange errorExchange() {
        return new DirectExchange("error.exchange");
    }

    @Bean
    public Queue defaultQueue() {
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange","error.exchange");
        arguments.put("x-dead-letter-routing-key","retry.routing-key");
        return new Queue("default.queue", true, false, false, arguments);
    }

    @Bean
    public Queue waitQueue() {
        Map<String,Object> arguments = new HashMap<>();
        arguments.put("x-dead-letter-exchange","default.exchange");
        arguments.put("x-message-ttl",2000); //interval 2000msec
        return new Queue("wait.queue", true, false, false, arguments);
    }

    @Bean
    public Queue failureQueue() {
        return new Queue("failure.queue");
    }

    @Bean
    public Queue retryQueue() {     
        return new Queue("retry.queue");
    }

    @Bean
    @Autowired  
    public Binding defaultQueueBinding(@Qualifier("defaultQueue") Queue queue,
            @Qualifier("defaultExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("routing-key").noargs();
    }

    @Bean
    @Autowired  
    public Binding waitQueueBinding(@Qualifier("waitQueue") Queue queue,
            @Qualifier("errorExchange") Exchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("routing-key").noargs();
    }

    @Bean
    @Autowired  
    public Binding retryQueueBinding(@Qualifier("retryQueue") Queue queue,
            @Qualifier("errorExchange") Exchange exchange){
        return BindingBuilder.bind(queue)
                                .to(exchange)
                                .with("retry.routing-key").noargs();
    }

    @Bean
    @Autowired  
    public Binding failureQueueBinding(@Qualifier("failureQueue") Queue queue,
            @Qualifier("errorExchange") Exchange exchange){
        return BindingBuilder.bind(queue)
                                .to(exchange)
                                .with("failure.routing-key").noargs();
    }

    @Bean
    @Autowired  
    public SimpleRabbitListenerContainerFactory requeueRejectContainerFactory(ConnectionFactory cf) {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(cf);
        factory.setDefaultRequeueRejected(false);
        factory.setMaxConcurrentConsumers(3);
        factory.setConcurrentConsumers(3);
        return factory;
    }
}
@Component
@RabbitListener(queues="default.queue",containerFactory="requeueRejectContainerFactory")
public class DefaultQueueListener {

    @RabbitHandler
    public void process(@Payload String data){
    }

}
@Component
@RabbitListener(queues="retry.queue")
public class RetryQueueListener {

    @Autowired
    private RabbitMessagingTemplate template;

    @Transactional
    @RabbitHandler
    public void process(@Payload String data ,@Headers Map<String,Object> headers){
        int maxRetry = (int)headers.getOrDefault("x-max-retry", 0);
        int currentRetry = (int)headers.getOrDefault("x-current-retry", 0);

        Map<String,Object> newHeaders = new HashMap<>();
        headers.entrySet().stream().filter(e->e.getKey().startsWith("x-")).forEach(e->{
            newHeaders.put(e.getKey(), e.getValue());   
        });
        if(currentRetry < maxRetry){            
            currentRetry++;
            newHeaders.put("x-current-retry", currentRetry);
            GenericMessage<String> message = new GenericMessage<>(data,newHeaders);
            template.send("error.exchange","routing-key",message);
        }else{                              
            GenericMessage<String> message = new GenericMessage<>(data,newHeaders);
            template.send("error.exchange","failure.routing-key",message);
        }
    }

}

その他はまったところ

データベースとRabbitMQへのメッセージ送信の疑似同期

トランザクションについてはRabbitTransactionManagerを利用可能だが、JTAには参加できないため、DBのトランザクションとRabbitMQのトランザクションの同期を取ることはできない。
ベストエフォートでやるなら、RabbitTransactionManagerを利用せず、TransactionSynchronizationManagerを利用してDBコミット後にメッセージを送信するなどの方法が考えられる。この場合メッセージ送信でエラーが発生してもDBはコミットされた状態となる。

@Service
public class SampleService {

    @Autowired
    private SampleRepository repository;

    @Autowired
    private RabbitMessagingTemplate template;

    @Transactional
    public void execute() throws JsonProcessingException{

        //DB登録
        repository.insert(new Sample());    

        //メッセージ送信
        send("default.exchange", "routing-key", "message to send");                 
    }

    private <T> void send(String destination, String routingKey, String payload ){
        if( TransactionSynchronizationManager.isActualTransactionActive() ) {
            TransactionSynchronizationManager.registerSynchronization(
            new TransactionSynchronizationAdapter() {
                public void afterCommit(){
                    template.convertAndSend(destination,routingKey,payload);                    
                }
            });     
        }
    }
}

メッセージの永久受信

デフォルトのrabbitListenerContainerFactoryは、Consumer側で予期しない例外が発生すると、DLQにも移動されずキューに残り続けるので、場合によっては無限に再実行を繰り返す。SimpleRabbitListenerContainerFactoryを自作してdefaultRequeueRejected=falseを設定する必要がある。

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away