LoginSignup
14
14

More than 5 years have passed since last update.

Java8のStream#collectをparallelsに実行するときに結果コンテナの変更をスレッドセーフにすべきか?

Last updated at Posted at 2014-07-13

概要

Stream#collectを行う場合、parallelsなStreamにすると、Supplierが作成した結果コンテナに対してマルチスレッドでAccumulatorがどんどん値を突っ込んで行く、さらに複数の結果コンテナを複数スレッドでCombineしていくらしいので場合によってはAccumulateとCombineで結果コンテナを変更する処理をスレッドセーフに作らないといけないのか?と考えたのがきっかけ。
http://docs.oracle.com/javase/8/docs/api/java/text/Collator.html
を見ると大丈夫そうなんですが、筆者は英語が苦手なので(苦笑)実際動かして試してみました。

結論

  • 結果コンテナ(ここで言うMyMapのインスタンス)はスレッドセーフである
  • スレッドプール(ForkJoinPool)が持ってるワーカースレッド(とmainスレッド)毎に結果コンテナが作成される。Accumulate処理は各スレッドで独立して行っているので、結果コンテナはスレッド拘束されておりスレッドセーフである。
  • Combine処理はシリアルに実行されているようなので、スレッドセーフである。

余談

  • ワーカースレッド数の最大値はマシンのコア数-1
    • ForkJoinPool#makeCommonPoolメソッド参照
  • 処理するStream量によってワーカースレッドの数を決めているみたい
    • java.util.stream.AbstractTask#compureメソッドあたり?
  • 今回処理したテキスト量だとワーカースレッド1個 + mainメソッドしか作られなかった。
    • しかも片方のスレッドはなぜかaccumulateで利用されない
    • テキストをコピペして400倍の量にした場合ワーカースレッド3個作成された。

検証コード

import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collector;

public class WordCount {
    // accumulate時のThreadと使っているMapの対応をチェックする
    private static final ConcurrentMap<String, Set<Integer>> accumulateChecker = new ConcurrentHashMap<>();

    public static void main(String[] args) throws Exception {
        Path path = FileSystems.getDefault().getPath("./src/main/java/wordcount.txt");

        MyMap collect = Files.lines(path)
                .flatMap(line -> Arrays.stream(line.split("( |\\.|,)+")))
                .parallel()
                .collect(Collector.of(
                        () -> {
                            MyMap map = new MyMap();
                            checkSupply(map);
                            return map;
                        },
                        (MyMap map, String word) -> {
                            checkAccumulate(map);
                            AtomicInteger count = map.putIfAbsent(word, new AtomicInteger(1));
                            if (count != null) {
                                count.addAndGet(1);
                            }
                        },
                        (MyMap map1, MyMap map2) -> {
                            checkCombine(map1, map2);
                            map2.forEach(
                                    (k, v) -> {
                                        try {
                                            // ちょっと待たせて時間をかける
                                            TimeUnit.MILLISECONDS.sleep(10);
                                        } catch (InterruptedException e) {
                                            // ignore
                                        }
                                        AtomicInteger count = map1.putIfAbsent(k, v);
                                        if (count != null) {
                                            count.addAndGet(v.get());
                                        }
                                    }
                            );
                            System.out.println("[combine]: thread=" + Thread.currentThread().getName() + ", end.");
                            return map1;
                        }, Collector.Characteristics.CONCURRENT
                ));
        // 結果を表示
        System.out.println(collect.toString());

        // accumulateCheckerの内容を確認
        System.out.println(accumulateChecker.toString());
    }

    private static void checkSupply(MyMap map) {
        System.out.println("[supply] : thread=" + Thread.currentThread().getName() + ", mapNum=" + map.getNum());
    }

    private static void checkAccumulate(MyMap map) {
        HashSet<Integer> initMaps = new HashSet<>();
        initMaps.add(map.getNum());
        Set<Integer> maps = accumulateChecker.putIfAbsent(Thread.currentThread().getName(), initMaps);
        if (maps != null) {
            maps.add(map.getNum());
        }
    }

    private static void checkCombine(MyMap map1, MyMap map2) {
        System.out.println("[combine]: thread=" + Thread.currentThread().getName()
                        + ", map1=[" + map1.getNum() + "," + countArchitectures(map1) + "]"
                        + ", map2=[" + map2.getNum() + "," + countArchitectures(map2) + "]"
        );

    }

    private static int countArchitectures(MyMap map) {
        return map.containsKey("architectures") ? map.get("architectures").get() : 0;
    }

    /**
     * インスタンスを見分けやすいように連番を降るためのConcurrentHashMap
     */
    public static class MyMap extends ConcurrentHashMap<String, AtomicInteger> {
        private static AtomicInteger counter = new AtomicInteger(0);

        private int num;
        public MyMap() {
            num = counter.addAndGet(1);
        }

        public int getNum() {
            return num;
        }
    }
}

wordcount.txt
The objective of the department is to perform basic scientific
research and education in the field of Information Science and
Engineering, whose importance as a research field is continuously
increasing in recent days. In particular, applying of mathematical
methodologies towards information processing and their systems, as
well as exploring basic mathematical disciplines in computer science,
are strongly deemed as immediate research needs.

Rapid advances in computer technologies gave rise to new mathematical
modeling concepts such as chaotic systems and fractals, as well as new
breed of algorithms such as the Interior-point methods for linear
programming, which in turn are influencing the design of new computer
architectures and software which far excel the capabilities of
mankind. The objective of the department is to perform pioneering
research based on such strong interactions between mathematical and
computing sciences, and educate individuals whose imagination and
intelligence could very well lead the research in the area.
14
14
1

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
14