// Tool that sends messages to an SQS queue.
package com.example;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.CreateQueueRequest;
import software.amazon.awssdk.services.sqs.model.GetQueueUrlRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import software.amazon.awssdk.services.sqs.model.SqsException;
/**
 * Command-line load generator that sends {@code FILE_COUNT} messages to a single SQS queue
 * from a fixed pool of {@code THREAD_COUNT} worker threads, printing start, progress and end
 * timestamps to standard error.
 */
public class SqsThreadSendMessage {
    private static final int FILE_COUNT = 10000; // number of messages to send
    private static final int THREAD_COUNT = 100; // number of parallel worker threads
    private static final int PROGRESS_INTERVAL = 500; // print a progress line every N sends

    /**
     * Sends {@code FILE_COUNT} messages in parallel and waits for all of them to finish.
     *
     * <p>The queue is created/looked up once up front; each worker then only performs the
     * {@code SendMessage} call. An {@link SqsException} anywhere prints the service error and
     * exits with status 1. Any other failure propagates out of {@code main} after the thread
     * pool and the client have been released, so the JVM can terminate instead of hanging on
     * idle non-daemon worker threads.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        System.err.println("start : " + LocalDateTime.now());

        String message = "dDMycmZib3d1Zmg5dWg0cjhmdWgyNDk4cmY4MDI0dHI4NGh0ODl1Z2hnOTUzaHRnOGgzcmdoM3J1aWdodXIzaGc5ODNnODM1anUwamc0OGlyOXVnaDNydTloZzl1cjNoZzkzaHJnOTgzaGd1OWgzOWdoMzVoZzhoZ3JoZ3U5aDlndWgzOWgzOTgzOTg0dGYzZ2ZobnUzNGg4OXFoNDh0aDQzOGh0ODM0NXQ1NHV5OHQ0NXU0ODl0ajM4MDk0NWp0MzRnMDUz";
        String queueName = "kick_lambda";
        AtomicInteger sentCount = new AtomicInteger(0);

        try (SqsClient sqsClient = SqsClient.builder()
                .region(Region.AP_NORTHEAST_1)
                .build()) {
            // The queue URL is the same for every message, so resolve it once instead of
            // issuing CreateQueue + GetQueueUrl for each of the FILE_COUNT sends.
            String queueUrl = resolveQueueUrl(sqsClient, queueName);

            ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
            try {
                // Submit one asynchronous send per message.
                List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, FILE_COUNT)
                        .mapToObj(fileIndex -> CompletableFuture.runAsync(() -> {
                            // Append a random number so message bodies differ from each other.
                            String body = message + ThreadLocalRandom.current().nextLong();
                            sendToQueueUrl(sqsClient, queueUrl, body);
                            int sent = sentCount.incrementAndGet();
                            if (sent % PROGRESS_INTERVAL == 0) {
                                System.err.println(sent + " times : " + LocalDateTime.now());
                            }
                        }, executor))
                        .collect(Collectors.toList());

                // Wait for every send; join() rethrows the first failure it encounters.
                futures.forEach(CompletableFuture::join);
            } finally {
                // On success nothing is left to cancel; on failure this drops the queued sends
                // and interrupts in-flight ones so the worker threads do not keep the JVM alive.
                executor.shutdownNow();
            }
        } catch (SqsException e) {
            exitOnFailure(e);
        }

        System.err.println("end : " + LocalDateTime.now());
    }

    /**
     * Sends one message to the named queue, creating the queue first if necessary.
     *
     * <p>Each call performs three service requests (CreateQueue, GetQueueUrl, SendMessage).
     * On an {@link SqsException} the error message is printed to standard error and the JVM
     * exits with status 1.
     *
     * @param sqsClient client used for all requests
     * @param queueName name of the target queue
     * @param message   message body to send
     */
    public static void sendMessage(SqsClient sqsClient, String queueName, String message) {
        try {
            String queueUrl = resolveQueueUrl(sqsClient, queueName);
            sqsClient.sendMessage(buildSendRequest(queueUrl, message));
        } catch (SqsException e) {
            exitOnFailure(e);
        }
    }

    /** Issues CreateQueue for {@code queueName} and then returns the URL reported by GetQueueUrl. */
    private static String resolveQueueUrl(SqsClient sqsClient, String queueName) {
        CreateQueueRequest createRequest = CreateQueueRequest.builder()
                .queueName(queueName)
                .build();
        sqsClient.createQueue(createRequest);

        GetQueueUrlRequest urlRequest = GetQueueUrlRequest.builder()
                .queueName(queueName)
                .build();
        return sqsClient.getQueueUrl(urlRequest).queueUrl();
    }

    /** Sends one message to an already-resolved queue URL; exits the JVM on an SQS error. */
    private static void sendToQueueUrl(SqsClient sqsClient, String queueUrl, String message) {
        try {
            sqsClient.sendMessage(buildSendRequest(queueUrl, message));
        } catch (SqsException e) {
            exitOnFailure(e);
        }
    }

    /** Builds the SendMessage request with a one-second delivery delay. */
    private static SendMessageRequest buildSendRequest(String queueUrl, String message) {
        return SendMessageRequest.builder()
                .queueUrl(queueUrl)
                .messageBody(message)
                .delaySeconds(1)
                .build();
    }

    /** Prints the SQS error message to standard error and terminates the JVM with status 1. */
    private static void exitOnFailure(SqsException e) {
        // Fall back to the exception message if no service error details are attached.
        String detail = e.awsErrorDetails() != null
                ? e.awsErrorDetails().errorMessage()
                : e.getMessage();
        System.err.println(detail);
        System.exit(1);
    }
}