はじめに
SQSのメッセージ登録や削除を行う際、バッチアクションを使用すると、1件ずつリクエストを送信するのと比較して処理時間やコストを抑えられます。
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-batch-api-actions.html
AWS SDK for Java 2.xを使用してこれらの操作を行うためのソースコードを作成しました。
メッセージ送信側
/**
 * Sends messages to an SQS queue using the SendMessageBatch action.
 *
 * <p>Every entry is given a message group ID and a deduplication ID, so the
 * target is presumably a FIFO queue (NOTE(review): confirm against the queue
 * this sample is used with).
 */
public class SqsSendMessage {

    /** Maximum number of entries SQS accepts in one SendMessageBatch request. */
    private static final int MAX_BATCH_SIZE = 10;

    private final SqsClient sqsClient = createSqsClient();
    private final Random random = new Random();

    /**
     * Builds the SQS client used by this class.
     *
     * @return a client bound to the {@code AP_NORTHEAST_1} region
     */
    public SqsClient createSqsClient() {
        return SqsClient.builder()
                .region(Region.AP_NORTHEAST_1)
                .build();
    }

    /**
     * Sends the given message bodies to the queue with batch requests.
     *
     * <p>An empty list is a no-op, because SQS rejects a batch request that
     * has no entries. Lists longer than {@value #MAX_BATCH_SIZE} are split
     * into consecutive batches, each sent (and logged) separately.
     *
     * @param queueUrl URL of the destination queue
     * @param messages message bodies to send, in order
     */
    public void sendMessage(String queueUrl, List<String> messages) {
        if (messages.isEmpty()) {
            return;
        }
        for (int from = 0; from < messages.size(); from += MAX_BATCH_SIZE) {
            int to = Math.min(from + MAX_BATCH_SIZE, messages.size());
            sendBatch(queueUrl, messages.subList(from, to));
        }
    }

    /** Sends one batch of at most {@value #MAX_BATCH_SIZE} messages and prints the outcome. */
    private void sendBatch(String queueUrl, List<String> batch) {
        // Entry IDs only need to be unique within a single batch request.
        AtomicInteger counter = new AtomicInteger();
        List<SendMessageBatchRequestEntry> entries = batch.stream()
                .map(message -> createSendMessageBatchRequestEntry(message,
                        Integer.toString(counter.incrementAndGet())))
                .collect(Collectors.toUnmodifiableList());

        SendMessageBatchResponse response = sqsClient.sendMessageBatch(
                SendMessageBatchRequest.builder()
                        .queueUrl(queueUrl)
                        .entries(entries)
                        .build());

        // A batch call can partially fail; failed entries are reported in the response.
        if (!response.failed().isEmpty()) {
            System.out.println("SQS登録に失敗: " + response.failed());
        }

        List<String> messageIds = response.successful().stream()
                .map(SendMessageBatchResultEntry::messageId)
                .collect(Collectors.toUnmodifiableList());
        System.out.println("messageId: " + messageIds);
    }

    /**
     * Creates a single batch entry.
     *
     * @param messageBody body of the message
     * @param id          identifier of the entry within its batch request
     * @return the entry, with a random message group ID and a UUID deduplication ID
     */
    public SendMessageBatchRequestEntry createSendMessageBatchRequestEntry(
            String messageBody, String id) {
        return SendMessageBatchRequestEntry.builder()
                .messageBody(messageBody)
                .id(id)
                .messageGroupId(messageGroupId())
                .messageDeduplicationId(UUID.randomUUID().toString())
                .build();
    }

    /**
     * Picks a message group ID at random.
     *
     * @return the decimal string of a random integer in the range 0 to 9998
     */
    public String messageGroupId() {
        return Integer.toString(random.nextInt(9999));
    }

    /** Sample entry point; the queue URL and message list are left blank in this sample. */
    public static void main(String[] args) {
        SqsSendMessage sqsSendMessageBatchRequest = new SqsSendMessage();
        sqsSendMessageBatchRequest.sendMessage("", List.of());
    }
}
メッセージ受信側
/**
 * Receives messages from an SQS queue and removes them with the
 * DeleteMessageBatch action once they have been processed.
 */
public class SqsReceiveMessage {

    /** Maximum number of entries SQS accepts in one DeleteMessageBatch request. */
    private static final int MAX_BATCH_SIZE = 10;

    private final SqsClient sqsClient = createSqsClient();

    /**
     * Builds the SQS client used by this class.
     *
     * @return a client bound to the {@code AP_NORTHEAST_1} region
     */
    public SqsClient createSqsClient() {
        return SqsClient.builder()
                .region(Region.AP_NORTHEAST_1)
                .build();
    }

    /**
     * Receives messages from the queue, processes them, and deletes them.
     *
     * <p>When nothing was received, a notice is printed and no delete request
     * is issued.
     *
     * @param queueUrl URL of the queue to read from
     */
    public void process(String queueUrl) {
        List<Message> messages = receiveMessage(queueUrl);
        // Bail out only when the receive call returned nothing.
        if (messages.isEmpty()) {
            System.out.println("キューにメッセージがありません。");
            return;
        }
        // Do some processing with the received messages here.
        deleteMessage(queueUrl, messages);
    }

    /**
     * Issues one ReceiveMessage request with a 20-second wait time, asking
     * for up to 10 messages.
     *
     * @param queueUrl              URL of the queue to read from
     * @param messageAttributeNames names of the message attributes to include
     * @return the messages contained in the response
     */
    List<Message> receiveMessage(String queueUrl,
            String... messageAttributeNames) {
        ReceiveMessageRequest receiveMessageRequest =
                ReceiveMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .waitTimeSeconds(20)
                        .maxNumberOfMessages(10)
                        .messageAttributeNames(messageAttributeNames)
                        .attributeNames(QueueAttributeName.ALL)
                        .build();
        return sqsClient.receiveMessage(receiveMessageRequest).messages();
    }

    /**
     * Deletes the given messages from the queue with batch requests.
     *
     * <p>An empty list is a no-op, because SQS rejects a batch request that
     * has no entries. Lists longer than {@value #MAX_BATCH_SIZE} are split
     * into consecutive batches.
     *
     * @param queueUrl URL of the queue the messages were received from
     * @param messages messages whose receipt handles identify what to delete
     */
    public void deleteMessage(String queueUrl, List<Message> messages) {
        if (messages.isEmpty()) {
            return;
        }
        for (int from = 0; from < messages.size(); from += MAX_BATCH_SIZE) {
            int to = Math.min(from + MAX_BATCH_SIZE, messages.size());
            deleteBatch(queueUrl, messages.subList(from, to));
        }
    }

    /** Deletes one batch of at most {@value #MAX_BATCH_SIZE} messages and prints any failures. */
    private void deleteBatch(String queueUrl, List<Message> batch) {
        // Entry IDs only need to be unique within a single batch request.
        AtomicInteger counter = new AtomicInteger();
        List<DeleteMessageBatchRequestEntry> entries = batch.stream()
                .map(message -> createDeleteMessageBatchRequestEntry(
                        message.receiptHandle(),
                        Integer.toString(counter.incrementAndGet())))
                .collect(Collectors.toUnmodifiableList());

        DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(
                DeleteMessageBatchRequest.builder()
                        .queueUrl(queueUrl)
                        .entries(entries)
                        .build());

        // A batch call can partially fail; failed entries are reported in the response.
        if (!response.failed().isEmpty()) {
            System.out.println("SQSメッセージ削除に失敗: " + response.failed());
        }
    }

    /**
     * Creates a single delete-batch entry.
     *
     * @param receiptHandle receipt handle of the message to delete
     * @param id            identifier of the entry within its batch request
     * @return the entry
     */
    public DeleteMessageBatchRequestEntry createDeleteMessageBatchRequestEntry(
            String receiptHandle, String id) {
        return DeleteMessageBatchRequestEntry.builder()
                .id(id)
                .receiptHandle(receiptHandle)
                .build();
    }
}