Java

待ちが発生するタスクの並列処理

More than 1 year has passed since last update.

通信を行う処理など、待ちが発生する処理を効率化したい。

Parallel Stream

Java8で導入されたstreamの機能を使って簡単にできないかと思い、まずはStream#parallelを使用した。

ParallelStreamTest.java
/**
 * Measures how long a parallel {@code IntStream} needs to run tasks that only wait,
 * and how many distinct threads ended up executing them.
 */
public class ParallelStreamTest {
    public static void main(String[] args) {
        run(1000, 10);
    }

    /**
     * Runs {@code taskCount} sleeping tasks through a parallel stream, then prints the
     * number of distinct threads observed and the elapsed wall-clock time.
     *
     * @param taskCount number of tasks to run
     * @param sleepTime time each task sleeps, in milliseconds
     */
    private static void run(int taskCount, int sleepTime) {
        long start = System.currentTimeMillis();
        // The set is written from several threads at once; HashSet is not thread-safe,
        // so a concurrent set is used to keep the reported count reliable.
        Set<String> threadSet = java.util.concurrent.ConcurrentHashMap.newKeySet();
        IntStream.range(0, taskCount).parallel().forEach(x -> {
            threadSet.add(Thread.currentThread().toString());
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of silently dropping the request.
                Thread.currentThread().interrupt();
            }
        });

        long time = System.currentTimeMillis() - start;
        System.out.println("threadCount = " + threadSet.size() + ", time = " + time);
    }
}
threadCount = 4, time = 3079

結果、順次処理だと10秒かかる処理が約3秒で終わった。使用されたスレッド数は4で、これは実行環境の論理CPU数(CPUコア数2、ハイパースレッディングで×2で4スレッド)に等しい。パラレルストリームは共通のForkJoinPool(既定の並列度は論理CPU数−1)のワーカースレッドと呼び出し元スレッドで実行されるためである。

ForkJoinPoolを使用したParallel Stream

ForkJoinPoolを使用してCPUコア数を超えたスレッド数で並列実行することができるらしいので試してみた。

参考: http://d.hatena.ne.jp/yohhoy/20140613/p1

ForkJoinPoolTest.java
/**
 * Measures a parallel {@code IntStream} of sleeping tasks when the terminal operation is
 * started from inside a dedicated {@link ForkJoinPool}, for pool sizes 1 to 30.
 */
public class ForkJoinPoolTest {
    public static void main(String[] args) throws InterruptedException,
            ExecutionException {
        for (int i = 1; i <= 30; i++) {
            run(i, 1000, 10);
        }
    }

    /**
     * Runs {@code taskCount} sleeping tasks as a parallel stream submitted to a
     * {@code ForkJoinPool} of the requested parallelism, then prints the requested thread
     * count, the number of distinct threads observed and the elapsed wall-clock time.
     *
     * @param threadCount parallelism of the pool the stream is started from
     * @param taskCount   number of tasks to run
     * @param sleepTime   time each task sleeps, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if the submitted task fails
     */
    private static void run(int threadCount, int taskCount, int sleepTime)
            throws InterruptedException, ExecutionException {
        ForkJoinPool forkJoinPool = new ForkJoinPool(threadCount);
        try {
            long start = System.currentTimeMillis();
            // Written from several worker threads at once; HashSet is not thread-safe.
            Set<String> threadSet = java.util.concurrent.ConcurrentHashMap.newKeySet();
            // submit() is required here: execute() returns void, so there would be nothing
            // to wait on. Starting the stream from inside a pool task is meant to make it
            // run on this pool; that is implementation behaviour, not a documented guarantee.
            forkJoinPool.submit(() -> {
                IntStream.range(0, taskCount).parallel().forEach(x -> {
                    threadSet.add(Thread.currentThread().toString());
                    try {
                        Thread.sleep(sleepTime);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of silently dropping the request.
                        Thread.currentThread().interrupt();
                    }
                });
            }).get();

            long time = System.currentTimeMillis() - start;
            System.out.println("threadCount = " + threadCount + ", threadSet.size() = " + threadSet.size() + ", time = " + time);
        } finally {
            // One pool is created per call; release its workers even if the task failed.
            forkJoinPool.shutdown();
        }
    }
}
threadCount = 1, threadSet.size() = 1, time = 12151
threadCount = 2, threadSet.size() = 2, time = 5986
threadCount = 3, threadSet.size() = 3, time = 4495
threadCount = 4, threadSet.size() = 4, time = 3012
threadCount = 5, threadSet.size() = 5, time = 2970
threadCount = 6, threadSet.size() = 6, time = 2251
threadCount = 7, threadSet.size() = 7, time = 2240
threadCount = 8, threadSet.size() = 8, time = 1511
threadCount = 9, threadSet.size() = 9, time = 1501
threadCount = 10, threadSet.size() = 10, time = 1490
threadCount = 11, threadSet.size() = 11, time = 1509
threadCount = 12, threadSet.size() = 12, time = 1510
threadCount = 13, threadSet.size() = 13, time = 1501
threadCount = 14, threadSet.size() = 14, time = 1492
threadCount = 15, threadSet.size() = 15, time = 1463
threadCount = 16, threadSet.size() = 16, time = 758
threadCount = 17, threadSet.size() = 16, time = 734
threadCount = 18, threadSet.size() = 16, time = 754
threadCount = 19, threadSet.size() = 16, time = 743
threadCount = 20, threadSet.size() = 16, time = 739
threadCount = 21, threadSet.size() = 16, time = 767
threadCount = 22, threadSet.size() = 16, time = 765
threadCount = 23, threadSet.size() = 16, time = 756
threadCount = 24, threadSet.size() = 17, time = 766
threadCount = 25, threadSet.size() = 16, time = 750
threadCount = 26, threadSet.size() = 16, time = 750
threadCount = 27, threadSet.size() = 16, time = 760
threadCount = 28, threadSet.size() = 16, time = 759
threadCount = 29, threadSet.size() = 16, time = 757
threadCount = 30, threadSet.size() = 16, time = 733

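結果、16スレッドまではスレッド数が増えたが、それ以上のスレッド数を指定してもスレッド数は増えなかった。実行時間も8〜15スレッドはほぼ同じ値であり、不自然さがある。これは、Java 8の実装ではストリームの分割数が実行中のプールではなく共通プールの並列度から決まるためと考えられる。この環境では共通プールの並列度が3で、1000要素は約62〜63要素ずつの16個のタスクに分割される。そのため同時に動けるスレッドは最大16であり、実行時間は最も多くのタスクを受け持つスレッドのタスク数で決まる(8〜15スレッドでは2タスク分で約1.5秒、16スレッド以上では1タスク分で約0.75秒)。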

ExecutorService

基本に立ち返り、ExecutorServiceを使用してみた。
すべてのタスクが終わるまで待つ仕組みはCountDownLatchを使用した。

ExecutorServiceTest.java
/**
 * Measures sleeping tasks run on a fixed-size {@link ExecutorService}, for pool sizes
 * 1 to 30, waiting for completion with a {@link CountDownLatch}.
 */
public class ExecutorServiceTest {
    public static void main(String[] args) throws InterruptedException {
        for (int i = 1; i <= 30; i++) {
            run(i, 1000, 10);
        }
    }

    /**
     * Submits {@code taskCount} sleeping tasks to a fixed thread pool, waits until all of
     * them have finished, then prints the requested thread count, the number of distinct
     * threads observed and the elapsed wall-clock time.
     *
     * @param threadCount size of the fixed thread pool
     * @param taskCount   number of tasks to run
     * @param sleepTime   time each task sleeps, in milliseconds
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    private static void run(int threadCount, int taskCount, int sleepTime)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threadCount);
        try {
            // Written from several pool threads at once; HashSet is not thread-safe.
            Set<String> threadSet = java.util.concurrent.ConcurrentHashMap.newKeySet();
            long start = System.currentTimeMillis();
            CountDownLatch latch = new CountDownLatch(taskCount);
            // Submitting only enqueues work, so a plain loop is enough; the executor
            // provides the parallelism.
            for (int i = 0; i < taskCount; i++) {
                pool.submit(() -> {
                    try {
                        threadSet.add(Thread.currentThread().toString());
                        Thread.sleep(sleepTime);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of silently dropping the request.
                        Thread.currentThread().interrupt();
                    } finally {
                        // Always count down, otherwise await() below would never return.
                        latch.countDown();
                    }
                });
            }

            latch.await();

            long time = System.currentTimeMillis() - start;
            System.out.println("threadCount = " + threadCount + ", threadSet.size() = " + threadSet.size() + ", time = " + time);
        } finally {
            // Non-daemon worker threads would keep the JVM alive if the pool leaked.
            pool.shutdown();
        }
    }

}
threadCount = 1, threadSet.size() = 1, time = 11796
threadCount = 2, threadSet.size() = 2, time = 5917
threadCount = 3, threadSet.size() = 3, time = 3963
threadCount = 4, threadSet.size() = 4, time = 3010
threadCount = 5, threadSet.size() = 5, time = 2414
threadCount = 6, threadSet.size() = 6, time = 2024
threadCount = 7, threadSet.size() = 7, time = 1690
threadCount = 8, threadSet.size() = 8, time = 1519
threadCount = 9, threadSet.size() = 9, time = 1331
threadCount = 10, threadSet.size() = 10, time = 1209
threadCount = 11, threadSet.size() = 11, time = 1089
threadCount = 12, threadSet.size() = 12, time = 1001
threadCount = 13, threadSet.size() = 13, time = 929
threadCount = 14, threadSet.size() = 14, time = 866
threadCount = 15, threadSet.size() = 15, time = 816
threadCount = 16, threadSet.size() = 16, time = 755
threadCount = 17, threadSet.size() = 17, time = 717
threadCount = 18, threadSet.size() = 18, time = 666
threadCount = 19, threadSet.size() = 19, time = 634
threadCount = 20, threadSet.size() = 20, time = 605
threadCount = 21, threadSet.size() = 21, time = 582
threadCount = 22, threadSet.size() = 22, time = 548
threadCount = 23, threadSet.size() = 23, time = 533
threadCount = 24, threadSet.size() = 24, time = 500
threadCount = 25, threadSet.size() = 25, time = 496
threadCount = 26, threadSet.size() = 26, time = 474
threadCount = 27, threadSet.size() = 27, time = 457
threadCount = 28, threadSet.size() = 28, time = 430
threadCount = 29, threadSet.size() = 29, time = 422
threadCount = 30, threadSet.size() = 30, time = 410

問題なく処理できている。