SQS Batch Actions with AWS SDK for Java 2.x


Introduction

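When sending or deleting SQS messages, batch actions handle up to 10 messages in a single request, which reduces processing time and cost compared with sending one request per message.
https://docs.aws.amazon.com/ja_jp/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-batch-api-actions.html
I wrote source code that performs these operations with AWS SDK for Java 2.x.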
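To make the saving concrete, here is a minimal sketch that only counts requests. It assumes the limit of 10 entries per batch request; the class and method names are hypothetical and not part of the SDK, and it does not model pricing details such as payload size.

```java
// Hypothetical helper: compares how many SQS requests are needed
// with and without batch actions. Not part of the AWS SDK.
final class BatchRequestCount {

    // SQS accepts at most 10 entries in one batch request.
    static final int MAX_BATCH_ENTRIES = 10;

    // One request per message when batching is not used.
    static int requestsWithoutBatching(int messageCount) {
        requireNonNegative(messageCount);
        return messageCount;
    }

    // Ceiling of messageCount / 10 when every batch is filled as far as possible.
    static int requestsWithBatching(int messageCount) {
        requireNonNegative(messageCount);
        return (messageCount + MAX_BATCH_ENTRIES - 1) / MAX_BATCH_ENTRIES;
    }

    private static void requireNonNegative(int messageCount) {
        if (messageCount < 0) {
            throw new IllegalArgumentException("messageCount must not be negative");
        }
    }
}
```

For 25 messages this gives 25 requests without batching and 3 with batching.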

Message sender

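import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.SendMessageBatchResultEntry;

public class SqsSendMessage {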

    private SqsClient sqsClient = createSqsClient();

    private Random random = new Random();

    public SqsClient createSqsClient() {
        return SqsClient.builder()
                .region(Region.AP_NORTHEAST_1)
                .build();
    }

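    // Sends all messages in one batch request (SQS accepts at most 10 entries per batch).
    // Each entry gets a sequential ID that is unique within the batch.
    public void sendMessage(String queueUrl, List<String> messages) {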
        AtomicInteger counter = new AtomicInteger();
        List<SendMessageBatchRequestEntry> entries = messages.stream()
                .map(message -> createSendMessageBatchRequestEntry(message,
                        Integer.toString(counter.incrementAndGet()))
                ).collect(Collectors.toUnmodifiableList());

        SendMessageBatchResponse response = sqsClient.sendMessageBatch(
                SendMessageBatchRequest.builder()
                        .queueUrl(queueUrl)
                        .entries(entries)
                        .build());

        if (!response.failed().isEmpty()) {
            System.out.println("Failed to send to SQS: " + response.failed());
        }

        List<String> messageIds = response.successful().stream()
                .map(SendMessageBatchResultEntry::messageId)
                .collect(Collectors.toUnmodifiableList());
        System.out.println("messageId: " + messageIds);
    }

    public SendMessageBatchRequestEntry createSendMessageBatchRequestEntry(
            String messageBody, String id) {
        return SendMessageBatchRequestEntry.builder()
                .messageBody(messageBody)
                .id(id)
                // Group and deduplication IDs, as used with FIFO queues.
                .messageGroupId(messageGroupId())
                .messageDeduplicationId(UUID.randomUUID().toString())
                .build();
    }

    public String messageGroupId() {
        return Integer.toString(random.nextInt(9999));
    }

    public static void main(String[] args) {
        SqsSendMessage sqsSendMessageBatchRequest = new SqsSendMessage();
        // Placeholder arguments: supply a real queue URL and at least one message body.
        sqsSendMessageBatchRequest.sendMessage("", List.of());
    }
}
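The sendMessage method above puts every message into a single batch, so a list with more than 10 messages would exceed the batch limit. One way around this is to split the list first. The following is a minimal sketch using only the standard library; BatchPartitioner and partition are hypothetical names, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: splits a list into consecutive chunks so that each
// chunk fits into one SQS batch request. Not part of the AWS SDK.
final class BatchPartitioner {

    // SQS accepts at most 10 entries in one batch request.
    static final int MAX_BATCH_ENTRIES = 10;

    // Returns consecutive sublists of at most batchSize elements, in order.
    // An empty input yields an empty result, so no empty batch is produced.
    static <T> List<List<T>> partition(List<T> items, int batchSize) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int start = 0; start < items.size(); start += batchSize) {
            int end = Math.min(start + batchSize, items.size());
            // Copy the view so later changes to items do not affect the batch.
            batches.add(List.copyOf(items.subList(start, end)));
        }
        return batches;
    }
}
```

With this, the caller could loop over BatchPartitioner.partition(messages, BatchPartitioner.MAX_BATCH_ENTRIES) and call sendMessage once per chunk. Because an empty list produces no chunks, the loop also avoids sending an empty batch.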

Message receiver

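import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequest;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchRequestEntry;
import software.amazon.awssdk.services.sqs.model.DeleteMessageBatchResponse;
import software.amazon.awssdk.services.sqs.model.Message;
import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
import software.amazon.awssdk.services.sqs.model.ReceiveMessageRequest;

public class SqsReceiveMessage {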

    private SqsClient sqsClient = createSqsClient();

    public SqsClient createSqsClient() {
        return SqsClient.builder()
                .region(Region.AP_NORTHEAST_1)
                .build();
    }

    public void process(String queueUrl) {
        List<Message> messages = receiveMessage(queueUrl);
        if (messages.isEmpty()) {
            System.out.println("No messages in the queue.");
            return;
        }

        // Do some processing with the received messages

        deleteMessage(queueUrl, messages);
    }

    List<Message> receiveMessage(String queueUrl,
            String... messageAttributeNames) {
        ReceiveMessageRequest receiveMessageRequest =
                ReceiveMessageRequest.builder()
                        .queueUrl(queueUrl)
                        .waitTimeSeconds(20)
                        .maxNumberOfMessages(10)
                        .messageAttributeNames(messageAttributeNames)
                        .attributeNames(QueueAttributeName.ALL)
                        .build();
        return sqsClient.receiveMessage(receiveMessageRequest).messages();
    }

    public void deleteMessage(String queueUrl, List<Message> messages) {
        AtomicInteger counter = new AtomicInteger();
        List<DeleteMessageBatchRequestEntry> entries = messages.stream()
                .map(message -> createDeleteMessageBatchRequestEntry(
                        message.receiptHandle(),
                        Integer.toString(counter.incrementAndGet())))
                .collect(Collectors.toUnmodifiableList());

        DeleteMessageBatchResponse response = sqsClient.deleteMessageBatch(
                DeleteMessageBatchRequest.builder()
                    .queueUrl(queueUrl)
                    .entries(entries)
                    .build());

        if (!response.failed().isEmpty()) {
            System.out.println("Failed to delete SQS messages: " + response.failed());
        }
    }

    public DeleteMessageBatchRequestEntry createDeleteMessageBatchRequestEntry(
            String receiptHandle, String id) {
        return DeleteMessageBatchRequestEntry.builder()
                .id(id)
                .receiptHandle(receiptHandle)
                .build();
    }
}
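Both classes above only print the entries returned by response.failed(). If failed entries should be retried, one option is a small retry loop. The following is a minimal sketch using only the standard library; FailedEntryRetrier and sendWithRetry are hypothetical names, and the sendBatch function is an assumption standing in for a call such as sqsClient.deleteMessageBatch that maps the failed IDs back to their entries. It has no backoff between attempts.

```java
import java.util.List;
import java.util.function.Function;

// Hypothetical helper: resends only the entries that a batch call reported
// as failed. Not part of the AWS SDK.
final class FailedEntryRetrier {

    // Calls sendBatch with the pending items and retries whatever it returns
    // as failed, for at most maxAttempts calls in total.
    // Returns the items that are still failing afterwards.
    static <T> List<T> sendWithRetry(List<T> items,
                                     Function<List<T>, List<T>> sendBatch,
                                     int maxAttempts) {
        List<T> pending = List.copyOf(items);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            // sendBatch returns the subset of pending items that failed.
            pending = List.copyOf(sendBatch.apply(pending));
        }
        return pending;
    }
}
```

In real code it is probably worth checking why each entry failed before retrying, since a failure caused by the request itself is unlikely to succeed on a second attempt.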