0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?

自分用メモ2

Posted at

AWS SDK2でのS3取得の並列化のメモ
S3Clientはスレッドセーフで、内部にHTTPコネクションプールを持っているため、S3Clientを複数生成する必要はなく、1つのインスタンスを共有してリクエスト部分を並列化するだけでよい。

package com.example;

import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
 * Sample that requests a numbered series of S3 objects in parallel.
 *
 * <p>A single {@link S3Client} instance is shared by every task; only the request calls are
 * parallelized, using {@link CompletableFuture} tasks on a fixed-size thread pool.
 * Timing markers are written to standard error and the per-file results to standard output.
 */
public class S3ParallelFileDownloaderWithCompletableFuture {
    private static final String BUCKET_NAME = "bucket";
    private static final String FILE_PREFIX = "key/"; // object key without the sequence number
    private static final int FILE_COUNT = 10; // number of files to request
    private static final int THREAD_COUNT = 3; // number of parallel workers

    /**
     * Requests {@code FILE_COUNT} objects using {@code THREAD_COUNT} worker threads and prints
     * the result of each request.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        LocalDateTime nowDate = LocalDateTime.now();
        System.err.println("start : " + nowDate);

        // try-with-resources so the client is closed even if a task fails.
        try (S3Client s3Client = S3Client.builder()
                .region(Region.AP_NORTHEAST_1) // change the region as needed
                .build()) {

            ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(THREAD_COUNT);
            try {
                // Submit one asynchronous request per file.
                List<CompletableFuture<String>> futures = IntStream.rangeClosed(1, FILE_COUNT)
                        .mapToObj(fileIndex -> CompletableFuture.supplyAsync(() -> {
                            String str = downloadFile(s3Client, fileIndex);
                            LocalDateTime endDate = LocalDateTime.now();
                            System.err.println("download end : " + endDate);
                            return str;
                        }, executor))
                        .collect(Collectors.toList());

                // Gather the results, waiting for each task to complete in submission order.
                List<String> fileContents = futures.stream()
                        .map(CompletableFuture::join)
                        .collect(Collectors.toList());

                // Print the collected results (or hand them to further processing).
                fileContents.forEach(content -> System.out.println("File Content: " + content));

                LocalDateTime endDate = LocalDateTime.now();
                System.err.println("end : " + endDate);
            } finally {
                // Always stop the pool: its worker threads are non-daemon, so leaving it
                // running after a failure would keep the JVM from exiting.
                executor.shutdown();
            }
        }
    }

    /**
     * Issues a GetObject request for the file with the given sequence number.
     *
     * <p>NOTE(review): the response stream is opened and closed without being read, and the
     * fixed marker {@code "finish"} is returned instead of the object's content. This looks
     * intentional for timing the requests; confirm before relying on the returned value.
     *
     * @param s3Client  shared client used to send the request
     * @param fileIndex sequence number appended to {@code FILE_PREFIX} to form the object key
     * @return {@code "finish"} on success, or {@code "Error: <key>"} if any exception occurred
     */
    private static String downloadFile(S3Client s3Client, int fileIndex) {

        // Printed so the log shows which client instance served each request.
        System.err.println("s3Client instance : " + s3Client.toString());
        LocalDateTime startDate = LocalDateTime.now();
        System.err.println("download start : " + startDate);

        String fileName = FILE_PREFIX + fileIndex + ".txt"; // sequentially numbered object key

        GetObjectRequest getObjectRequest = GetObjectRequest.builder()
                .bucket(BUCKET_NAME)
                .key(fileName)
                .build();

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(s3Client.getObject(getObjectRequest)))) {
            return "finish";
        } catch (Exception e) {
            System.err.println("Error downloading file " + fileName + ": " + e.getMessage());
            return "Error: " + fileName; // fallback value on failure
        }
    }
}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

The draft of this article will also be deleted.

Are you sure you want to delete this article?