AWS SDK for Java 2.xでのS3オブジェクト取得を並列化するメモ
S3Clientはスレッドセーフで、内部にHTTPコネクションプールを持っているため、S3Clientを複数生成する必要はない。1つのインスタンスを共有し、リクエストを発行する側を並列化するだけでよい。
package com.example;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;
/**
 * Demo that issues GetObject requests for a numbered series of S3 objects in parallel.
 *
 * <p>A single {@link S3Client} is shared by all worker threads; only the request side is
 * parallelised, using {@link CompletableFuture} tasks on a fixed-size thread pool. Timestamps
 * are written to stderr so the overlap between requests can be observed.
 */
public class S3ParallelFileDownloaderWithCompletableFuture {
    private static final String BUCKET_NAME = "bucket";
    private static final String FILE_PREFIX = "key/"; // object key prefix that precedes the sequence number
    private static final int FILE_COUNT = 10; // number of objects to request
    private static final int THREAD_COUNT = 3; // number of parallel workers

    /**
     * Requests objects {@code 1..FILE_COUNT} in parallel, waits for all of them, and prints
     * each result to stdout.
     *
     * @param args ignored
     */
    public static void main(String[] args) {
        LocalDateTime nowDate = LocalDateTime.now();
        System.err.println("start : " + nowDate);

        // try-with-resources releases the client's underlying resources on every exit path.
        try (S3Client s3Client = S3Client.builder()
                .region(Region.AP_NORTHEAST_1) // change the region as needed
                .build()) {
            ExecutorService executor = Executors.newFixedThreadPool(THREAD_COUNT);
            try {
                // Submit one asynchronous task per object index.
                List<CompletableFuture<String>> futures = IntStream.rangeClosed(1, FILE_COUNT)
                        .mapToObj(fileIndex -> CompletableFuture.supplyAsync(() -> {
                            String str = downloadFile(s3Client, fileIndex);
                            LocalDateTime endDate = LocalDateTime.now();
                            System.err.println("download end : " + endDate);
                            return str;
                        }, executor))
                        .collect(Collectors.toList());

                // Gather the results; join() blocks until the corresponding task completes.
                List<String> fileContents = futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList());

                // Print the collected results (or hand them to further processing).
                fileContents.forEach(content -> System.out.println("File Content: " + content));

                LocalDateTime endDate = LocalDateTime.now();
                System.err.println("end : " + endDate);
            } finally {
                // Always stop the pool: its non-daemon threads would otherwise keep the JVM alive.
                executor.shutdown();
            }
        }
    }

    /**
     * Issues a GetObject request for the object {@code FILE_PREFIX + fileIndex + ".txt"} in
     * {@code BUCKET_NAME} and closes the response stream again.
     *
     * <p>The object body is not read here; the method returns a fixed marker instead of the
     * content. NOTE(review): presumably intentional, since this memo only measures request
     * timing -- confirm before reusing this as a real downloader.
     *
     * @param s3Client  shared client used to issue the request
     * @param fileIndex sequence number that forms the object key
     * @return {@code "finish"} when the request succeeded, or {@code "Error: <key>"} when any
     *         exception was raised
     */
    private static String downloadFile(S3Client s3Client, int fileIndex) {
        // Printed so the log shows which client instance each task uses.
        System.err.println("s3Client instance : " + s3Client.toString());
        LocalDateTime startDate = LocalDateTime.now();
        System.err.println("download start : " + startDate);

        String fileName = FILE_PREFIX + fileIndex + ".txt"; // sequentially numbered object key
        GetObjectRequest getObjectRequest = GetObjectRequest.builder()
                .bucket(BUCKET_NAME)
                .key(fileName)
                .build();

        // Explicit charset so decoding does not depend on the platform default.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(s3Client.getObject(getObjectRequest), StandardCharsets.UTF_8))) {
            return "finish";
        } catch (Exception e) {
            // Best effort: report the failure and return a placeholder so the other tasks continue.
            System.err.println("Error downloading file " + fileName + ": " + e.getMessage());
            return "Error: " + fileName; // default value on error
        }
    }
}