はじめに
Spring Boot アプリケーションから AWS SQS を扱う際に便利なライブラリが、Spring Cloud AWS です。
このライブラリの @SqsListener
アノテーションを使えば、SQS キューからのメッセージ受信処理を簡単に実装できます。
さらに、このアノテーションには acknowledgementMode
というパラメータがあり、受信したメッセージをいつ削除するかを柔軟に制御できます。
本記事では、Spring Cloud AWS v3系における acknowledgementMode
の各モードの挙動を整理します。
SqsListener とは
@SqsListener
は Spring Cloud AWS に含まれるアノテーションであり、
Spring Boot アプリケーション上で SQS キューからのメッセージをポーリングして受信・処理するために使われます。
指定した SQS キュー名に対して、自動的にポーリングが行われ、メッセージが到着すると処理用メソッドが呼び出されます。
Spring Boot でどうやって使う?
1. 依存ライブラリの追加
以下の2つの依存関係を pom.xml
に追加します(Spring Cloud AWS 3系を使用)。
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter</artifactId>
<version>3.2.0</version>
</dependency>
<dependency>
<groupId>io.awspring.cloud</groupId>
<artifactId>spring-cloud-aws-starter-sqs</artifactId>
<version>3.2.0</version>
</dependency>
2. リスナークラスの作成
受信したメッセージを処理する関数を定義したクラスを用意します。
このクラスには @Component
を付与し、Spring に登録します。
import org.springframework.stereotype.Component;
@Component
public class TestSqsListener {
public TestSqsListener () {
System.out.println("TestSqsListener インスタンスを生成");
}
}
3. メッセージ受信メソッドの実装
SQS から受信したメッセージを処理するメソッドを定義します。
ここではシンプルにメッセージ内容をログ出力しています。
import org.springframework.stereotype.Component;
@Component
public class TestSqsListener {
public TestSqsListener () {
System.out.println("TestSqsListener インスタンスを生成");
}
// 受信したメッセージを処理する関数
public void receiveMessageFromQueue (String message) {
System.out.println("Received a message from SQS: "+ message);
}
}
この関数に、@SqsListener
アノテーションを付与します。
(Spring Cloud AWS v3.x 系では、@SqsListener
は io.awspring.cloud.sqs.annotation.SqsListener
をインポートして使用します。)
アノテーションの value
に、対象の SQS キュー名を指定します。
import io.awspring.cloud.sqs.annotation.SqsListener; // Spring Cloud AWS SQS Listener アノテーション
import org.springframework.stereotype.Component;
@Component
public class TestSqsListener {
public TestSqsListener () {
System.out.println("TestSqsListener インスタンスを生成");
}
@SqsListener(value = "test-queue-spring-boot") // SqsListener アノテーションを付与
public void receiveMessageFromQueue (String message) {
System.out.println("Received a message from SQS: "+ message);
}
}
やることはこれだけです!
簡単ですね~。
メッセージの受信ログ
メッセージを受信すると、関数に定義した処理が実行されます。
SQSにメッセージをセットしておきます。
Spring Boot アプリケーションを実行します。
アプリを実行すると、Spring Cloud AWS がバックグラウンドで自動的にキューをポーリングし、SqsListener アノテーションが付与された関数を実行します。
その様子は以下のような DEBUG ログに現れます。
2025-07-27T15:05:42.535+09:00 DEBUG 5968 --- [demo] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Trying to acquire full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
2025-07-27T15:05:42.776+09:00 DEBUG 5968 --- [demo] [nc-response-1-1] i.a.c.s.l.s.AbstractSqsMessageSource : Received 1 messages from queue https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxx/test-queue-spring-boot
2025-07-27T15:05:42.931+09:00 DEBUG 5968 --- [demo] [nc-response-1-1] i.a.c.s.l.SemaphoreBackPressureHandler : 9 unused permit(s), setting TM HIGH for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
Received a message from SQS: test message 1
2025-07-27T15:05:42.995+09:00 DEBUG 5968 --- [demo] [ntContainer#0-1] i.a.c.s.l.s.AbstractPollingMessageSource : Releasing permit for queue test-queue-spring-boot
2025-07-27T15:05:42.996+09:00 DEBUG 5968 --- [demo] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquired full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
2025-07-27T15:05:42.996+09:00 DEBUG 5968 --- [demo] [essage_source-2] i.a.c.s.l.s.AbstractSqsMessageSource : Polling queue https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxx/test-queue-spring-boot for 10 messages.
キューにメッセージが存在しない場合は、以下のように「0件取得」ログが出力され、受信処理メソッドは実行されません。
2025-07-27T15:05:53.118+09:00 DEBUG 5968 --- [demo] [nc-response-1-3] i.a.c.s.l.s.AbstractSqsMessageSource : Received 0 messages from queue https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxx/test-queue-spring-boot
acknowledgementMode の概要
記事の冒頭でも触れたように、@SqsListener
の acknowledgementMode
パラメータを使うと、 受信したメッセージをどのタイミングで削除するか を制御できます。
(例:処理成功時のみ削除、常に削除、自分で削除処理を実装 など)
acknowledgementMode
に設定する SqsListenerAcknowledgementMode
は、
io.awspring.cloud.sqs.annotation.SqsListenerAcknowledgementMode
に定義されています。
このパラメータには、以下の3つのオプションが用意されています。
SqsListenerAcknowledgementMode.ON_SUCCESS
SqsListenerAcknowledgementMode.ALWAYS
SqsListenerAcknowledgementMode.MANUAL
それぞれのオプションにより、メッセージ削除のタイミングが異なります。
次のセクションで、それぞれの挙動について詳しく解説していきます。
各オプションの挙動
SqsListenerAcknowledgementMode.ON_SUCCESS
SqsListenerAcknowledgementMode.ON_SUCCESS
は、@SqsListener
が付与されたメソッドの処理が正常に終了した場合のみ、SQS から受信したメッセージを削除します。
以下は設定例です。
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.annotation.SqsListenerAcknowledgementMode;
import org.springframework.stereotype.Component;
@Component
public class TestSqsListener {
public TestSqsListener () {
System.out.println("TestSqsListener インスタンスを生成");
}
@SqsListener(value = "test-queue-spring-boot", acknowledgementMode = SqsListenerAcknowledgementMode.ON_SUCCESS)
public void receiveMessageFromQueue (String message) {
System.out.println("Received a message from SQS: "+ message);
}
}
アプリケーション実行前の SQS キューの状態は以下の通りです。
この状態でアプリケーションを起動すると、受信したメッセージは正常に処理され、削除されます。
一方で、処理中に例外が発生した場合 は、メッセージは削除されずにキューに残ります。
以下のように意図的に例外をスローして動作を確認してみます。
@SqsListener(value = "test-queue-spring-boot", acknowledgementMode = SqsListenerAcknowledgementMode.ON_SUCCESS)
public void receiveMessageFromQueue (String message) {
System.out.println("Received a message from SQS: "+ message);
throw new RuntimeException("これは意図的な例外です");
}
アプリケーションを起動すると、以下のようなログが出力され、例外が発生したことが分かります。
2025-08-01T08:39:21.648+09:00 DEBUG 14716 --- [demo] [nc-response-0-2] i.a.c.s.l.s.AbstractSqsMessageSource : Received 1 messages from queue https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/test-queue-spring-boot
Received a message from SQS: test message 17
2025-08-01T08:39:21.651+09:00 DEBUG 14716 --- [demo] [ntContainer#0-2] i.a.c.s.l.s.AbstractPollingMessageSource : Releasing permit for queue test-queue-spring-boot
2025-08-01T08:39:21.653+09:00 DEBUG 14716 --- [demo] [essage_source-2] i.a.c.s.l.s.AbstractSqsMessageSource : Polling queue https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/test-queue-spring-boot for 10 messages.
2025-08-01T08:39:21.653+09:00 ERROR 14716 --- [demo] [ntContainer#0-2] .s.AbstractMessageProcessingPipelineSink : Error processing message 035fece8-3bf9-439b-92a2-d617a21418c6.
java.util.concurrent.CompletionException: io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException: Error executing action in BlockingMessageListenerAdapter
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:674) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2200) ~[na:na]
at io.awspring.cloud.sqs.listener.pipeline.MessageListenerExecutionStage.process(MessageListenerExecutionStage.java:49) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$ComposingMessagePipelineStage.lambda$process$0(MessageProcessingPipelineBuilder.java:80) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2341) ~[na:na]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$ComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:80) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.sink.AbstractMessageProcessingPipelineSink.lambda$execute$0(AbstractMessageProcessingPipelineSink.java:99) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException: Error executing action in BlockingMessageListenerAdapter
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.wrapWithBlockingException(AsyncComponentAdapters.java:162) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.runInSameThread(AsyncComponentAdapters.java:124) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.execute(AsyncComponentAdapters.java:111) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$BlockingMessageListenerAdapter.onMessage(AsyncComponentAdapters.java:208) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
... 14 common frames omitted
Caused by: io.awspring.cloud.sqs.listener.ListenerExecutionFailedException: Listener failed to process messages 035fece8-3bf9-439b-92a2-d617a21418c6
at io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter.createListenerException(AbstractMethodInvokingListenerAdapter.java:79) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter.createListenerException(AbstractMethodInvokingListenerAdapter.java:83) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter.invokeHandler(AbstractMethodInvokingListenerAdapter.java:59) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:41) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$BlockingMessageListenerAdapter.lambda$onMessage$0(AsyncComponentAdapters.java:208) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.runInSameThread(AsyncComponentAdapters.java:120) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
... 16 common frames omitted
Caused by: java.lang.RuntimeException: これは意図的な例外です
この場合、メッセージは削除されず、SQS キューに残ります。
ただし、SQS の可視性タイムアウト(Visibility Timeout) の間は一時的にメッセージが非表示になるため、一定時間待ってから再度ポーリングすることで確認できます。
なお、例外が発生して関数の処理が失敗した場合、メッセージは削除されませんが、
関数内で例外を catch して処理が正常に終了した場合は「成功」とみなされ、メッセージは削除されます。
たとえば、以下のように RuntimeException
が try ブロック内で発生したとしても、catch ブロックで例外を処理すれば、関数全体としては「正常終了」と判断されます。
@SqsListener(value = "test-queue-spring-boot", acknowledgementMode = SqsListenerAcknowledgementMode.ON_SUCCESS)
public void receiveMessageFromQueue (String message) {
try {
System.out.println("Received a message from SQS: "+ message);
// 意図的に例外をスロー
throw new RuntimeException("これは意図的な例外です");
} catch (RuntimeException e) {
System.out.println("例外が発生しました:" + e);
}
}
SqsListenerAcknowledgementMode.ALWAYS
SqsListenerAcknowledgementMode.ALWAYS
は、@SqsListener
が付与されたメソッドの 処理の成否に関係なく、常にメッセージを削除するモードです。
失敗時でもメッセージを残したくないユースケースに向いています。
以下は設定例です。
@SqsListener(value = "test-queue-spring-boot", acknowledgementMode = SqsListenerAcknowledgementMode.ALWAYS)
public void receiveMessageFromQueue (String message) {
System.out.println("Received a message from SQS: "+ message);
}
処理成功時にメッセージを削除する点は ON_SUCCESS
と共通ですが、
ALWAYS
では 例外が発生した場合でもメッセージが削除される という違いがあります。
次の例では、意図的に RuntimeException
をスローしていますが、
この場合でもメッセージは SQS キューから削除されます。
@SqsListener(value = "test-queue-spring-boot", acknowledgementMode = SqsListenerAcknowledgementMode.ALWAYS)
public void receiveMessageFromQueue (String message) {
System.out.println("Received a message from SQS: "+ message);
throw new RuntimeException("これは意図的な例外です");
}
アプリケーションを起動すると、以下のようなログが出力され、例外が発生したことが分かります。
2025-08-02T15:12:04.887+09:00 DEBUG 10180 --- [demo] [nc-response-1-1] i.a.c.s.l.SemaphoreBackPressureHandler : 9 unused permit(s), setting TM HIGH for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
Received a message from SQS: test message 18
2025-08-02T15:12:04.901+09:00 DEBUG 10180 --- [demo] [ntContainer#0-1] i.a.c.s.l.s.AbstractPollingMessageSource : Releasing permit for queue test-queue-spring-boot
2025-08-02T15:12:04.901+09:00 DEBUG 10180 --- [demo] [essage_source-2] i.a.c.s.l.SemaphoreBackPressureHandler : Acquired full permits for io.awspring.cloud.sqs.sqsListenerEndpointContainer#0-0. Permits left: 0
2025-08-02T15:12:04.906+09:00 DEBUG 10180 --- [demo] [essage_source-2] i.a.c.s.l.s.AbstractSqsMessageSource : Polling queue https://sqs.ap-northeast-1.amazonaws.com/xxxxxxxxxx/test-queue-spring-boot for 10 messages.
2025-08-02T15:12:04.907+09:00 ERROR 10180 --- [demo] [ntContainer#0-1] .s.AbstractMessageProcessingPipelineSink : Error processing message d7d39724-70e3-4aa2-ae42-9c4790551177.
java.util.concurrent.CompletionException: io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException: Error executing action in BlockingMessageListenerAdapter
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:332) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.uniApplyNow(CompletableFuture.java:674) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.uniApplyStage(CompletableFuture.java:662) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.thenApply(CompletableFuture.java:2200) ~[na:na]
at io.awspring.cloud.sqs.listener.pipeline.MessageListenerExecutionStage.process(MessageListenerExecutionStage.java:49) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$ComposingMessagePipelineStage.lambda$process$0(MessageProcessingPipelineBuilder.java:80) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.CompletableFuture.uniComposeStage(CompletableFuture.java:1187) ~[na:na]
at java.base/java.util.concurrent.CompletableFuture.thenCompose(CompletableFuture.java:2341) ~[na:na]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$ComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:80) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.pipeline.MessageProcessingPipelineBuilder$FutureComposingMessagePipelineStage.process(MessageProcessingPipelineBuilder.java:104) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.sink.AbstractMessageProcessingPipelineSink.lambda$execute$0(AbstractMessageProcessingPipelineSink.java:99) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144) ~[na:na]
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642) ~[na:na]
at java.base/java.lang.Thread.run(Thread.java:1583) ~[na:na]
Caused by: io.awspring.cloud.sqs.listener.AsyncAdapterBlockingExecutionFailedException: Error executing action in BlockingMessageListenerAdapter
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.wrapWithBlockingException(AsyncComponentAdapters.java:162) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.runInSameThread(AsyncComponentAdapters.java:124) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.execute(AsyncComponentAdapters.java:111) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$BlockingMessageListenerAdapter.onMessage(AsyncComponentAdapters.java:208) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
... 14 common frames omitted
Caused by: io.awspring.cloud.sqs.listener.ListenerExecutionFailedException: Listener failed to process messages d7d39724-70e3-4aa2-ae42-9c4790551177
at io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter.createListenerException(AbstractMethodInvokingListenerAdapter.java:79) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter.createListenerException(AbstractMethodInvokingListenerAdapter.java:83) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.adapter.AbstractMethodInvokingListenerAdapter.invokeHandler(AbstractMethodInvokingListenerAdapter.java:59) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:41) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$BlockingMessageListenerAdapter.lambda$onMessage$0(AsyncComponentAdapters.java:208) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
at io.awspring.cloud.sqs.listener.AsyncComponentAdapters$AbstractThreadingComponentAdapter.runInSameThread(AsyncComponentAdapters.java:120) ~[spring-cloud-aws-sqs-3.2.0.jar:3.2.0]
... 16 common frames omitted
Caused by: java.lang.RuntimeException: これは意図的な例外です
処理が失敗していても、SQS キューからメッセージが削除されていることを確認できます。
SqsListenerAcknowledgementMode.MANUAL
SqsListenerAcknowledgementMode.MANUAL
は、@SqsListener
が付与されたメソッドの 処理の成否に関わらず、メッセージを自動では削除しない モードです。
メッセージの削除は、アプリケーションコード側で手動で行う必要があります。
以下は設定例です。
import io.awspring.cloud.sqs.annotation.SqsListener;
import io.awspring.cloud.sqs.annotation.SqsListenerAcknowledgementMode;
import io.awspring.cloud.sqs.listener.acknowledgement.Acknowledgement;
import org.springframework.stereotype.Component;
@Component
public class TestSqsListener {
public TestSqsListener () {
System.out.println("TestSqsListener インスタンスを生成");
}
@SqsListener(value = "test-queue-spring-boot", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL)
public void receiveMessageFromQueue (String message, Acknowledgement acknowledgement) {
System.out.println("Received a message from SQS: "+ message);
acknowledgement.acknowledge(); // 明示的に削除
}
}
このように、io.awspring.cloud.sqs.listener.acknowledgement
パッケージにある Acknowledgement
クラス型の引数をメソッドに追加し、acknowledge()
を呼び出すことでメッセージを削除します。
この呼び出しがなければ、SQS キュー上のメッセージは削除されません。
MANUAL
モードは、以下のような 柔軟な制御が必要なケース で活用できます。
例外の種類に応じてメッセージ削除の有無を切り替えたい場合、など
たとえば以下のように、特定の例外のみ削除対象とし、それ以外はキューにメッセージを残す、といった制御も可能です。
@SqsListener(value = "test-queue-spring-boot", acknowledgementMode = SqsListenerAcknowledgementMode.MANUAL)
public void receiveMessageFromQueue(String message, Acknowledgement acknowledgement) {
try {
process(message);
acknowledgement.acknowledge(); // 成功時のみ削除
} catch (NonRetriableException e) {
// リトライ不要な例外なら削除
acknowledgement.acknowledge();
} catch (Exception e) {
// リトライ可能な例外なら削除せずに再ポーリング対象とする
System.out.println("処理失敗。メッセージは残します: " + e.getMessage());
}
}
設定しなかった場合の挙動
@SqsListener
の acknowledgementMode
に値を指定しなかった場合、明確に公式ドキュメントに記載されてはいませんが、検証の結果、デフォルトでは SqsListenerAcknowledgementMode.ON_SUCCESS
と同じ挙動になることが確認できました。
(ドキュメントに記載されていましたら教えてください!)
そのため、以下のような動作になります。
- 関数の処理が正常に終了した場合:メッセージは SQS キューから削除される
- 関数の処理が例外などで失敗した場合:メッセージは削除されず、キューに残る(可視性タイムアウト後に再ポーリング対象となる)
実際の検証で使用したコード例は以下です。
・関数処理成功時のコード例
@SqsListener(value = "test-queue-spring-boot")
public void receiveMessageFromQueue(String message) {
System.out.println("Received a message from SQS: " + message);
}
・関数処理失敗時のコード例
@SqsListener(value = "test-queue-spring-boot")
public void receiveMessageFromQueue (String message) {
System.out.println("Received a message from SQS: "+ message);
throw new RuntimeException("これは意図的な例外です");
}
さいごに
本記事では、Spring Cloud AWS の @SqsListener
における acknowledgementMode
の各オプションについて、動作確認を通じて整理しました。
-
ON_SUCCESS
:正常に処理が終わった場合のみメッセージを削除 -
ALWAYS
:処理の成否にかかわらずメッセージを削除 -
MANUAL
:削除タイミングをアプリケーション側で制御 - 未指定時:
ON_SUCCESS
と同じ挙動(※検証により確認)
アプリケーションによっては、処理の失敗時にメッセージを残したい/消したいといったニーズがあり、その制御が重要になる場面もあるかと思います。
本記事の内容が、そのような場面での選択の一助になれば幸いです。
なお、実際の運用では 可視性タイムアウトの設定 や メッセージのリトライ制御 なども重要な要素となるため、今後はそれらについても整理していければと考えています。
参考資料