概要
dependencyにspring-boot-starter-amqpを追加しapplication.ymlに接続先などを設定するだけで利用可能になる。
まだ検証途中だがいろいろと試してみたので一旦メモ。
RabbitMQのクラスタ環境で利用する
実運用ではRaabbitMQはほぼクラスタ構成にするはずなので、単純にクラスタ構成を組んだ場合以下のようになる。
@Configuration
public class RabbitConfiguration {
@Bean
public Exchange testExchange() {
return new DirectExchange("test.exchange");
}
@Bean
public Queue queueX() {
return new Queue("queue.x");
}
@Bean
public Queue queueY() {
return new Queue("queue.y");
}
@Bean
@Autowired
public Binding queueXBinding(@Qualifier("queueX") Queue queue,Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("routing-key-x").noargs();
}
@Bean
@Autowired
public Binding queueXBinding(@Qualifier("queueY") Queue queue,Exchange exchange) {
return BindingBuilder.bind(queue).to(exchange).with("routing-key-y").noargs();
}
}
@Autowired
private RabbitMessagingTemplate template;
@Transactional
public void send() {
Message<String> message = new GenericMessage<>("data")
template.send("test.exchange", "routing-key-x",message);
}
@RabbitListener(queues="queue.X")
public class QueueXListener {
@RabbitHandler
public void onMessage(@Payload String data){
}
}
spring:
rabbitmq:
addresses: node1:5672,node2:5672
username: guest
password: guest
- RabbitMessagingTemplateはspring.rabbitmq.addressesに指定したうちの一つのNodeに対してメッセージを送信
- Exchangeは指定されたルーティングキーに従ってQueueにメッセージをルーティング
- Consumer(@RabbitListener)は指定されたキューに接続しメッセージがQueueに入ったらメッセージを取り出し処理開始
キューをミラーリングして利用する
クラスタ構成だけでは、ノード間でキューを認識してルーティングが可能になるだけなので、上の図ではNode1、Node2のいずれにメッセージを送信しても対象のqueue Xにメッセージが到達するが、Node1に障害が発生するとqueue Xにメッセージを送信できなくなる。
そのため、実運用ではキューを各ノードでミラーリングすることになる。
ミラーリングしたキューの利用に伴うSpringの設定変更は不要。
- Node2がダウンするとNode1のQueue Yがmasterに昇格する。
- ConsumerはNode1に接続して処理を継続する。
- Node1/Node2が稼働中はConsumerはどのノードに接続してもMasterからデキューして処理する。
- Node2を停止しただけでNode1のqueue Yがmasterになるので、試してはいないがNode1とNode2の接続が切れるとスプリットブレイン症候群に陥り、切断中はmasterが二つ存在した状態になり、メッセージの2重配信が発生するはず。3ノードクラスタを構築してcluster_partition_handlingパラメータでいろいろできそうではある。
Consumerが処理中に接続が切れたりノードが停止した場合
Consumerがメッセージの処理を開始すると、メッセージの状態はReadyからUnackedになるが、メッセージを処理中にRabbitMQとの接続が切れたり、接続中のノードが停止した場合にはメッセージの状態がUnackedからReadyに戻る。そして同じキューに対するRabbitListenerがデキューし処理を再開することになる。
そのため、同じキューに対するRabbitListenerが複数存在したり、RabbitListenrの同時実行数が2以上になっている場合には、2重でメッセージが処理されることになる。
リトライ処理
Consumer側でExceptionが発生した場合にリトライするにはStatelessRetryOperationsInterceptorを利用する。
StatefullRetryOperationsInterceptorもあるが、予期しないExceptionが発生した場合、内部でリトライできないようにマークしているため、機能しない。
@Bean
@Autowired
public SimpleRabbitListenerContainerFactory springRetryConnectionFactory(ConnectionFactory cf) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf);
//リトライ処理追加3秒インターバルで3回リトライする
factory.setAdviceChain(RetryInterceptorBuilder.stateless()
.maxAttempts(3).backOffOptions(3000, 1.0, 3000).build());
factory.setDefaultRequeueRejected(false);
return factory;
}
//containerFactoryに上で定義したBeanを指定
@Component
@RabbitListener(queues="default.queue",containerFactory="springRetryConnectionFactory")
public class DefaultQueueListener {
}
Exceptionが発生するとStatelessRetryOperationInterceptorがRabbitListenerの処理を再実行することになる。指定したリトライ回数を超過するとRabbitMQにエラーとして返却するため、DLQが指定されていればDLQに移動になる。
リトライ処理②
たいていの場合上記のリトライでも十分だが、一回エラーになったら次のキューを優先してリトライ処理を後回しにしたい場合には、Rabbit MQ側の構成の工夫で対応することになる。
- Producerがメッセージsendするとdefault.queueに入る。
- default.queueにConsumerが処理する。このときx-max-retryヘッダにリトライ回数を詰める
- エラーになるとretry.queue(DLQ)に入る。
- retry.queueをlistenするConsumerがx-max-retryヘッダとx-current-retryヘッダをつきあわせて、リトライ可能であれば、x-current-retryをインクリメントしてwait.queueにputする。リトライ不可能であればfailure.queueにputして終了。
- wait.queueに指定されたTTL(インターバル時間)を超過するとDLQとして指定されたdefault.queueに移動。
- 以降2から繰り返し。
//Spring Boot起動時に自動的にRabbitMQにQueue/Exchange/Bindingの定義が反映される
@Configuration
public class RabbitConfiguration {
@Bean
public Exchange defaultExchange() {
return new DirectExchange("default.exchange");
}
@Bean
public Exchange errorExchange() {
return new DirectExchange("error.exchange");
}
@Bean
public Queue defaultQueue() {
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","error.exchange");
arguments.put("x-dead-letter-routing-key","retry.routing-key");
return new Queue("default.queue", true, false, false, arguments);
}
@Bean
public Queue waitQueue() {
Map<String,Object> arguments = new HashMap<>();
arguments.put("x-dead-letter-exchange","default.exchange");
arguments.put("x-message-ttl",2000); //interval 2000msec
return new Queue("wait.queue", true, false, false, arguments);
}
@Bean
public Queue failureQueue() {
return new Queue("failure.queue");
}
@Bean
public Queue retryQueue() {
return new Queue("retry.queue");
}
@Bean
@Autowired
public Binding defaultQueueBinding(@Qualifier("defaultQueue") Queue queue,
@Qualifier("defaultExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("routing-key").noargs();
}
@Bean
@Autowired
public Binding waitQueueBinding(@Qualifier("waitQueue") Queue queue,
@Qualifier("errorExchange") Exchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("routing-key").noargs();
}
@Bean
@Autowired
public Binding retryQueueBinding(@Qualifier("retryQueue") Queue queue,
@Qualifier("errorExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with("retry.routing-key").noargs();
}
@Bean
@Autowired
public Binding failureQueueBinding(@Qualifier("failureQueue") Queue queue,
@Qualifier("errorExchange") Exchange exchange){
return BindingBuilder.bind(queue)
.to(exchange)
.with("failure.routing-key").noargs();
}
@Bean
@Autowired
public SimpleRabbitListenerContainerFactory requeueRejectContainerFactory(ConnectionFactory cf) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(cf);
factory.setDefaultRequeueRejected(false);
factory.setMaxConcurrentConsumers(3);
factory.setConcurrentConsumers(3);
return factory;
}
}
@Component
@RabbitListener(queues="default.queue",containerFactory="requeueRejectContainerFactory")
public class DefaultQueueListener {
@RabbitHandler
public void process(@Payload String data){
}
}
@Component
@RabbitListener(queues="retry.queue")
public class RetryQueueListener {
@Autowired
private RabbitMessagingTemplate template;
@Transactional
@RabbitHandler
public void process(@Payload String data ,@Headers Map<String,Object> headers){
int maxRetry = (int)headers.getOrDefault("x-max-retry", 0);
int currentRetry = (int)headers.getOrDefault("x-current-retry", 0);
Map<String,Object> newHeaders = new HashMap<>();
headers.entrySet().stream().filter(e->e.getKey().startsWith("x-")).forEach(e->{
newHeaders.put(e.getKey(), e.getValue());
});
if(currentRetry < maxRetry){
currentRetry++;
newHeaders.put("x-current-retry", currentRetry);
GenericMessage<String> message = new GenericMessage<>(data,newHeaders);
template.send("error.exchange","routing-key",message);
}else{
GenericMessage<String> message = new GenericMessage<>(data,newHeaders);
template.send("error.exchange","failure.routing-key",message);
}
}
}
その他はまったところ
データベースとRabbitMQへのメッセージ送信の疑似同期
トランザクションについてはRabbitTransactionManagerを利用可能だが、JTAには参加できないため、DBのトランザクションとRabbitMQのトランザクションの同期を取ることはできない。
ベストエフォートでやるなら、RabbitTransactionManagerを利用せず、TransactionSynchronizationManagerを利用してDBコミット後にメッセージを送信するなどの方法が考えられる。この場合メッセージ送信でエラーが発生してもDBはコミットされた状態となる。
@Service
public class SampleService {
@Autowired
private SampleRepository repository;
@Autowired
private RabbitMessagingTemplate template;
@Transactional
public void execute() throws JsonProcessingException{
//DB登録
repository.insert(new Sample());
//メッセージ送信
send("default.exchange", "routing-key", "message to send");
}
private <T> void send(String destination, String routingKey, String payload ){
if( TransactionSynchronizationManager.isActualTransactionActive() ) {
TransactionSynchronizationManager.registerSynchronization(
new TransactionSynchronizationAdapter() {
public void afterCommit(){
template.convertAndSend(destination,routingKey,payload);
}
});
}
}
}
メッセージの永久受信
デフォルトのrabbitListenerContainerFactoryは、Consumer側で予期しない例外が発生すると、DLQにも移動されずキューに残り続けるので、場合によっては無限に再実行を繰り返す。SimpleRabbitListenerContainerFactoryを自作してdefaultRequeueRejected=falseを設定する必要がある。