LoginSignup
1
0

概要

業務アプリケーションで、RabbitMQ を使って非同期処理を分散させたい場合があります。
例えば、PDF作成やファイル転送などの比較的時間のかかる処理を非同期で処理したい場合です。

この場合、PDF作成やファイル転送といった処理種別毎に処理要求をキューイングし、複数スレッドでキューを取り出して処理を分散するアーキテクチャを採用します。

Spring-AMQP では、SimpleMessageListenerContainersetConcurrentConsumers を設定することで、複数のスレッドで処理を分散させることができます。

この記事では、Spring-AMQP を使った具体的な実装例について解説します。
(RabbitMQ や SpringBoot の概要、インストール方法などの詳細は割愛します)

動作環境

  • spring-boot-3.1.2
  • spring-boot-starter-amqp-3.1.2
  • spring-amqp-3.0.6
  • spring-rabbit-3.0.6
  • RabbitMQ 3.12.2
  • Erlang 26.0.2

キューの定義

キューは RabbitMQ を再起動しても消えないようにキュー作成時に Durable を指定して永続化します。また、メッセージにも永続化を指定する必要がありますが、Spring AMQP では永続化がデフォルトになっています。

メッセージの欠落を許容しないのであれば、永続化は必須となります。
性能を優先して永続化しないユースケースも考えられますが、この場合でも、送信側と受信側がなんらかの手段でメッセージの欠落を検知して再送を実装することになると思います。

Spring AMQPの実装例

受信側(Consumer)

Spring AMQP を使った実装例を以下に記載します。
SimpleMessageListenerContainer の派生クラスを定義します。

MQListenerContainer.java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;


public class MQListenerContainer extends SimpleMessageListenerContainer{
	
	public MQListenerContainer(CachingConnectionFactory connectionFactory, MQListener listener) {
		super();

		super.setConnectionFactory(connectionFactory);

		MessageListenerAdapter adapter = new MessageListenerAdapter(listener);
		super.setMessageListener(adapter);
		
		super.setAcknowledgeMode(AcknowledgeMode.MANUAL);
		super.setDefaultRequeueRejected(true);
		super.setAutoStartup(false);
	}
	
	public void setQueuenames(String queuenames) {
		String queues[] = null;
		if ( queuenames != null && queuenames.length() != 0) {
			queues = queuenames.trim().split(",");
		}
		
		this.setQueueNames(queues);
	}
}

メッセージを受信すると setMessageListener で設定した MessageListener が呼び出されます。
MessageListner クラスのサンプルでは何も実装していませんが、ここにキューを処理する業務処理を実装することになります。

RabbitMQ の Ack と Nack 応答は業務処理で明示的に返すべきです。
SimpleMessageListenerContainersetAcknowledgeModeMANUAL に設定して、Channel#basicAck で Ack応答します。(異常の場合は Nack 応答)

Channel#basicNack の requeue パラメータを true にして Nack 応答すると、キューは処理中の状態から Ready 状態に戻されて、他の Consumer が受信可能になります。回復可能なエラーであれば、他の Consumer による再実行で回復するかもしれません。

Nack応答によりリトライすることはできますが、そのままだとリトライ間隔がほとんどなく、無限にリトライが繰り返されることになります。簡単な解決策は、Nack応答する前にリトライ間隔用のスリープを実装することです。

エラーが回復するまでスリープ状態の Consumer スレッドが増え、スレッドがいっぱいになると、キューの滞留が発生します。一定期間のリトライ後にキューを削除するにはキューのTTLで生存期間を設定してください。

また、スレッドが大量にメモリを消費するインスタンスを参照したままスリープしないように注意してください。

回復不能なエラーであれば運用ログを出力し、Ack応答してキューを削除してください。
エラーの場合は、Dead Letter に再キューする方法もありますが、いずれにしても人間が判断して運用対処が必要となるため、キューの削除と運用ログによる対処でも十分な場合があります。

MQListener
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;

import com.rabbitmq.client.Channel;

@Component("mQListener")
public class MQListener implements ChannelAwareMessageListener {

	@Override
	public void onMessage(Message message, Channel channel) throws Exception {
		String queueName = message.getMessageProperties().getConsumerQueue();
		String body = new String(message.getBody());
		
		try {
			// TODO: implement the business logic here.
			System.out.println("queueName: " + queueName + " body: " + body);

			channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

		} catch (Exception ex) {
			// TODO: sleep and logging.

			channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
		}
	}
}

Spring の XMLベースConfiguration を使用すると、RabbitMQ の接続情報を xmlファイルとして定義することができます。これはキューの送信側(Producer)と共有するため、受信側(Consumer)の設定とは分けて管理します。(サーバのアドレス、認証情報は環境に合わせて設定してください)

rabbit.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">
		
	<bean id="rabbitConnectionFactory" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory" scope="prototype">
		<property name="addresses" value="vm7"/>
		<property name="username" value="rabbitmq"/>
		<property name="password" value="rabbitmq"/>
		<property name="virtualHost" value="Test1"/>

		<property name="publisherReturns" value="true"/>
		<property name="publisherConfirmType" ref="CORRELATED"/>
	</bean>

	<bean id="CORRELATED" class="org.springframework.amqp.rabbit.connection.CachingConnectionFactory.ConfirmType" factory-method="valueOf">
		<constructor-arg value="CORRELATED"/>
	</bean>
</beans>

containers.xml が受信側(Consumer)用の設定です。
SimpleMessageListenerContainer を増やすには、bean の定義を増やすだけです。

containers.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="
		http://www.springframework.org/schema/beans
		http://www.springframework.org/schema/beans/spring-beans-3.1.xsd">

	<bean id="container1"	class="com.github.katohk.sample.rabbit.MQListenerContainer" scope="prototype">
		<constructor-arg name="connectionFactory" ref="rabbitConnectionFactory" />
		<constructor-arg name="listener" ref="mQListener" />

		<property name="queuenames" value="my-queue1,my-queue2"/>
		<property name="concurrentConsumers" value="3" />
		<property name="prefetchCount" value="1" />
	</bean>

	<bean id="container2"	class="com.github.katohk.sample.rabbit.MQListenerContainer" scope="prototype">
		<constructor-arg name="connectionFactory" ref="rabbitConnectionFactory" />
		<constructor-arg name="listener" ref="mQListener" />

		<property name="queuenames" value="my-queue3"/>
		<property name="concurrentConsumers" value="2" />
		<property name="prefetchCount" value="1" />
	</bean>
</beans>

ImportResource で上記のXML定義をimportし、Autowired で ListenerContainer をインジェクションします。

ConsumerApp.java
import java.util.List;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ImportResource({"classpath:rabbit.xml","classpath:containers.xml"})
public class ConsumerApp implements CommandLineRunner{
	
	@Autowired
	List<MQListenerContainer> containers;

	public static void main(String[] args) {
		SpringApplication app = new SpringApplication(ConsumerApp.class);
		app.run(args);
	}

	public void run(String... args) throws Exception {
		for(MQListenerContainer container: containers) {
			container.start();
		}
	}
}

開始終了のタイミングをアプリケーションで制御したい場合は、setAutoStartupfalse に設定し、startメソッドを使って Listener を開始します。

サンプルでは終了処理を実装していませんが、終了する場合には Listener の実行状態を監視して、Listener の処理が完了してから stopメソッドで Listener を終了します。

SimpleMessageListenerContainerの内部構造

スレッドの分散や性能をチューニングするには、SimpleMessageListenerContainer の内部構造をある程度知っておく必要があります。

SimpleMessageListenerContainer.png

SimpleMessageListenerContainersetConcurrentConsumers で設定した数の BlockingQueueConsumer を作成し、それぞれの BlockingQueueConsumer が Channel#basicConsume でキューを受信します。

SimpleMessageListenerContainer では、setPrefetchCount で同時受信制限数(prefetch count
)を設定することができます。prefetch count は1つの Consumer が同時に受信可能なキューの数であり、処理中でまだ ack を返していないキューの最大値です。

BlockingQueueConsumer は受信したキューを、内部にキューイングし、syncMessageProcessingConsumers スレッドが順次キューを処理します。

prefetch count を1以外に設定すると、空いている Consumer があっても処理中の Consumer がキューを受信し、内部のキューで処理の順番待ちになることに注意が必要です。

例えば、時間のかかるPDF作成を処理している Consumer があった場合、他の Consumer が空いているにもかかわらず、処理中の Consumer がキューを受信してしまい、処理待ちになってしまう場合があります。処理を分散させたいのであれば、prefetch count は 1に設定したほうがよいでしょう。

prefetch count を増やせば、処理完了の ack を待つことなく、次のキューを受信するため、スループットが向上しますが、スレッド分散はうまくいきません。

キュー毎に処理を分散する

SimpleMessageListenerContainer では setQueueNames で複数のキューを処理することができます。処理種別毎にキューを分散して、MessageListener の中でキュー名によって処理を振り分けることができますが、複数のキューで1つのスレッドを共有することになります。

また、prefetch count はチャネル毎(つまりスレッド毎)の制限ではなくキュー毎の制限になります。prefetch count を 1に設定しても、1つの SimpleMessageListenerContainer が2つのキューを対象としていれば、それぞれのキューに対して 1 を設定したことになります。内部では Channel#basicQos の global オプションが false で設定されます。

このため、prefetch count を1に設定しても、1つのキューをスレッドが実行中の状態で、もう1つのキューは受信してしまい、処理待ちになってしまう場合があります。

受信制限をキュー毎ではなく、チャネルに対して制限したい場合は setGlobalQostrue に設定してください。内部では Channel#basicQos の global オプションが true で設定されます。(このメソッドは spring-rabbit 2.2.17 で追加されました)

例えば、PDF作成やファイル転送など、異なる処理を setQueueNames で共有すると、PDF作成の遅延や大量の要求によりスレッドがいっぱいになると、ファイル転送などの他の処理まで遅延してしまうことになります。

特定のキューの遅延や滞留が他のキューの処理に影響を与えないようにするには、SimpleMessageListenerContainer の受信キューは1つだけにして、受信キュー毎に SimpleMessageListenerContainer を作成します。こうすることで、処理の種別の特性に合わせてスレッド数や最大受信数を調整することができます。

上記の設定で Producer を起動して、RabbitMQの管理画面の Channels を見ると以下のようになります。
Channels.png

SimpleMessageListenerContainer 毎に TCP のコネクションが貼られ、1本のコネクションに対して concurrentConsumers で指定した数の Channel ができます。globalオプションを設定しているため、Prefetch の表示が (global) となります。

Channel#basicQos を global オプション ture で設定後、global オプション false で設定すると、キュー毎の数の制限と、チャネル全体の数の制限をそれぞれ個別に設定することができます。ただし、Spring AMQP には両方同時に設定するインタフェースは用意されていません。

ChannelListener#onCreate を以下のように設定すれば、キュー毎の数の制限値も設定することは可能です。

  	connectionFactory.addChannelListener(
  			// ChannelListener#onCreate
  			(channel,transactional)->{
  				try {
  					if (consumerLimit != 0) {
  						channel.basicQos(consumerLimit,false); // Per consumer limit
  					}
  				} catch (IOException e) {
  					throw new AmqpIOException(e);
  				}
  			}); 

送信側(Producer)

キューの送信は RabbitTemplate を使用します。メッセージが欠落しないように確実にキューに格納したいのであれば、確認応答を使用します。

setPublisherConfirmTypeCORRELATED を設定して確認応答を有効にして、setConfirmCallback で確認応答を受け取ります。

また、送信時のエラーを完全に捕捉するには setPublisherReturnssetMandatorytrue に設定し、setReturnsCallback でエラーを受け取ります。未定義のキューへの送信など、ルーティングの異常で送信できなかった場合には、このコールバックでエラーを捕捉する必要があります。

MQProcucer.java
import java.util.concurrent.ExecutionException;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

@Component
public class MQProducer {
	private RabbitTemplate rabbitTemplate;
	private long publishWaitMillisecond = 5000L;

	public MQProducer(CachingConnectionFactory connectionFactory) {

		this.rabbitTemplate = new RabbitTemplate();
		this.rabbitTemplate.setConnectionFactory(connectionFactory);

		rabbitTemplate.setConfirmCallback( //ConfirmCallback#confirm
				(correlationData, ack, cause) -> {
					System.out.println(
							"confirm: id=" + correlationData.getId() + 
							" ack=" + ack + 
							" cause=" + cause);
				});

		rabbitTemplate.setMandatory(true);
		rabbitTemplate.setReturnsCallback( //ReturnsCallback#returnedMessage
				(returned) -> {
					byte[] body = returned.getMessage().getBody();
					String sendData = new String(body);
					String queueName =  returned.getRoutingKey();
					String replyText = returned.getReplyText();
					int code = returned.getReplyCode();
					System.out.println("error: queueName=" + queueName + 
							" sendData=" + sendData +
							" replyText=" + replyText + " code=" + code);
				});
	}

invoke メソッドを使って、送信と確認応答待ちを行います。
convertAndSend でメッセージを送信後、waitForConfirms で確認応答を待ちます。

確認応答は一定時間確認応答がなければ例外となり送信は失敗します。失敗した送信は再送することになりますが、確認応答が遅延しているだけでキューへの格納は遅れて成功している場合があります。

この場合、送信は重複することになるため、重複を許容したくない場合は、シーケンス番号やユニークIDで送受信を管理して、受信側で重複データを無視する対応が必要です。

この方法の場合、送信毎に確認応答を待ち合わせるため、連続して大量のメッセージを送信する場合にはスループットが上がらないことに注意してください。

連続して大量にメッセージを送信する場合は、waitForConfirms を使わずに、非同期で受けた setConfirmCallback で送信データとの対応付を行い、後から、確認応答がない送信データを再送するなどの手段を取ります。送信データと確認応答の対応付けには、CorrelationData を使用します。

MQProcucer.java
	public void publish(String queueName, String sendData) throws MQException { 
		
		byte[] message = sendData.getBytes();

		try {
			CorrelationData correlationData = new CorrelationData();
			System.out.println("publish: queueName=" + queueName +
					" id=" + correlationData.getId() +
					" sendData=" + sendData);

			rabbitTemplate.invoke( //OperationsCallback#doInRabbit
					operations -> {
						operations.convertAndSend(queueName, message, correlationData);
						return operations.waitForConfirms(publishWaitMillisecond);
					});

			if (!correlationData.getFuture().get().isAck()) {
				throw new MQException("publish error");
			}

		} catch (AmqpException e) {
			throw new MQException(e.getMessage(), e);

		} catch (InterruptedException e) {
			Thread.currentThread().interrupt();

		} catch (ExecutionException e) {
			throw new MQException(e.getMessage(), e);
		}
	}

MQProducerの利用サンプルは以下のとおりです。
ImportResorceでRabbitMQの接続情報を読み込んで、MQProducerをインジェクションしています。

MQProducer#publish を使ってキューを送信しています。
"unknown"は未定義のキューへの送信で、setReturnsCallbackが呼ばれることが確認できます。

ProducerApp.java
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;

@SpringBootApplication
@ImportResource({"classpath:rabbit.xml"})
public class ProducerApp implements CommandLineRunner{
	
	@Autowired
	MQProducer producer;
	
	public static void main(String[] args) {
		SpringApplication app = new SpringApplication(ProducerApp.class);
		app.run(args);
	}

	public void run(String... args) throws MQException {

		try {
			for(int n=0; n < 5; n++) {
				producer.publish("my-queue1", "test data my-queue1:" + n);
				producer.publish("my-queue2", "test data my-queue2:" + n);
				producer.publish("my-queue3", "test data my-queue3:" + n);
			}

			producer.publish("unknown", "test data unknown");

		}finally {
			producer.stop();
		}
	}

}

まとめ

Spring AMQP では多数のクラスメソッドが用意されており、色々なやり方がインターネットで紹介されているため、実装に迷うことが多いと思います。この記事で紹介したサンプルも参考としていただければ幸いです。

サンプルは Github でも公開していますので、参考にしてください。

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0