LoginSignup
5
9

More than 1 year has passed since last update.

Java SE11 Gold 対策メモ --並列処理編

Last updated at Posted at 2021-07-12

概要

  • Java SE11 Gold 試験用にまとめていた自分用メモ
  • こちらの記事を参考にSE11版として作成しています。

Java SE11 Gold 試験範囲はこちら

スレッド

スレッドとは、プログラムを実行した場合の処理の最小単位のことです

  • javaでは指定したクラスのmain()メソッドを実行する際、新しくスレッドを生成している
  • プログラムの実行単位を複数のスレッドに分割して実行することをマルチスレッドと呼ぶ
  • スレッドの制御にはThreadクラスRunnableインターフェースを用いる

スレッドの状態

  • スレッドの実行は実行可能状態の時にメソッドの呼び出しで行う
    • Runnable.run()メソッド
    • Thread.start()メソッド
      • start()メソッドの中ではオーバーライドしたrun()メソッドを呼び出している

状態 説明
実行可能 スレッドが生成され、実行の機械が与えられるのを待っている状態(アイドリング状態)
実行中 スレッドが実行され、処理を行っている状態
実行不可能 ディスクの入力操作や、スレッドの排他制御や同期処理などにより、スレッドの動作が一時的に休止している状態
終了 run()メソッドの処理が終わり、スレッドが終了した状態

スレッドの優先度

  • 各スレッドには優先度が設定されている
  • 実行可能状態のスレッドが競合した場合、高い優先度をもつスレッドから実行される
    • スレッドの実行はOSに依存する為、必ず優先度順に実行されることが保証されていない

優先度を取得・設定するメソッド

メソッド名 説明
static Thread currentThread () 現在実行中のスレッドオブジェクトを取得する
final String getName () スレッドの名前を返す
final int getPriority () スレッドの優先度を返す
final void setPriority (int newPriority) スレッドの優先度を変更する。1~10まで引数で指定できる。デフォルトは5

優先度を表す定数

定数名 説明
MAX_PRIORITY 最大値。10を表す
NORMPRIORITY デフォルト値。5を表す
MIN_PRIORITY 最小値。1を表す

スレッドの制御

  • スレッドの制御は、Threadクラスのメソッドとして提供されている
    • sleep(long) 引数ミリ秒呼び出したスレッドが休止する
    • join() 実行中のスレッドが終了するまで待機
    • yield() 呼び出したスレッドを休止し、他のスレッドを実行する
    • interrupt() 休止中のスレッドに割り込みを入れる
      • 割り込まれたスレッドはInterruptedException例外を受け取り、処理を再開する。
public class Test05 {
    public static void main(String[] args) {
        Runnable timertask = ()->{
            int cnt=0;
            while(cnt<5) {
                sleep(1000);    //指定された時間待機
                System.out.println(++cnt+"s");
            }
        };
        Runnable task1 = ()->{
            System.out.println(Thread.currentThread().getName() + ":task1開始");
            Thread.yield(); //他のスレッドを実行
            sleep(1000);
            System.out.println(Thread.currentThread().getName() + ":task1終了");
        };
        Runnable task2 = ()->{
            System.out.println(Thread.currentThread().getName() + ":task2開始");
            System.out.println(Thread.currentThread().getName() + ":task2終了");
        };

        Thread thread_0 = new Thread(task1);
        Thread thread_1 = new Thread(task2);
        Thread timer = new Thread(timertask);
        timer.start();
        thread_0.start();
        thread_1.start();

        try {
            timer.join();   //スレッドが終了するまで待機
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName() + "スレッド");
    }

    static void sleep(long time) {
        try {
            Thread.sleep(time);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Thread-0:task1開始
Thread-1:task2開始
Thread-1:task2終了
1s
Thread-0:task1終了
2s
3s
4s
5s
mainスレッド

排他制御と同期制御

複数のスレッドを運用する際、同じオブジェクトを操作することがあります。
その際の値の取り扱いを制御することです。

排他制御

  • メソッド内の処理が行われている間は、他スレッドの参照を禁止する
  • synchronizedキーワードを使用する
    • synchronized void method()
    • method(){ synchronized (this){・・・} }

同期制御

  • 実行順序を制御する
    • set()が実行されてからprint()を実行したい
  • Objectクラスには制御用のメソッドがある
    • wait() 現在のスレッドを待機
    • notify 待機中のスレッドを1つ再開させる
    • notifyAll 待機中のスレッドを全て再開させる

デッドロック

  • 全てのスレッドが待機状態になり、ロックを開放するnotify()メソッドを呼ぶスレッドがいない状態
    • 複数のストーカー同士がお互いのストーカーの家で帰宅を待っている状態
      • 全員が待機状態且つ、帰宅(解放)待ち

ライブロック

  • デッドロックとの相違点として、ライブロックに関係するプロセスは、相手の状態によって、自身の状態も定期的に変化していき、その結果としてお互いに進展しない状態を表している。
    • 二人の人が細い道ですれ違う際に、お互いに同じ方向に譲り合いを繰り返し、ぶつかりそうになりながら進めない状態
      • 左右に避けようと常に動いている

並行処理ユーティリティ

並行処理ユーティリティ(Concurrent Utilities)
並行処理プログラミングにおける様々なユーティリティ機能の実装を目的としたライブラリ群です。

主なパッケージとしてはこれらがある。

  • java.util.concurrent パッケージ
    • java.util.conccurent.atomic パッケージ
    • java.util.concurrent.locks パッケージ

これらを用いて並行処理を実装しています。
また、許可されていない操作に対する例外としてConcurrentModificationExceptionをスローします。

並行処理ユーティリティが提供している機能

  • スレッド・プール
    • スレッドを貯蔵(Pool)する仕組み
    • スレッド生成のシステム負荷(オーバーヘッド)を軽減する
  • 並行コレクション
    • マルチスレッド環境下で安全に使用できるコレクションクラスを提供
      • Collectionsクラスで提供されている、スレッドセーフなオブジェクトを返すstaticメソッド
  • アトミック変数
    • ステートの読み込み → 値の変更 → ステートの書き戻し をアトミックに扱う
      • 「完全に終了している」もしくは「未着手である」のいずれかでなければ値を参照できない
  • カウンティング・セマフォ
    • 複数のスレッドを管理し、同期や割り込み制御を行う仕組みのこと

アトミック変数

class Sample{
    public static void main(String[] args){
        AtomicInteger atomicInteger =  new AtomicInteger(0);// 初期化
        int num = atomicInteger.get();// 取得
        AtomicReference<String> aatomicReference= new AtomicReference<String>("Hello");
        String str = aatomicReference.get();
    }
}

並列コレクション

  • java.util パッケージで提供されるコレクション・クラスの多くはシングルスレッド・アプリケーションで使用することを前提に設計
    • 非スレッドセーフ
      • ArrayList
      • HashMap
    • スレッドセーフ
      • Vector
      • Hashtable
      • 利用状況によってはパフォーマンス劣化

java.util.concurrent パッケージが提供する並行コレクション

  • 同時並行正とパフォーマンスを最適化
    • BlockingQueue
    • ConcurrentHashMap
    • ConcurrentLinkedDeque
    • CopyOnWriteArrayList など

CopyOnWriteArrayList

  • add、set、remove などが呼び出されると内部で要素を保持している配列のコピーを作成
    • 変更処理中に他のスレッドが読み取りを行う場合、コピー前(イテレータ作成時点)のリストが返却される
      • java.util.ConcurrentModificationException がスローされることはない

CyclicBarrierクラス

java.util.concurrent.CyclicBarrier にて宣言

  • 各スレッドの足並みを合わせるための機能を提供するクラス
  • 各スレッドは、設定されたバリアに到達すると、他のスレッドがバリアに到達するまで待機することになります
    • 指定した数のスレッドがバリアに到達すると、バリアを解除しスレッドは実行を再開します
public class Main {

    public static void main(String[] args) {

        CyclicBarrier barrier = new CyclicBarrier(3); // バリアの作成

        Runnable r = () -> {
            String threadName = Thread.currentThread().getName();
            try {
                System.out.println(threadName + ": start");
                Thread.sleep((int) (Math.random() * 5000)); // 待ち時間

                System.out.println(threadName + ": waiting..."); // バリアに到着
                barrier.await(); // バリアで待機

            } catch (InterruptedException | BrokenBarrierException e) {
                e.printStackTrace();
            }

            System.out.println(threadName + ": end"); // 終了
        };

        new Thread(r).start();
        new Thread(r).start();
        new Thread(r).start();
    }
}
Thread-0: start
Thread-1: start
Thread-2: start
Thread-0: waiting...
Thread-1: waiting...
Thread-2: waiting...
Thread-2: end
Thread-0: end
Thread-1: end

Excecutorフレームワーク

従来のThreadの問題点

  • Threadは1度しか利用できない為、2回目以降はリソースを生成するしかなかった(2回目以降の実行はIllegalThreadStateException)
  • タスクの実行者とタスクの実行方法の両方が一括りに実装されていた
    • 開発者がタスクの実行方法を詳細に制御できなかった

Escecutorフレームワークはこれらの問題点を解決した

  • タスクの実行者とタスクの実行方法を分離させる
  • 2回目以降も同じリソースで実行することができる(submit() execute())

Excecutorフレームワークの主な機能

  • スレッド・プールの実装を提供
  • タスクの実行におけるライフサイクル管理や監視、統計収集などの機能も提供
  • タスクのスケジューリング(遅延開始と周期的実行)を実現

Excecutorインターフェース

  • ExecutorService
    • タスクの終了管理
    • タスクの進行状況を管理
  • ScheduledExecutorService
    • ExecutorService を継承
    • 上記に加え、タスクのスケジュール管理

Executorオブジェクト

Executorオブジェクトを生成するにはExecutorsクラスのstaticメソッドから生成します。

メソッド 説明
newCashedThreadPool () キャッシュされたスレッド・プールを使用し、必要に応じて新しいスレッド・プールを作成するExecutorServiceを返す
newFixedThreadPool (int スレッド数) 指定された数のスレッドを再利用するスレッド・プールを作成するExecutorServiceを返す
newScheduledThreadPool (int スレッド数※アイドリング) スケジューリング機能を備えたスレッド・プールを作成するScheduledExecutorServiceを返す
newSingleThreadExecutor () 単一のワーカースレッドを使用するExecutorServiceを返す
newSingleThreadScheduledExecutor () スケジューリング機能を備えた単一のスレッドのScheduledExecutorServiceを返す
newWorkStealingPool () すべての使用可能なプロセッサをターゲットとして使用して、Work-stealingスレッド・プールを作成するExecutorServiceを返す

Work-stealingスレッド・プールは従来のスレッド管理とは異なり、終了したスレッドが他のスレッドからタスクを横取り(Stealing)し、処理します(一番効率的、ForkJoinPoolでも採用されている手法)

ExecutorServiceインターフェース

  • java.util.concurrent.ExecutorService にて宣言
    • executeまたはsubmitメソッドにてタスクを開始する
    • isShutdownメソッドはシャットダウンをしているか、isTerminatedメソッドは実行中のタスクがまだ存在しているかを判定
    • タスク後はshutdownメソッドを呼ばないと正常に終了しない
public class Main{

    static Runnable task = ()->{
        try {
            System.out.println(Thread.currentThread().getName() +":start");
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally{
            System.out.println(Thread.currentThread().getName() +":end");
        }
    };

    static void ThreadTest(ExecutorService service) {
        for(int i=0;i<100;i++)
            service.execute(task);
        if(service != null) service.shutdown();
        System.out.println("isShutDown:"+service.isShutdown());
        System.out.println("isTerminated:"+service.isTerminated());
    }

    public static void main(String[] args) {
        ExecutorService service = Executors.newSingleThreadExecutor();  //単一のワーカー・スレッドを使用するexecutorを作成します
//      ExecutorService service = Executors.newCachedThreadPool();  //必要に応じ、スレッドを作成。60秒未使用でキャッシュ削除
//      ExecutorService service = Executors.newFixedThreadPool(3);// 3つのスレッド・プールを作成
//      ExecutorService service = Executors.newWorkStealingPool();// 3つのWork-stealingスレッド・プールを作成

        ThreadTest(service);//タスク開始
    }
}

Callable<V>インターフェース

  • java.util.concurrent.Callable にて宣言
  • タスク用のインターフェース
    • 戻り値を返すことができる
    • checked例外をスローできる

Future<V>インターフェース

  • java.util.concurrent.Future にて宣言
  • 非同期計算の結果を保持する
    • 計算が完了したかどうかのチェック、完了までの待機、計算結果の取得などを行うためのメソッドが用意されている
      • get () タスクの実行結果の取得
      • isDone () タスクの完了の判定
      • isCancelled () タスクのキャンセル判定etc...
public class Main{
    public static void main(String[] args) throws InterruptedException, ExecutionException {

            ExecutorService service = Executors.newWorkStealingPool();
            Runnable task1 = () -> System.out.println("task1");
            Callable<?> task2 = () -> {System.out.println("task2"); return true;};

            Future<?> result1 =service.submit(task1, true);
            Future<?> result2 =service.submit(task2);
            System.out.println("result1:"+result1.get());
            System.out.println("result2:"+result2.get());

            if(service != null) service.shutdown();
    }
}

ScheduledExecutorServiceインターフェース

  • java.util.concurrent.ScheduledExecutorService にて宣言
  • タスクのスケジューリングを行う
  • 3つのスケジューリングメソッドが用意されている
    • schedule
    • scheduleAtFixedRate
    • scheduleWithFixedDelay

scheduleメソッド

schedule(Callable<V> callable, long delay, TimeUnit unit)

public class Main{

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

        service.schedule(() -> {
            System.out.println("end");
            service.shutdown();
            System.exit(0);
        }, 1000, TimeUnit.MILLISECONDS);
        //(Runnable,待ち時間,時間単位)

        int count = 0;
        System.out.println("start");
        while (true) {
            Thread.sleep(100);
            System.out.println((++count) * 100 + " ms");
        }
    }
}
start
100 ms
200 ms
300 ms
400 ms
500 ms
600 ms
700 ms
800 ms
900 ms
end

scheduleAtFixedRateメソッド

scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit)

public class Main{
     private static int n = 0;

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

        service.scheduleAtFixedRate(() -> {
            System.out.println("begin(" + n + ")");
            sleep(1000);
            System.out.println("end(" + n + ")");
            n++;
        }, 3, 2, TimeUnit.SECONDS);
        //(Runnable,初回待ち時間,2回目以降待ち時間,時間単位)
        //タスク開始時から待ち時間が計測される
        //end後、1秒間隔でstartが表示される
        int count = 0;
        while (true) {
            sleep(1000);
            System.out.println((++count) + " s");
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {}
    }
}

scheduleWithFixedDelayメソッド

scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit)

public class Main{
    private static int n = 0;

    public static void main(String[] args) throws Exception {
        ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();

        service.scheduleWithFixedDelay(() -> {
            System.out.println("begin(" + n + ")");
            sleep(1000);
            System.out.println("end(" + n + ")");
            n++;
        }, 3, 2, TimeUnit.SECONDS);
        //(Runnable,初回待ち時間,2回目以降待ち時間,時間単位)
        //タスク終了後から待ち時間が計測される。
        //end後2秒感覚でstartが表示される
        int count = 0;
        while (true) {
            sleep(1000);
            System.out.println((++count) + " s");
        }
    }

    private static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {}
    }
}

Fork / Join フレームワーク

  • 大きなタスクを小さなタスクに分割し、複数スレッドで同時並行的に実行することで処理パフォーマンスを向上させる仕組み
    • マルチコア・CPUを効率的に使用することが最大の目的
    • Work-stealing アルゴリズムを採用
      • 自身のワークキューをすべて実行し終えたワーカースレッドは、他のスレッドのワークキューの末尾から未処理のタスクを横取り(stealing)して実行
      • タスクの割り振りが自然と最適化される仕組み

ForkJoinPoolクラス

java.util.concurrent.ForkJoinPool にて宣言

  • Fork / Join フレームワークを利用したタスクを実行するための ExecutorService インタフェースの実装クラス
  • Executorインターフェースを継承している
    • メソッドなども引き継いでいる
  • java.util.concurrent.ForkJoinTask<V>

ForkJoinPoolクラスのタスク実行用メソッド

メソッド 説明
void execute(ForkJoinTask<?> task) 非同期で実行、処理結果は受け取らない
T invoke(ForkJoinTask task) タスクの実行が終了するまで待機、処理結果を受け取る
ForkJoinTask submit(ForkJoinTask task) 非同期でタスクを実行、処理結果をタスクから受け取る

ForkJoinTask のサブクラス(抽象クラス)

  • RecursiveAction クラス
    • 戻り値が必要ない場合
  • RecursiveTask クラス
    • 戻り値が必要な場合

RecursiveAction クラス

class MyAction extends RecursiveAction {

    @Override
    protected void compute() {
        try {
            Thread.sleep(2000); // 2秒待機
        } catch (InterruptedException e) {}
        System.out.println("MyAction: OK");
    }
}

public class Main{

    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool executor = new ForkJoinPool();
        executor.execute(new MyAction());
        System.out.println("Main: OK");

        System.out.println("シャットダウン");
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);//shotdown後にタスクが続いている場合は待機
        System.out.println("End");
    }
}
#非同期処理(execute)の場合
Main: OK
シャットダウン
MyAction: OK
End

#同期処理(invoke)の場合
MyAction: OK
Main: OK
シャットダウン
End

RecursiveTask<T> クラス

public class RecursiveTask_ {

    public static void main(String[] args) throws InterruptedException{
        ForkJoinPool executor = new ForkJoinPool();

        RecursiveTask<Integer> task = new RecursiveTask<Integer>() {
            @Override
            protected Integer compute() {return 1;}
        };

        Integer num = executor.invoke(task);
        //Integer num = executor.submit(task);  //コンパイルエラー 非同期だと取得する値が変わる為、参照元を返す
        System.out.println(num);
    }
}
1

ストリーム並列処理

  • ストリームAPiでの処理は
    • 順次処理(Sequential)
    • 並列処理(Parallel) のいずれかで実行できる
      • ForkJoinPoolクラスで並行処理を実現している
  • java.util.Collection インタフェースの stream / parallelStream メソッドを使用したのかで決定
  • ストリーム処理の途中で実行処理を変更することも可能
    • parallel()
      • 並列処理モードに変更
    • sequential()
      • 順次実行モードに変更
  • isParallel() でどちらか判別可能

IntStream.of(1,2,3,4,5).forEach(System.out::print); // 12345
IntStream.of(1,2,3,4,5).parallel().forEach(System.out::print);  //毎回ランダム
IntStream.of(1,2,3,4,5).parallel().sequential().forEach(System.out::print); //12345

参考書

オラクル認定資格教科書 Javaプログラマ Gold SE11
徹底攻略Java SE 8 Gold問題集

5
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
9