ConcurrentHashMapを使用する場合でも、値を更新する際にはatomicなメソッドを使わなければならない。
putIfAbsentで初期値を代入し、computeIfPresentで更新を行う。
package com.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
/**
 * Demonstrates race-free counting in a {@link ConcurrentHashMap} that is updated concurrently by
 * many {@link CompletableFuture} tasks running on a fixed-size thread pool.
 *
 * <p>Each of the {@code FILE_COUNT} tasks draws a random integer in the inclusive range
 * {@code [min, max]} and increments the counter stored under that integer's decimal string.
 * Only atomic map operations are used: {@code putIfAbsent} seeds the counter with zero and
 * {@code computeIfPresent} performs the increment. After all tasks have completed, the
 * per-key counters and their sum (which equals {@code FILE_COUNT}) are printed to standard
 * error, bracketed by start and end timestamps.
 */
public class S3ParallelFileDownloaderWithCompletableFuture {
    // NOTE(review): BUCKET_NAME and FILE_PREFIX are not referenced anywhere in this class.
    private static final String BUCKET_NAME = "mmnkbucket";
    private static final String FILE_PREFIX = "future/"; // file name without its sequence-number part
    private static final int FILE_COUNT = 1000000; // number of files (one task is submitted per file)
    private static final int THREAD_COUNT = 10; // number of worker threads

    /**
     * Submits {@code FILE_COUNT} counting tasks, waits for all of them, and prints the result.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        LocalDateTime nowDate = LocalDateTime.now();
        System.err.println("start : " + nowDate);

        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
        ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
        int min = 1;
        int max = 5;

        try {
            // Run the counting tasks in parallel on the pool.
            List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, FILE_COUNT)
                    .mapToObj(fileIndex -> CompletableFuture.runAsync(() -> {
                        // The upper bound of nextInt(origin, bound) is exclusive, so max + 1
                        // yields the inclusive range [min, max]. ThreadLocalRandom avoids the
                        // contention of one Random instance shared by every worker.
                        int value = ThreadLocalRandom.current().nextInt(min, max + 1);
                        String key = Integer.toString(value);
                        // Both calls are atomic; together they never lose an increment because
                        // the key is guaranteed to exist before computeIfPresent runs.
                        map.putIfAbsent(key, 0);
                        map.computeIfPresent(key, (k, v) -> v + 1);
                    }, executor))
                    .collect(Collectors.toList());

            // Wait for every task; join() rethrows a task failure as CompletionException.
            futures.forEach(CompletableFuture::join);

            map.forEach((key, value) -> System.err.println(key + ", " + value));

            int count = map.values().stream().mapToInt(Integer::intValue).sum();
            System.err.println(count);

            LocalDateTime endDate = LocalDateTime.now();
            System.err.println("end : " + endDate);
        } finally {
            // Always release the pool: its worker threads are non-daemon and would otherwise
            // keep the JVM alive if a task failed before reaching this point.
            executor.shutdown();
        }
    }
}