1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Javaの並列処理の書き方を確認したときのメモ

Last updated at Posted at 2019-12-15
package com.example.demo;

import org.junit.jupiter.api.Test;

import java.util.*;
import java.util.concurrent.*;
import java.util.stream.IntStream;

class ParallelTests {
    /** 戻り値を受け取る */
    static class Result {
        public String name;
        public Long age;

        @Override
        public String toString() {
            return "Result{" +
                    "name='" + name + '\'' +
                    ", age=" + age +
                    '}';
        }
    }

    @Test
    void simple_parallel_test() throws ExecutionException, InterruptedException {

        // newWorkStealingPoolがJava8から追加されたもので効率よくプールしているスレッドを使うらしい
        ExecutorService executorService = Executors.newWorkStealingPool(10);
        // 無名クラスを定義して使う。Lambdaの暗黙の型なんだろうか。。。
        Callable<Result> callable = new Callable<Result>() {
            @Override
            public Result call() throws Exception {
                Result result = new Result();
                result.name = "callable";
                result.age = Thread.currentThread().getId();
                return result;
            }
        };
        // 同じ意味合いのものをLambdaで定義する場合
        Callable<Result> lambda = () -> {
            Result result = new Result();
            result.name = "lambda";
            result.age = Thread.currentThread().getId();
            return result;
        };
        Future<Result> callableFuture = executorService.submit(callable);
        Future<Result> lambdaFuture = executorService.submit(lambda);
        // 2つのタスクがどっちも終わるまで待つ
        while (! (lambdaFuture.isDone() && callableFuture.isDone())) {
            System.out.println(callableFuture.get().toString());
            System.out.println(lambdaFuture.get().toString());
        }
        executorService.shutdown();
    }

    @Test
    void list_parallel_test() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newWorkStealingPool(10);
        Callable<Result> lambda1 = () -> {
            Result result = new Result();
            result.name = "lambda1";
            result.age = Thread.currentThread().getId();
            Thread.sleep(100);
            return result;
        };
        Callable<Result> lambda2 = () -> {
            Result result = new Result();
            result.name = "lambda2";
            result.age = Thread.currentThread().getId();
            Thread.sleep(10);
            return result;
        };
        // 指定した順番で結果が帰るよ。
        List<Future<Result>> futureList = executorService.invokeAll(Arrays.asList(lambda1, lambda2));
        System.out.println(futureList.get(0).get().toString());
        System.out.println(futureList.get(1).get().toString());
        executorService.shutdown();
    }

    @Test
    void list_parallel_type_test() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newWorkStealingPool(10);
        Callable<String> lambda1 = () -> {
            return "test";
        };
        Callable<Integer> lambda2 = () -> {
            return 123;
        };
        // StringとIntegerが混ざると以下がやれない!不便すぎるよ!
        // List<Future> futureList = executorService.invokeAll(Arrays.asList(lambda1, lambda2));
        Callable<Object> lambda3 = () -> {
            return "test";
        };
        Callable<Object> lambda4 = () -> {
            return 123;
        };
        // Objectで受け取ってあとでキャストするしかないのかな…悲しみ
        List<Future<Object>> futureList = executorService.invokeAll(Arrays.asList(lambda3, lambda4));
        System.out.println(futureList.get(0).get().toString());
        System.out.println(futureList.get(1).get().toString());
        executorService.shutdown();
    }

    @Test
    void shared_value_test() throws InterruptedException, ExecutionException {
        ExecutorService executorService = Executors.newWorkStealingPool(10);
        Integer i = 0;
        // これだけで安全になるらしいよ
        Set<Integer> set = Collections.synchronizedSet(new HashSet<>());
        Callable<Integer> lambda1 = () -> {
            // Setはいじれる
            set.add(1);
            // この中でiの値を変更することはできない
            return i + 1;
        };
        Callable<Integer> lambda2 = () -> {
            set.add(2);
            return i + 1;
        };
        List<Future<Integer>> futureList = executorService.invokeAll(Arrays.asList(lambda1, lambda2));
        System.out.println(futureList.get(0).get().toString());
        System.out.println(futureList.get(1).get().toString());
        System.out.println(set.toString());
        executorService.shutdown();
    }

    @Test
    void stream_test() {
        // ストリームに対して、副作用のない処理をするならstreamのparallelが手軽
        // 以下だけだとデフォルトのcommomPoolを使うだけなので、そこまで並列性が高くない
        // VM引数に-Djava.util.concurrent.ForkJoinPool.common.parallelism=100とかってやると拡張できる。
        System.out.println("Parallelism:" + ForkJoinPool.commonPool().getParallelism());
        int[] result = IntStream.range(10, 20).parallel().map(i -> {
            System.out.println("Thread.currentThread()=" + Thread.currentThread().getId());
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return i + 2;
        }).toArray();

        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < result.length; i++) {
            sb.append(result[i] + ",");
        }
        System.out.println(sb.toString());
    }
}
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?