0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

個人用メモ#3

Posted at

ConcurrentHashMapを使用する場合でも更新をする場合はatomicなメソッドを使わないといけない
putIfAbsentで初期値を代入し、computeIfPresentで更新を行う。

package com.example;

import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.GetObjectRequest;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class S3ParallelFileDownloaderWithCompletableFuture {
    private static final String BUCKET_NAME = "mmnkbucket";
    private static final String FILE_PREFIX = "future/"; // ファイルの連番部分以外の名前
    private static final int FILE_COUNT = 1000000; // ファイル数
    private static final int THREAD_COUNT = 10; // 並列処理数

    public static void main(String[] args) {
        LocalDateTime nowDate = LocalDateTime.now();
        System.err.println("start : " + nowDate);
        
        ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();

        ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(THREAD_COUNT);
        int min = 1;
        int max = 5;

        Random random = new Random();
        // CompletableFutureで並列にファイルを取得
        List<CompletableFuture<Void>> futures = IntStream.rangeClosed(1, FILE_COUNT)
                .mapToObj(fileIndex -> CompletableFuture.runAsync(() -> {
                    int value = random.nextInt(max + min) + min;
                    map.putIfAbsent(Integer.toString(value), 0);
                    map.computeIfPresent(Integer.toString(value), (k, v) -> ++v);
                }, executor))
                .collect(Collectors.toList());

        // 全ての非同期処理の結果を集約
        futures.stream()
                .map(CompletableFuture::join) // 各非同期処理の完了を待つ
                .collect(Collectors.toList());

        int count = 0;
        map.forEach(1, (key, value) -> {
            System.err.println(key + ", " + value);
        });
        for (Map.Entry<String, Integer> entry : map.entrySet()){
            count = count + entry.getValue();
        }
        System.err.println(count);

        // 取得した結果を表示(または他の処理に使用)

        LocalDateTime endDate = LocalDateTime.now();
        System.err.println("end : " + endDate);

        executor.shutdown();
    }

}
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?