5
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Java 非同期処理

Last updated at Posted at 2024-01-13

Javaの非同期処理

Javaにおいてマルチスレッドとは、一つのプロセス内で同時に実行される複数のスレッドを指す。

プロセスとは、実行中のプログラムのインスタンスを指す。プロセスはOSによって管理されるプログラムの実行単位である。

スレッドもまた、プログラムの実行単位を指す。一つのプロセス内には複数のスレッドが存在することができるため、プロセスが親、スレッドは子という親子関係にある。

Javaは一つのプロセス内で動作していて、そのプロセス内では複数のスレッドが活用されることで、より効率的に処理が実行されるようになっている。

コンピュータ上で複数のプロセスが起動されている場合、通常、各プロセスは独立したそれぞれのメモリ領域を持ち、プロセス同士はそれぞれ隔離された環境を持っている。プロセス内での資源の割り当てやスケジューリングはOSによって管理されていて、異なるプロセス間ではメモリ領域が共有されない

「資源の割り当て」とは具体的に、ヒープ、スタックなどのメモリ領域の割り当て(メモリ上のどの番地からどの番地を使うか)や、CPUの使用権利の割り当て、ファイルや入出力デバイスの割り当てなどを指す。異なるプロセス間で情報をやり取りするためには、IPC(Inter Process Communication)やネットワーク通信(ソケット、パイプ)などの特殊な技術を使用する必要がある。

Javaは一つのプロセス内で動作していて、プロセスに割り当てられたメモリ領域を、プロセス内の各スレッドが共有するため、複数のスレッドが同時に同じメモリ領域にアクセスする競合を起こすことがある。

競合を避けるためには、開発者がスレッドセーフなプログラミングを行う必要がある。

スレッドとは

スレッドとは、プログラムの実行単位であり、一つのプロセス内で動作する独立した処理フローのことを指す。単一のプロセス内で、複数存在することができる。各スレッドは個別の実行コンテキストを持ち、プロセス内で共有されるメモリ領域へアクセスを行う。

プロセスがOSによって管理されるのと同様に、スレッドもまた、OSによって管理される。具体的には、各スレッドに対してCPUの使用権の割り当て、つまり実行順の管理がOSによって行われる。

Javaでは、スレッドが起動されると、各スレッドに「スタック」と呼ばれるメモリ領域が割り当てられる。スタックとは、スレッドがメソッドを実行した時に、そのメソッド内のローカル変数、ステータス情報などのコンテキストが保持される領域のことを指す。メソッドの実行が終了すると、スタックに積まれたコンテキストは取り除かれる。

プログラムの実行中は、メソッドの実行によってコンテキストが積まれ、またメソッド内から別のメソッドが実行されることで、その上にさらに新たなコンテキストが積まれる。反対に、メソッドが終了するとスタックの一番上に積まれたコンテキストは除去され、これらが繰り返されることで、スレッド(Thread、糸)が形成される。

つまりスレッドとは、物理的な何かを指すものではなく、プログラムの処理の流れを抽象化したものと言うことができる。

(言語は異なりますが、スレッドの概念は下記の記事でもまとめています。異なるプログラミング言語においては、実行モデルが異なることがあるため、ご注意ください。)

Main.java
public class Main {
    public static void main(String[] args) {
        fn();
    }

    public static void fn() {
        System.out.println("hello world");
    }
}

Thread_2.gif

マルチスレッドと並列処理

プログラムには並行処理並列処理という概念がある。(詳細は下記の記事にまとめています)

簡単に区別すると、実行中の処理を高速に切り替えることで複数の処理が同時に実行されているように見せるのが並行処理で、実際に複数の処理を物理的に同時実行しているのが並列処理。

物理的な並列処理は、マルチコアプロセッサなどの複数のコアを持つハードウェア上でプログラムを実行することによって実現される。複数のコアに演算処理を分散させ、同時に実行させることで、全体としてプログラムの処理が高速に実行される。

プログラムが、シングルコアのプロセッサ上で動作している場合や、スレッドが競合状態(複数のスレッドが同時にデータにアクセスしようとする状態)に陥っている場合などは、物理的な同時性は実現しない。

マルチコアプロセッサとは、複数のコアを持つCPUのことを指す。コアは、算術論理演算(加算、減算、乗算、除算など)や論理演算(AND、OR、NOTなど)などの基本的な演算を実行する能力を持っている。シングルコアプロセッサは、コアが一つしかないCPU。

マルチスレッド環境で実行されるプログラムは、実際には、必ずしも複数のスレッドが物理的に同時に動いているとは限らない。人間から見ると同時に実行されているように見えても、実際には複数のスレッドが高速に一つのCPUを譲り合って並行処理を進めているだけかもしれない。

並行処理におけるCPUの割り当てはスレッドスケジューラによって行われ、処理中のスレッドはアクティブスレッドとも呼ばれる。スレッドスケジューラとは、実行可能なスレッドの優先度や状態に基づいて、どのスレッドを次に実行するかを決定している。

スレッドスケジューラは、複数のスレッドを物理的に並列処理するためのものではなく、複数のスレッドを効果的に管理し、複数のスレッドの並行処理を効率的に実行するための仕組みである

つまり、物理的な同時性はハードウェアが提供するものであり、マルチスレッドによるプログラミングが提供できるものではない。

スレッドを生成する

Javaのプログラムは通常、JVMによって自動的に起動されたメインスレッド上で実行される。そのため、開発者は意図的に「メインスレッドを起動するためのプログラム」を書かない。

一方で、メインスレッド以外のスレッドは、明示的に生成を行わない限り存在しない(ライブラリやフレームワークが提供する非同期処理や、ガベージコレクションなどのJVMによって自動的に生成されるスレッドは存在する)。

スレッドを明示的に生成しない場合のプログラムの処理は、すべてメインスレッド上で実行される。

スレッドを生成する方法には以下の方法がある。

  • Threadクラスを継承する(古い)
  • Runnableインターフェースを実装する(新しい)

実際には後述するCompletableFutureを使用した方が書きやすく便利。ただ、スレッドの原理や仕組みを理解するためにはThreadRunnableの理解も重要。

Threadクラスを継承する

Threadクラスは、Javaの標準ライブラリ(java.langパッケージ)に含まれている。Threadクラスを利用する場合、Threadクラスを継承して、非同期的に実行したい処理をオーバーライドしたrun()メソッド内に記述する必要がある。

MyThread.java
public class MyThread extends Thread {
    @Override
    public void run(){
        System.out.println("非同期的に処理を実行します。");
    }
}
Main.java
public class Main {
    public static void main(String[] args) {
        // スレッドを生成
        var thread = new MyThread();

        // 生成したスレッド上で、処理を実行
        thread.start();
        // >> 非同期的に処理を実行します。
    }
}

この時、スレッドは以下のような様子になっている。

Thread_new.png

JavaScriptの非同期処理との違い

JavaScriptはJavaのマルチスレッドと異なり、単一スレッド(シングルスレッド)上で実行され、非同期処理はイベントループと呼ばれる実行モデルによって実現される。

イベントループにおける非同期処理は、コールバック関数としてコールバックキューに格納され、「スタックが空になった」という条件時にスタックに積まれ、実行される。

Thread_javascript.png

一方のJavaは、マルチスレッドのプログラミングが可能となっていて、JVMのコンポーネントであるスレッドスケジューラが異なるスレッド間で実行中のスレッド(アクティブスレッド)を切り替える制御を行っている。

Thread_java.png

そのため、異なるスレッドで実行される処理は、どのような順序で実行されるかがJVMの実装に依存していて、予測することはできない。(開発者がこの制御に影響を与えることはできるが、強制的に特定のスレッドをアクティブスレッドとすることはできない。)マルチスレッドのプログラミングで排他制御が重要な理由はこういった理由によるものである。

スレッドスケジューラ

Javaのマルチスレッド環境における実行中のスレッド(アクティブスレッド)は、JVMのコンポーネントであるスレッドスケジューラが制御している。

正確には、OSのスレッドを、JVMがJavaのスレッドに関連づけることで、JVMによる間接的な制御が行われている。スケジュール機能はOSに依存するが、JVMがOSごとの差異を吸収することで、プラットフォームに依存しない仕組みとなっている。

スレッドには「実行中」、「実行可能」、「ブロック中」の3つの状態が存在し、スレッドの生成直後の状態は実行可能状態であり、生成してすぐに実行中になるわけではない

実行可能状態のスレッドはスレッドスケジューラによる判断や決定によって、実行中へと昇格する。スレッドが実行中になると、そのスレッドのスタック(コンテキスト)が有効になる。

Threadクラスのsleep()wait()が実行されると、スレッドはブロック状態になる。また、synchronizedキーワードによって、アクセスしたオブジェクトがロックされていた場合にも、スレッドはブロック状態になる。つまりブロック状態とは、スレッドが実行を一時的に停止した状態を指す。これらの制御がスレッドスケジューラによって行われている。

またコンピュータのコアが1つであれば、同時に二つのスレッドが実行中になることはない。

スレッドスケジューラはJVMのコンポーネントであり、JVMの実装に依存しているため、開発者が直接制御を行うことはできない。そのため、同じマシン上で同じプログラムを動かしたとしても、スレッドスケジューラが全く同じ挙動をすることは保証されない。

スレッドスケジューラの挙動は、開発者(プログラマ)が予測できないことを前提にする

マルチスレッド環境においてはこのことを前提として開発を行う必要がある。

Runnableインターフェースを実装する

Runnableインターフェースも、Javaの標準ライブラリであるjava.langパッケージに含まれている。

Runnableインターフェースには、run()抽象メソッドが定義されている。

Threadクラスを継承した時と形がよく似ているが、今回はThreadを直接インスタンス化する。

MyRunner.java
public class MyRunner implements Runnable{
    @Override
    public void run() {
        System.out.println("非同期的に処理を実行します。");
    }
}
Main.java
public class Main {
    public static void main(String[] args) {
        // スレッドを生成
        var thread = new Thread(new MyRunner());

        // 生成したスレッド上で、処理を実行
        thread.start();
        // >> 非同期的に処理を実行します。
    }
}

ここで、古い方法と新しい方法の両方で利用したThreadクラスは、Runnableインターフェースを実装したクラスとなっている。

Threadクラスを継承するでは、Runnableインターフェースを実装したThreadクラスを継承してさらにオーバーライドしたrun()メソッドを、ここでは直接Runnableから実装しているという関係になっている。

runnable.png

匿名クラスを使って記述する

スレッド生成のコードは、匿名クラスを使うことで簡潔に記述できるようになる。

匿名クラスとは

匿名クラスとは、名前のないクラスのインスタンスを生成するための特殊な構文のこと。

普通、Javaのクラスは名前とともに宣言され、宣言された名前とnewキーワードを使ってインスタンス化が行われる。

new クラス名()

匿名クラスは宣言と同時にインスタンスが生成される。

new インターフェース名() {
    ここがクラス宣言部分
    インターフェースの実装をここで行う
}

通常インターフェースや抽象クラスを実装する場合、実装用のクラスを宣言する必要がある。ただ、1度しかインスタンス化しないクラスを、わざわざ宣言したくない時などに匿名クラスが有効な手段になる。

MyInterface.java
public interface MyInterface {
    void method();
}
本来はインターフェースを実装するためのクラスを宣言する必要がある(クリックで展開する)
MyClass.java
// 匿名クラスを使用することでこのクラス宣言が不要になる
public class MyClass implements MyInterface {
    @Override
    public void method() {
        System.out.println("処理を実行します。");
    }
}
Main.java
public class Main {
    public static void main(String[] args){
        // インターフェースの名前を利用して、
        // インターフェースを実装した、匿名クラスのインスタンスを生成
        var instance = new MyInterface(){
            @Override
            public void method(){
                System.out.println("処理を実行します。");
            }
        };

        // 実装した処理を呼び出す
        instance.method();
        // >> 処理を実行します。
    }
}

匿名クラスにはコードが簡潔になるというメリットがある一方で、コンストラクタを持つことができない、継承関係になれないなどの制約がある。

匿名クラスを使って記述する

匿名クラスを利用すると、先ほどのコードは以下のようになる。宣言する必要のあるクラスがそれぞれ一つ減っていることがわかる。

匿名クラスはインターフェースに使用した場合にはインターフェースを「実装(implements)」したクラスになり、通常のクラスに使用した場合にはそのクラスをスーパークラスとして「継承(extends)」したサブクラスになる。

Threadクラスに対して使用したときは継承(extends)した匿名クラスになり、Runnnableインターフェースに対して使用したときは実装(implements)した匿名クラスになる。

匿名クラスを使ってThreadクラスを継承する
public class Main {
    public static void main(String[] args) {
        // 匿名クラスを使ってスレッドを生成
        var thread = new Thread(){
            @Override
            public void run() {
                System.out.println("非同期的に処理を実行します。");
            }
        };

        thread.start();
        // >> 非同期的に処理を実行します。
    }
}
匿名クラスを使ってRunnableインターフェースを実装する
public class Main {
    public static void main(String[] args) {
        // 匿名クラスを使ってスレッドを生成
        var thread = new Thread(new Runnable(){
            @Override
            public void run() {
                System.out.println("非同期的に処理を実行します。");
            }
        });

        thread.start();
        // >> 非同期的に処理を実行します。
    }
}

ラムダ式を使って記述する

Runnableインターフェースは関数型インターフェースでもあるため、ラムダ式を利用することでさらに簡潔にできる。

ラムダ式とは

Java8から導入された関数型インターフェースのインスタンスを簡易な記述で生成するための構文。

ラムダ式を理解するには、関数型インターフェースについて理解する必要がある。

ラムダ式は関数型インターフェースのインスタンスを生成するための構文

関数型インターフェースとは

関数型インターフェースは、抽象メソッドを一つだけ持つという条件を満たすインターフェースのこと。SAM(Single Abstract Method)と呼ばれることもある。

関数型インターフェースは抽象メソッドを1つだけ持つインターフェースのこと

関数型インターフェースは、前述のrun()メソッドを格納するための型として機能する。run()メソッドは戻り値を持たず引数も無いメソッドだが、戻り値を持つメソッド、引数を持つメソッドも当然存在する。

関数型インターフェースにおいては、メソッドの「引数」、「戻り値」の組み合わせが型としての表現になる。

標準ライブラリのjava.util.functionパッケージには、引数、戻り値のそれぞれ異なるFunctionPredicateConsumerといった関数型インターフェースがあらかじめ用意されているため、自分で関数型インターフェースを宣言して作成する機会はあまりないかもしれない。

java.util.functionパッケージの関数型インターフェース
関数型インターフェース 定義された抽象メソッド 補足
Supplier<T> T get();
Consumer<T> void accept(T t);
BiConsumer<T, U> void accept(T t, U u);
Predicate<T> bolean test(T t);
BiPredicate<T, U> boolean test(T t, U u);
Function<T> R apply(T t);
BiFunction<T, U> R apply(T t, U u);
UnaryOperator<T> T apply(T t);
BinaryOperator<T> T apply(T t1, T t2); BiFunction<T, U>TUが同じパターン。
BiFunction<T, U>の特殊系と言う位置づけ。
Runnabale void run();
Callable<V> V call();

また通常、インターフェースにはdefaultメソッドや、Objectクラスのメソッド(toString()equals()など)をオーバーライドしたメソッドも宣言できるため、これらが含まれるインターフェースでは、関数型インターフェースかどうかがわかりづらいことがある。そのような場合には、@FunctioalInterfaceアノテーションを使用すると、仮に関数型インターフェースの条件を満たさない宣言をしてしまった場合でも、コンパイルエラーによって知らせてくれるようになる。

関数型インターフェース
// 関数型インターフェース
@FunctionalInterface
public interface MyInterface {
    void method();
}	

// これも関数型インターフェース
@FunctionalInterface
public interface MyInterface2 {
    void method();

    default void defaultMethod() {
        System.out.println("デフォルトの処理です。");
    }
    
    String toString();
}

// これは抽象メソッドが2つあるため関数型インターフェースではない
// @FunctionalInterfaceがコンパイルエラーを起こして知らせてくれる
@FunctionalInterface
public interface MyInterface {
    void method1();

    void method2();
}

基本構文

ラムダ式は、関数型インターフェースの実装を定義するための構文。ラムダ式を利用することで、関数型インターフェースが持つメソッドの型を「式」によって表現することができる。またメソッドの引数として渡したり受け取ったりすることで、Javascriptのコールバック関数のような機能を実現できる。

() -> {
  ---処理内容---
}

ラムダ式は、関数型インターフェースの実装として利用される

関数型インターフェースは実装すべき抽象メソッドを一つしか持たないため、ラムダ式を使用して実装する際には、実装するメソッド名を記述する必要がない。

(個人的に、「ラムダ式は、関数型インターフェースの実装として利用される」の意味が、最初全く理解できなかった。コールバック関数を代入した変数をなぜ()で実行しないのかがわからなかったが、ここでのラムダ式は、コールバック関数としてのラムダ式ではないと理解したことで納得できた。「=」が何かを代入しているように見えるのでわかりづらいが、ラムダ式はコールバック関数としても利用できるし、関数型インターフェースの実装としても利用できる。「=」を「代入」と捉えると頭が混乱してしまうが、匿名メソッドを「定義」している、と捉えることで納得できた。「コールバック関数を代入している」のではなく、あくまでも、「インターフェースでオーバーライドすべきメソッドを実装している」だけ。つまり、ラムダ式は、関数型インターフェースの実装として利用される。)

MyInterface.java
// 関数型インターフェース
@FunctionalInterface
public interface MyInterface {
    void method();
}
Main.java
public class Main {
    public static void main(String[] args) {
        // ラムダ式が関数型インターフェースを実装する
        MyInterface instance = () -> {
            System.out.println("hello world");
        };

        instance.method();
        // >> hello world
    }
}

{}return は省略できる。

ラムダ式の処理が1行で表現できる場合、{}ブロックとreturnキーワードは省略することができる。

MyInterface instance = () -> System.out.println("hello world");

引数ありの構文

(引数の型 仮引数) -> {
  ---処理内容---
}
MyInterface.java
@FunctionalInterface
public interface MyInterface {
    void method(int i);
}
Main.java
public class Main {
    public static void main(String[] args) {
        // 引数ありのラムダ式
        MyInterface instance = (int index) -> System.out.println(index + " hello world");

        instance.method(100);
        // >> 100 hello world
    }
}

引数の型は省略できる。

ラムダ式の引数の型は、暗黙的に推論されるため、省略できる。

MyInterface instance = (index) -> System.out.println(index + " hello world");

引数が一つの場合、()が省略できる。

MyInterface instance = index -> System.out.println(index + " hello world");

ただし、引数がない場合の()は省略できない。

メソッド参照

メソッド参照とは、ラムダ式を簡潔するための構文のことを指す。

メソッドへの参照を表現することができ、主に関数型インターフェースの実装として利用される。3種類の構文がある。

クラス名::静的メソッド名
インスタンス変数::メソッド名
クラス名::new
MyInterface.java
@FunctionalInterface
public interface MyInterface {
    void method();
}
MyClass.java
public class MyClass {
    // 静的メソッド
    public static void staticMethod() {
        System.out.println("静的メソッドです。");
    }

    // インスタンスメソッド
    public void instanceMethod() {
        System.out.println("インスタンスメソッドです。");
    }

    // コンストラクタ
    public MyClass() {
        System.out.println("コンストラクタです。");
    }
}

下の3つは全てインターフェースMyInterfaceを、ラムダ式を簡略化したメソッド参照によって実装している。

Main.java
public class Main {
    public static void main(String[] args) {

        // クラス名::静的メソッド名
        // ラムダ式によってここで実装を行うのではなく、既存のメソッドをインターフェースの実装として利用する
        MyInterface staticMethodReference = MyClass::staticMethod;
        staticMethodReference.method();
        // >> 静的メソッドです。

        // インスタンス変数::メソッド名
        var myClass = new MyClass();
        MyInterface instanceMethodReference = myClass::instanceMethod;
        instanceMethodReference.method();
        // >> コンストラクタです。
        // >> インスタンスメソッドです。

        // クラス名::new
        MyInterface constructorReference = MyClass::new;
        constructorReference.method();
        // >> コンストラクタです。
    }
}

defaultメソッドとは

Java8で導入された修飾子。インターフェース内でdefault修飾子をつけて宣言したメソッドは、実装するクラスでオーバーライドしなくても良い。

MyInterface.java
public interface MyInterface {
    // defaultメソッド
    default void method(){
        System.out.println("デフォルトの処理です。");
    }
}
MyClass.java
public class MyClass implements MyInterface {
    // defaultメソッドはオーバーライドしなくてもOK(もちろん、オーバーライドしてもOK)
}
Main.java
public class Main {
    public static void main(String[] args) {
        var instance = new MyClass();
        instance.method();
        // >> デフォルトの処理です。
    }
}

さらに、defaultメソッドはsuperキーワードを使用することで明示的に呼び出すこともできる。

MyInterface,java
public interface MyInterface {
    // defaultメソッド
    default void method(){
        System.out.println("デフォルトの処理です。");
    }
}
MyClass.java
public class MyClass implements MyInterface {
    @Override
    public void method(){
        // defaultメソッドを明示的に呼び出すこともできる
        MyInterface.super.method();
        
        System.out.println("実装クラスの処理です。");
    }
}
Main.java
public class Main {
    public static void main(String[] args) {
        var instance = new MyClass();
        instance.method();
        // >> デフォルトの処理です。
        // >> 実装クラスの処理です。
    }
}

ラムダ式を使ってRunnableを実装する

Runnableインターフェースは関数型インターフェースでもあるため、ラムダ式を利用することができる。

ラムダ式を使ってRunnableインターフェースを実装する
public class Main {
    public static void main(String[] args) {
        // ラムダ式を使ってスレッドを生成
        var thread = new Thread(() -> System.out.println("非同期的に処理を実行します。"));

        thread.start();
        // >> 非同期的に処理を実行します。
    }
}

スレッドプール

スレッドはプロセスよりも軽量であるものの、数が増えればパフォーマンスに影響を与えてしまうことがある。これを防ぐためにスレッドプールという仕組みがある。

スレッドプールでは使用するスレッドがあらかじめ複数用意される。スレッドは必要になったときに取り出され、使用後は再びプールに戻される。こうすることで、プールされたスレッドが何度も再利用できるようになっている。

スレッドプールを利用すると、スレッドの新規生成による負荷が極力抑えられるため、スレッド生成によるオーバーヘッドが減少し、結果としてパフォーマンスを向上させることができる。

スレッドプールの生成

スレッドプールを生成するには、java.util.concurrentパッケージに含まれるExecutorsクラスを利用する。

Executorsクラスには、同じくjava.util.concurrentパッケージのインターフェースExecutorExcutorServiceScheduledExcutorServiceを実装したクラスのインスタンスを提供するための静的メソッド(ファクトリメソッド)が用意されている。

Executorはインターフェースで、Executorsはクラス。これらの関係性を把握しておくと各種メソッドなどがどのクラス、インターフェースで定義されているかがわかり、頭の中で体系立てて整理することができる(と個人的に思っている)。

thread_pool.png

Executors.newFixedThreadPool()

指定された数のスレッドを持つ固定スレッドプールを生成することができる。

public static ExecutorService newFixedThreadPool​(int nThreads)

Executors.newFixedThreadPool()
ExecutorService executor = Executors.newFixedThreadPool(5);

Executorsクラスの静的メソッド`newFixedThreadPool()

Executors.newSingleThreadExecutor()

1つのスレッド(シングルスレッド)でタスクを順次実行するスレッドプールを生成することができる。

Executors.newSingleThreadExecutor()
ExecutorService executor = Executors.newSingleThreadExecutor();

Executors.newCachedThreadPool()

過去に作成されたスレッドを再利用するキャッシュ型スレッドプールを作成することができる。

ExecutorService executor = Executors.newCachedThreadPool();

Executors.newScheduledThreadPool()

定期的に、または遅延して実行されるタスクを事前にスケジューリングできるスレッドプールを生成することができる。

Executors.newScheduledThreadPool()
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);

Executors.newSingleThreadScheduledExecutor()

1つのスレッドでタスクを順次実行するスケジューリングスレッドプールを作成することができる。

Executors.newSingleThreadScheduledExecutor()
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

非同期処理の実行

生成したスレッドプールに対してタスクを渡すことで、非同期処理を実行することができる。ここでいうタスクとはRunnableインターフェースの実装のことを指す。

タスクを渡すためのメソッドには execute()submit() がある。

execute()

void execute(Runnable command)

Executorインターフェースで定義された非同期処理を実行するためのメソッド。

Thread(new(RunnableTask())).start()に相当する。

execute()
ExecutorService executor = Executors.newFixedThreadPool(10);

Runnable task = () -> System.out.println("hello world");

executor.execute(task);

後述するsubmit()と異なり、戻り値を持たない

またexecute()を使用した場合、生成したスレッド上で実行させた処理中に発生した例外はスレッドプール内で処理されてしまうため、キャッチすることができない。一方、submit()を使用した場合はFutureを介して例外をキャッチすることができる。

execute()は戻り値を持たない

submit()

<T> Future<T> submit​(Callable<T> task)

ExecutorServiceインターフェースで定義された非同期処理実行のためのメソッド。非同期処理の結果をFutureオブジェクトでラップした状態(Future<T>)で返す。

submit()
ExecutorService executor = Executors.newFixedThreadPool(10);

Callable<String> task = () -> "hello world";

Future<String> future = executor.submit(task);

非同期処理に戻り値がある場合のタスクは、Runnableインターフェースではなく Callable<T>インターフェースを使用する。Callable<T>インターフェースは関数型インターフェースの一つである。execute()が例外処理を行えないのは、RunnableCallableの違いによるものでもある。

Runnableを渡して取得したFutureは、非同期処理が完了した時get()nullを返す。

Future<?> submit​(Runnable task)

submit()戻り値を持つ

Future

Futureインフターフェースは非同期処理のハンドリングをしやすくするためのものであり、処理の成否や処理結果を取得するために利用される。

Future<T>には非同期処理の結果を取得することができるget()が定義されている。

V get() throws InterruptedException, ExecutionException

get()
ExecutorService executor = Executors.newFixedThreadPool(10);

Callable<String> task = () -> "hello world";

Future<String> future = executor.submit(task);

try {
    String result = future.get();
    System.out.println(result);
    // >> hello world
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}

get()メソッドは非同期処理の結果を取得するメソッドだが、非同期処理の完了を待つためにも使用される。非同期処理を待つメソッドには、他にThreadインターフェースのjoin()がある。どちらも非同期処理の完了を待つメソッドだが、get()InterruptedExceptionExecutionExceptionをスローする可能性があるのに対して、join()はそのようなチェック例外(try-catchが必要な例外)をスローしないという違いがある。

join()を使う際には、非同期処理が正常に完了することが保証された場合に使用し、非同期処理が例外をスローしたり、キャンセルされる場合などの例外処理が必要な場合はget()を使う。

Threadjoin()は例外処理が行えない
Futureget()は例外処理を行える

非同期処理が戻り値を返さない例

Task.java
public class Task implements Runnable {
    private int id;

    public Task(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        System.out.println("タスクを実行します" + " id: " + id);
    }
}

Main.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    public static void main(String[] args) {
        // 10個のスレッドから構成されるスレッドプールを生成
        ExecutorService executor = Executors.newFixedThreadPool(10);

        // タスクをスレッドプールに送信
        for (int i = 0; i < 10; i++) {
            Runnable task = new Task(i);
            // 戻り値がない場合、executor.execute()としてもOK
            executor.submit(task);
        }

        // >> タスクを実行します id: 5
        // >> タスクを実行します id: 8
        // >> タスクを実行します id: 3
        // >> タスクを実行します id: 9
        // >> タスクを実行します id: 0
        // >> タスクを実行します id: 2
        // >> タスクを実行します id: 6
        // >> タスクを実行します id: 4
        // >> タスクを実行します id: 7
        // >> タスクを実行します id: 1

        // スレッドプールをシャットダウン(新しいタスクの送信を受け入れ停止し、既存のタスク実行が終了したらスレッドプールを破棄する)
        executor.shutdown();
    }
}

非同期処理が戻り値を返す例

Task.java
import java.util.concurrent.Callable;

// 戻り値がある場合、Runnable ではなく Callble<T> を使用する(Tは戻り値の型)
public class Task implements Callable<Integer> {
    private int id;

    public Task(int id) {
        this.id = id;
    }

    // Callble では call() をオーバーライドする
    @Override
    public Integer call() throws Exception {
        return id;
    }
}
Main.java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) {
        // 10個のスレッドから構成されるスレッドプールを生成
        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            Callable task = new Task(i);
            // タスクをスレッドプールに送信
            Future<Integer> future = executor.submit(task);
            try {
                // 結果を取得する
                Integer taskId = future.get();
                System.out.println("タスクを実行します" + " id: " + taskId);
            } catch (Exception e) {
                // 例外処理
            }
        }

        // >> タスクを実行します id: 0
        // >> タスクを実行します id: 1
        // >> タスクを実行します id: 2
        // >> タスクを実行します id: 3
        // >> タスクを実行します id: 4
        // >> タスクを実行します id: 5
        // >> タスクを実行します id: 6
        // >> タスクを実行します id: 7
        // >> タスクを実行します id: 8
        // >> タスクを実行します id: 9

        // スレッドプールをシャットダウン(新しいタスクの送信を受け入れない)
        executor.shutdown();
    }
}

非同期処理が戻り値を返す例(ラムダ式)

ラムダ式を使用してCallable<T>を実装した場合、戻り値はsubmit()の第2引数に渡すことで、Future<T>オグジェクトから取得する事ができる。

Main.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class Main {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(10);

        for (int i = 0; i < 10; i++) {
            // ラムダ式を使ってCallabel<T>を実装する
            Future<Integer> future = executor.submit(() -> {
                // 処理
            }, i); // submit()の第2引数 i が戻り値になる

            try {
                // ここで i を取得している
                Integer taskId = future.get();
                System.out.println("タスクを実行します" + " id: " + taskId);
            } catch (Exception e) {
                // 例外処理
            }
        }

        // >> タスクを実行します id: 0
        // >> タスクを実行します id: 1
        // >> タスクを実行します id: 2
        // >> タスクを実行します id: 3
        // >> タスクを実行します id: 4
        // >> タスクを実行します id: 5
        // >> タスクを実行します id: 6
        // >> タスクを実行します id: 7
        // >> タスクを実行します id: 8
        // >> タスクを実行します id: 9

        // スレッドプールをシャットダウン(新しいタスクの送信を受け入れない)
        executor.shutdown();
    }
}

複数の非同期処理のハンドリング

CompletableFuture

非同期処理を扱うときに、

ある非同期処理(A)が成功したら、その実行結果(A)を利用して、また別の処理(B)を実行したい

という場合がある。

複数の同期処理、非同期処理の実行順を制御したい場合には、CompletableFutureを利用する。

CompletableFutureはJava8以降のjava.util.concurrentパッケージに含まれる。

まず、最初に実行する非同期処理(コールバック関数)をCompletableFutureクラスの静的メソッドCompletableFuture.supplyAsync()に渡す必要がある。

ここでのポイントは、RunnableCallableを使用したときのように、開発者が明示的にスレッドを生成したりしなくても非同期処理が実現できること。

CompletableFutureを使うと、スレッドの生成処理を記述しなくて良い

CompletableFuture.supplyAsync()に渡したコールバック関数は、JavaのForkJoinPoolと呼ばれるデフォルトのスレッドプールが管理してくれる。

非同期処理の管理、スレッドの再利用、効率的なスケジューリングなどをForkJoinPoolが行ってくれることにより、開発者は手動でスレッドの生成や管理をする必要がなくなり、より直感的で簡単な非同期プログラミングが可能になる

直列実行

CompletableFuture.supplyAsync()CompletableFuture<T>型のオブジェクトを返却する。

返却されたオブジェクトには、thenAccept()thenAcceptAsync()といったメソッドが用意されているため、これらのメソッドにさらにコールバック関数を渡すことで、後続の処理を登録していくことができる。

thenAccept()は同期処理をコールバック関数として登録する際に使用するのに対して、thenAcceptAsync()は非同期処理をコールバック関数をして登録する際に使用する。

thenAccept()に登録した処理は、CompletableFuture.supplyAsync()に登録した非同期処理と同じスレッド上で実行されるのに対して、thenAcceptAsync()では、ForkJoinPoolによって、プラットフォームや環境によっては新しいスレッドがスレッドプールから割り当てられる可能性がある。そのため、非同期処理の後続に、さらに別の非同期処理が続く場合にはthenAcceptAsyncを使用する。

thenAccept()thenAcceptAsync()が戻り値を持たないのに対して、処理をさらにつなげたい場合には、CompletableFuture<T>型を戻り値に持つthenApply()thenApplyAsync()を使用する。イメージとしては下の図のような形。

Async.png

Main.java
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task1を実行します。");
            return "[Task1]";
        }).thenApplyAsync((result) -> {
            System.out.println("Task2を実行します。");
            return result + "[Task2]";
        }).thenApplyAsync((result) -> {
            System.out.println("Task3を実行します。");
            return result + "[Task3]";
        }).thenAcceptAsync((result) -> {
            System.out.println("完了したタスク:" + result);
        });

        // メインスレッドが終了しないように、非同期処理の完了を待つ
        future.join();

        // >> Task1を実行します。
        // >> Task2を実行します。
        // >> Task3を実行します。
        // >> 完了したタスク:[Task1][Task2][Task3]
    }
}

並列実行

CompletableFutureには、複数の非同期処理を並列で実行するためのメソッドallOf()がある。allOf()を使用することで、複数の非同期処理が全て完了したときの処理を登録することができる。

並列実行
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task1を実行します。");
            return "[Task1]";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task2を実行します。");
            return "[Task2]";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task3を実行します。");
            return "[Task3]";
        });

        CompletableFuture.allOf(future1, future2, future3);

        // 全ての非同期処理の完了を待つ
        futures.join();

        String result1 = future1.join();
        String result2 = future2.join();
        String result3 = future3.join();

        System.out.println("完了したタスク:" + result1 + result2 + result3);

        // ⭐️並列実行なので、タスクの実行順は毎回変わる
        // >> Task1を実行します。
        // >> Task3を実行します。
        // >> Task2を実行します。
        // >> 完了したタスク:[Task1][Task2][Task3]
    }
}

allOf()CompletableFuture<T>を返すので、さらにその後に直列処理をつなげることもできる。下の図のようなイメージ。

Async_2.png

並列処理と直列処理
import java.util.concurrent.CompletableFuture;

public class Main {
    public static void main(String[] args) {
        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task1を実行します。");
            return "[Task1]";
        });

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task2を実行します。");
            return "[Task2]";
        });

        CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
            System.out.println("Task3を実行します。");
            return "[Task3]";
        });

        CompletableFuture.allOf(future1, future2, future3)
                .whenCompleteAsync((voidResult, ex) -> {
                    if (ex == null) {
                        System.out.println("Task1、Task2、Tsk3が完了しました。");
                    } else {
                        // 例外処理
                    }
                }).thenApplyAsync((result) -> {
                    System.out.println("後続処理を実行します。");
                    return "[後続処理]";
                }).thenAcceptAsync((result) -> {
                    System.out.println("最後の処理を実行します。");
                });


        // ⭐️並列実行なので、タスクの実行順は毎回変わる
        // >> Task2を実行します。
        // >> Task3を実行します。
        // >> Task1を実行します。
        // >> Task1、Task2、Tsk3が完了しました。
        // >> 後続処理を実行します。
        // >> 最後の処理を実行します。
    }
}

同期プリミティブ

マルチスレッドプログラミングにおいて、複数のスレッド間でデータの一貫性を保持したり、または実行のタイミングを同期したりするための基本的なツールや仕組みのことを同期プリミティブと言う。

スレッドは一つのプロセス内で複数存在し、それぞれのスレッドはプロセスに割り当てられたメモリ領域を共有するため、一つのリソースへ複数のスレッドが同時にアクセスしたり、値を更新しようとするとデータに不整合が発生することがある。この複数のスレッドが同時に一つの対象へアクセスを行うことを競合と言う。

セマフォ

Semaphore

共有リソースに対するアクセス権を、複数のスレッド間で一定数に制限、管理する仕組みのことをセマフォと言う。

データベースへの接続やファイルへのアクセスなど、限られたリソースに対する同時アクセス数を制限したい場合に利用される。

ここでセマフォ内部の機能について着目してみると、セマフォはリソースへのアクセス「権利数」を管理するカウンターとしての役目を果たしている。

リソースへのアクセス権には最大許可数があり、全てのスレッドがアクセス権を取得していない状態であれば、「取得可能数 = 最大許可数 」となる。

アクセス権が1つ取得された場合、セマフォは取得可能数のカウンターを1だけ減らす。反対にリソースへのアクセス権が1つ解放された場合、セマフォはカウンターを1だけ増やす。

このようなカウンター機能を内部に持つセマフォは、複数のスレッドからアクセス権の取得要求を受け付け、取得可能であれば権利を付与し、付与可能なアクセス権が無ければスレッドを待機させることができる。

Semaphore

java.util.concurrent.Semaphoreクラスを利用することでセマフォを実装することができる。

Semaphoreクラスの初期化では、セマフォが管理するアクセス権の最大許可数を指定する必要がある。

Semaphore semaphore = new Semaphore(最大許可数); 

共有リソースへのアクセス権要求はacquire()によって行われる。

semaphore.acquire()
// ⬇️ ここからクリティカルセクション ⬇️

クリティカルセクションにて、共有リソースを使った処理が終了したら、release()によって使用していたアクセス権を解放することができる。

// ⬆️ ここまでクリティカルセクション ⬆️
semaphore.release();

このSemaphoreクラスのメソッドを使用すると、セマフォ機能を持つMySemaphoreを作成することができる。

セマフォの実装
import java.util.concurrent.Semaphore;

public class MySemaphore {
    // 最大アクセス権許可数: 3
    private final Semaphore semaphore = new Semaphore(3);

    public void accessResourceAndDoSomething() throws InterruptedException {
        String threadName = Thread.currentThread().getName();

        // アクセス権をリクエスト(取得可能数が無ければここで待機)
        System.out.println("【リクエスト】" + threadName + " (現在の取得可能数: " + semaphore.availablePermits() + ")");
        semaphore.acquire();

        // アクセス権が取得される
        System.out.println("【取得】"  + threadName + " (現在の取得可能数: " + semaphore.availablePermits() + ")");

        try {
            // クリティカルセクション
            System.out.println(threadName + " :共有リソースを使った処理を実行(クリティカルセクション)");

            // 擬似的にリソースを使った処理を挿入(これがないとクリティカルセクションが早く終了しすぎてログが見づらくなるので)
            Thread.sleep(1000);
        } finally {
            // アクセス権を解放する
            semaphore.release();
        }
    }
}
セマフォの利用
public class Main {
    public static void main(String[] args) {
        // セマフォは1つだけ用意する
        final MySemaphore mySemaphore = new MySemaphore();

        Runnable task = () -> {
            try {
                mySemaphore.accessResourceAndDoSomething();
            } catch (InterruptedException e) {
            }
        };

        // スレッドは10個用意する
        Thread thread1 = new Thread(task);
        Thread thread2 = new Thread(task);
        Thread thread3 = new Thread(task);
        Thread thread4 = new Thread(task);
        Thread thread5 = new Thread(task);
        Thread thread6 = new Thread(task);
        Thread thread7 = new Thread(task);
        Thread thread8 = new Thread(task);
        Thread thread9 = new Thread(task);
        Thread thread10 = new Thread(task);

        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread5.start();
        thread6.start();
        thread7.start();
        thread8.start();
        thread9.start();
        thread10.start();

        // >> 【リクエスト】Thread-5 (現在の取得可能数: 3)
        // >> 【リクエスト】Thread-8 (現在の取得可能数: 3)
        // >> 【リクエスト】Thread-0 (現在の取得可能数: 3)
        // >> 【リクエスト】Thread-9 (現在の取得可能数: 3)
        // >> 【リクエスト】Thread-4 (現在の取得可能数: 3)
        // >> 【リクエスト】Thread-3 (現在の取得可能数: 3)
        // >> 【リクエスト】Thread-1 (現在の取得可能数: 3)
        // >> 【リクエスト】Thread-7 (現在の取得可能数: 3)
        // >> 【リクエスト】Thread-2 (現在の取得可能数: 3)
        // >> 【リクエスト】Thread-6 (現在の取得可能数: 3)

        // 以降の出力結果はJVMのスレッドスケジューラに依存するため実行する度に変わる
        // (セマフォが同時に3つのスレッドにしかアクセス権を付与しないというのは毎回同じ)

        // >> 【取得】Thread-1 (現在の取得可能数: 0)
        // >> 【取得】Thread-3 (現在の取得可能数: 1)
        // >> 【取得】Thread-5 (現在の取得可能数: 2)
        // >> Thread-1 :共有リソースを使った処理を実行(クリティカルセクション)
        // >> Thread-5 :共有リソースを使った処理を実行(クリティカルセクション)
        // >> Thread-3 :共有リソースを使った処理を実行(クリティカルセクション)
        // >> 【取得】Thread-6 (現在の取得可能数: 0)
        // >> 【取得】Thread-7 (現在の取得可能数: 1)
        // >> 【取得】Thread-4 (現在の取得可能数: 2)
        // >> Thread-7 :共有リソースを使った処理を実行(クリティカルセクション)
        // >> Thread-6 :共有リソースを使った処理を実行(クリティカルセクション)
        // >> Thread-4 :共有リソースを使った処理を実行(クリティカルセクション)
        // >> 【取得】Thread-8 (現在の取得可能数: 0)
        // >> 【取得】Thread-0 (現在の取得可能数: 0)
        // >> 【取得】Thread-9 (現在の取得可能数: 1)
        // >> Thread-0 :共有リソースを使った処理を実行(クリティカルセクション)
        // >> Thread-8 :共有リソースを使った処理を実行(クリティカルセクション)
        // >> Thread-9 :共有リソースを使った処理を実行(クリティカルセクション)
        // >> 【取得】Thread-2 (現在の取得可能数: 0)
        // >> Thread-2 :共有リソースを使った処理を実行(クリティカルセクション)
    }
}

出力結果を見てみると、クリティカルセクションに侵入可能なスレッド数がMySemaphoreによって3つに制限されていることがわかる。

取得可能数が

// >> 【取得】Thread-8 (現在の取得可能数: 0)
// >> 【取得】Thread-0 (現在の取得可能数: 0)
// >> 【取得】Thread-9 (現在の取得可能数: 1)

のようになっているのは、JVMが高速にアクティブスレッドを高速に切り替えることで並行処理を行い、Thread-7Thread-6Thread-4のスレッドのアクセス権解放の直後、ログ出力前にアクティブスレッドが切り替わり、Thread-8Thread-0Thread-9がアクセス権を取得しているためと思われる。

ミューテックス(排他制御)

Mutual Exclusion

同時に1つのスレッドのみが共有リソースへのアクセス権を取得できるようにした排他制御の仕組みをミューテクスと言う。

排他制御=Exlusive Control

1つしか用意されていないアクセス権をロックと表現し、またそのアクセス権を取得することをロックの獲得と言う。

共有リソースへのアクセス権取得はロックと表現される

排他制御は競合を防ぐ一方で、対象範囲をむやみに広げるとパフォーマンスに悪影響を与えるため、安全性とパフォーマンスとのバランスを考えながら使用する必要がある。

ReentrantLock

java.util.concurrent.locksパッケージには、複数のスレッドからリソースへのアクセスを排他制御することが可能なReentrantLockクラスがある。

ReentrantLockクラスはLockインターフェースを実装しているため、lock()メソッドが定義されている。これにより共有リソースへのロックを行うことができる。

Lock lock = new ReentrantLock();
lock.lock();

取得したロックは必ずunlock()によって解放する必要がある。ロック解放にはtry-finally構文を利用する。

lock.unlock();

1度unlock()を実行したスレッドが、再びロックを取得することもできる。

reentrant=再入可能

ReentrantLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyLock {
    private final Lock lock = new ReentrantLock();

    public void method() {
        // ロックの取得
        lock.lock();
        
        try {
            // クリティカルセクション:ロックされた領域での処理
        } finally {
            // ロックの解放
            lock.unlock();
        }
    }
}

ロックを取得できなかった場合の処理、ロックを指定時間内に取得できなかった場合の処理なども設定できる。

ロックを獲得できなかった場合の処理
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyLock {
    private final Lock lock = new ReentrantLock();

    public void method() {
        if (lock.tryLock()) {
            try {
                // ロックに成功した場合の処理
            } finally {
                lock.unlock();
            }
        } else {
            // ロックに失敗した場合の処理
        }
    }
}
指定時間内にロックを獲得できなかった場合の処理
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class MyLock {
    private final Lock lock = new ReentrantLock();

    public void method() throws InterruptedException {
        if (lock.tryLock(500, TimeUnit.MILLISECONDS)) {
            try {
                // 指定時間内にロックが取得できた場合の処理
            } finally {
                lock.unlock();
            }
        } else {
            // 指定時間内にロックが取得できなかった場合の処理
        }
    }
}

tryLock()throwする可能性のあるInterruptedExceptionは、スレッドが中断されたときに発生する。

具体的には、スレッドがsleep()wait()join()などによってブロック状態にあるとき、外部からそのスレッドに対してinterrupt()が呼ばれると、スレッドは中断状態になり、InterruptedExceptionが発生する。

ReentrantReadWriteLock

java.util.concurrent.locks.ReentrantReadWriteLockを使用すると、共有リソースに対して読み取り専用のロック(Read Lock)と書き込み専用のロック(Write Lock)を分けて管理することができる。

Read Lock は、複数のスレッドが同時に取得できる。

Write Lock は、1つのスレッドのみが取得することができ、また Write Lock を取得したスレッドはリソースへのアクセスを独占する(排他的)。つまり、Write Lock が取得されると、読み取り、書き込みを行おうとするスレッドは待機させられることになる。

このようにReadWriteLockは、Write Lock がロックを独占することから読み取りが頻繁に行われるが、書き込み頻度は少ないと言うシチュエーションに適している。

ReentrantReadWriteLockクラスはReadWriteLockインターフェースを実装しているため、readLock()writeLock()メソッドが定義されている。これらメソッドはいずれもLockインターフェース型を返却する。

ReentrantReadWriteLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class MyLock {
    private final ReadWriteLock rwLock = new ReentrantReadWriteLock();

    private int value = 0;

    public int readValue() {
        // Read Lock の取得
        Lock lock = rwLock.readLock();
        lock.lock();

        try {
            // クリティカルセクション:ロックされた領域での処理
            return value;
        } finally {
            // ロックの解放
            lock.unlock();
        }
    }

    public void writeValue(int value) {
        // Write Lock の取得
        Lock lock = rwLock.writeLock();
        lock.lock();

        try {
            // クリティカルセクション:ロックされた領域での処理
            this.value = value;
        } finally {
            // ロックの解放
            lock.unlock();
        }
    }
}

デッドロック

デッドロックとは、以下の状態に陥ることを指す。

  • ABという2つのリソースが存在する
  • スレッドXは、A → Bという順番でリソースに対するロックを獲得する
  • スレッドYは、B → Aという順番でリソースに対するロックを獲得する

この時スレッドXがAのロックを獲得した直後に、スレッドYがBに対するロックを獲得したとする。

この状況に陥ると、スレッドXとスレッドYがお互いのロック解除を永遠に待つこととなる。この状態をデッドロックという。

ReentrantLockReentrantReadWriteLockを使用する場合、デッドロックに気をつける必要がある。

モニター

ミューテックスの機能に加えて、スレッド間の通信や同期をするための高レベルの同期プリミティブとしてモニターがある。

モニターは、1つ以上のミューテックスと条件変数から構成される。モニター内部のクリティカルセクションは、同時に1つのスレッドしか実行できない(ミューテックス)。

またモニター内ではロックの取得と解放が暗黙的に行われる。

synchronized

Javaのモニターはオブジェクト単位で提供され、オブジェクトのsynchronizedメソッドやsynchronizedブロックを使うことによって、スレッドはモニターを取得できる。

「モニターを取得する」とは、「オブジェクトに対する排他的なロックを取得する」ことを意味し、具体的な効果としては、そのオブジェクトに対する同時アクセスを一つのスレッドに限定することができる。

synchronizedによってモニターは取得できる。
モニターが取得されると、暗黙的なロックが行われる。

モニターを取得したスレッドだけが、synchronizedブロック内のコードを実行することができる。

反対に、synchronizedブロックを抜けるときに、モニターは解放される。

モニターを取得している間に実行されるコード範囲をクリティカルセクションという。クリティカルセクション内のコードは、同時に一つのスレッドしか実行することができない。

synchronized修飾子

synchronized修飾子
synchronized void method() {
    // クリティカルセクション:ロックされた領域での処理
}

メソッドに対して修飾子として使用した場合、取得できるのはthisオブジェクトに対するモニターになる。

thisのモニターを取得する
public class MyClass {
    // synchronizedMethod()が呼ばれると、オブジェクト(this)のモニター(ロック)がスレッドによって取得される
    public synchronized void synchronizedMethod() {
        // クリティカルセクション
    } // ブロックを抜けると、モニターが解放される
}

ここで取得されるのはthisのモニターであるため、インスタンスをまたがる排他制御は行われないないことに注意(MyClassからインスタンスAとBが作成されている時、AとBの間では排他制御が行われない)。

synchronizedブロック

取得したいモニターが自身のオブジェクト(this)ではない場合、synchronizedブロックを使用して取得する。

オブジェクトに対して使用する際、Javaでは慣例的にロック対象のオブジェクトの変数名をlockにすることが多い。

synchronized(ロック対象のオブジェクト) {
    // クリティカルセクション:ロックされた領域での処理
}
synchronizedブロック
public class MyClass {
    // モニターを取得したいオブジェクト(MyClass(this)ではない点に注目)
    private final Object lock = new Object();

    public void method() {
        // このブロックに入るときに、lockオブジェクトのモニター(ロック)が取得される
        synchronized (lock) {
            // クリティカルセクション   
        } // ブロックを抜けると、モニターが解放される
    }
}

synchronizedの利用

synchronizedを使用することで、共有リソースに対する複数スレッドからのアクセスを簡単に制御することができる。

public class Counter {
    private int count = 0;

    public synchronized void increment() {
        count++;
    }

    public synchronized int getCount() {
        return count;
    }
}

条件変数

Condition Variable

スレッドが特定の条件を満たすまで待機したり、条件が満たされたことを他のスレッドに通知するための仕組みを条件変数と言う。

(個人的に初めここがよく理解できなかった。ある「変数」のことを条件変数と言っているのかと思っていたが、そうではなく、仕組み全体を条件変数と呼んでいる。)

java.lang.Objectクラスのwait()notify()notifyAll()メソッドから構成される。

wait()

wait()は、スレッドがモニターが取得した状態で実行されなればならない。モニターが取得されない状態でwait()が実行されると、IllegalMonitorStateExceptionが発生する。

スレッドからwait()が実行されると、スレッドは一時的にオブジェクトのモニターを解放し、指定された条件が満たされるまでスレッドをその場で待機させる。

スレッドは再びモニターのロックを取得するまで待機し続け、ロックが再取得された時に待機を解除する。

// 条件(ture=達成、 false=未達成)
boolean condition;

// モニターを取得
synchronized(lock) {
    // 条件を確認
    while (!condition) {
        // 条件未達成なら、スレッドはモニターを解放し、条件達成まで待機する
        lock.wait();
    }
    // 条件が達成されると、再びモニターを取得してスレッドの待機を解除する
}

notify()

wait()によって待機しているスレッドのうち、1つのスレッドを再開させる。どのスレッドを再開させるかはJVMによって決定される。

モニターを取得したスレッドから実行しないとIllegalMonitorStateExceptionが発生する。

synchronized(lock) {
    condition = true;
    lock.notify();
}

notifyAll()

wait()によって待機している全てのスレッドを再開させる。

モニターを取得したスレッドから実行しないとIllegalMonitorStateExceptionが発生する。

synchronized(lock) {
    condition = true;
    lock.notifyAll();
}

条件変数を実装する

条件変数を実装する
public class MyConditionVariable {
    private final Object lock = new Object();

    // 条件(true:達成 false:未達成)
    private boolean condition = false;

    // 条件達成時のみ処理を実行し、条件未達時はスレッドを待機させる
    public void awaitCondition() throws InterruptedException {
        synchronized (lock) {
            String threadName = Thread.currentThread().getName();

            while (!condition) {
                System.out.println(threadName + ": 条件未達成のため、待機します...");
                lock.wait();
            }
        }
    }

    // 条件達成を待機中のうちの1つのスレッドに知らせる
    public void signalCondition() {
        synchronized (lock) {
            condition = true;

            lock.notify(); // 待機中のスレッドに通知
            System.out.println(Thread.currentThread().getName() + ": 条件が達成されたため、待機中のスレッドがいれば通知します");
        }
    }

    // 条件達成を待機中の全てのスレッドに通知する
    public void signalConditionAll() {
        synchronized (lock) {
            condition = true;
            
            System.out.println(Thread.currentThread().getName() + ": 条件が達成されたため、待機中のスレッドがいれば通知します");
            lock.notifyAll(); // 待機中のスレッドに通知
        }
    }
}
条件変数の利用
public class Main {
    public static void main(String[] args) {

        MyConditionVariable myConditionVariable = new MyConditionVariable();

        Runnable waitAndDoTask = () -> {
            String threadName = Thread.currentThread().getName();

            try {
                System.out.println(threadName + ": 条件が達成されていれば、処理を実行したいです");
                myConditionVariable.awaitCondition();
            } catch (InterruptedException e) {
            }

            // 条件が満たされた後の処理
            System.out.println(threadName + ": 条件が達成されたようなので、待機を解除し、処理を実行します");
        };

        Thread waitThread1 =  new Thread(waitAndDoTask);
        Thread waitThread2 =  new Thread(waitAndDoTask);

        waitThread1.start();
        waitThread2.start();

        try {
            // ログの出力をわかりやすくするため、メインスレッドを1秒間待機させる
            Thread.sleep(1000); // 3000ミリ秒 = 3秒
        } catch (InterruptedException e) {
        }

        Runnable signalTask = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + ": 条件達成のための処理をこれから実行します!(2秒かかる)");

                // 条件達成までにかかる処理時間
                Thread.sleep(2000);

                myConditionVariable.signalConditionAll();
            } catch (InterruptedException e) {
            }
        };

        Thread signalThread = new Thread(signalTask);
        signalThread.start();

        // >> Thread-0: 条件が達成されていれば、処理を実行したいです
        // >> Thread-1: 条件が達成されていれば、処理を実行したいです
        // >> Thread-0: 条件未達成のため、待機します...
        // >> Thread-1: 条件未達成のため、待機します...
        // >> Thread-2: 条件達成のための処理をこれから実行します!(2秒かかる)
        // >> Thread-2: 条件が達成されたため、待機中のスレッドがいれば通知します
        // >> Thread-1: 条件が達成されたようなので、待機を解除し、処理を実行します
        // >> Thread-0: 条件が達成されたようなので、待機を解除し、処理を実行します
    }
}

Condition

より高機能な条件変数の機能を提供するものに、java.util.concurrent.locks.Conditionクラスがある。

ConditionクラスはLockから取得することができ、2つは組み合わせて利用する。

Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();

ConditionLockの組み合わせにより条件変数を実装すると、synchronizedでは開発者が意識することのなかったロックの取得と解放を、より低級な記述によってカスタマイズしながら実装することができる。synchronizedの役割をLockが代わりに果たし、Objectの待機、通知機能をConditionが代わりに果たす。

await()

Objectwait()に相当するメソッド。

await()を実行したConditionsignal()またはsignalAll()により signal を発信するまでスレッドを待機させる。

signal()

Objectnotify()に相当するメソッド。

ロックを取得した状態で実行する必要がない。

signalAll()

ObjectnotifyAll()に相当するメソッド。

Conditionによる条件変数
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;

public class MyConditionVariable {
    private final Lock lock = new ReentrantLock();

    // Lock と紐づく Condition
    private final Condition condition = lock.newCondition();
    
    private boolean conditionMet = false; // met: 「meet=会う、(条件を)満たす」の過去形

    public void awaitCondition() throws InterruptedException {
        lock.lock();
        
        try {
            while (!conditionMet) {
                // 条件達成までここで待機
                condition.await();
            }
            // 条件達成時の処理
        } finally {
            lock.unlock();
        }
    }

    public void signalCondition() {
        lock.lock();
        
        try {
            conditionMet = true; // 条件達成

            // 1つの待機中のスレッドに通知
            condition.signal();
        } finally {
            lock.unlock();
        }
    }

    public void signalAllConditions() {
        lock.lock();
        
        try {
            conditionMet = true; // 条件達成

            // すべての待機中のスレッドに通知
            condition.signalAll();
        } finally {
            lock.unlock();
        }
    }
}

生産者/消費者問題

Conditionオブジェクトは一つのロックに対して複数の条件を設定することができる。

条件変数が複数の条件を管理する例として、生産者/消費者問題Producer–consumer problem)と呼ばれる古典的な問題がある。

生産者/消費者問題では、キューに対してデータを生成して格納するスレッド(生産者、Producer)とキューからデータを取り出して利用するスレッド(消費者、Consumer)について考える。

キューには容量があるので、もし空きが無ければ生産者はデータを入れることができない。このとき生産者スレッドは空きができるまで待機することになるため、Conditionが管理する条件は「キューに空きができること」になる。条件が達成されたとき、待機していたスレッドは再開する。

一方、消費者はキューにデータが存在しない場合、データを取り出すことができない。生産者と同様に、消費者のスレッドもデータがキューに格納されるまで待機する。Conditionが管理する条件は「キューにデータが格納されていること」になる。

Conditionを利用することで、このような複数の条件に対応する待機スレッドを、条件ごとに管理することができる。

生産者/消費者問題
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;


public class ProducerConsumer {
    // キュー
    private final Queue<Integer> queue = new LinkedList<>();

    // キューの容量
    private final int MAX_SIZE = 10;

    private final Lock lock = new ReentrantLock();

    // 一つの Lock に複数の Condition が紐づいている
    // 「キューが満杯ではない」という条件
    private final Condition notFull = lock.newCondition();

    // 「キューが空ではない」という条件
    private final Condition notEmpty = lock.newCondition();

    // 生産者スレッド
    public void produce() throws InterruptedException {
        int value = 0;

        // 生産者は生産を続ける
        while (true) {
            lock.lock();
            try {
                // 条件 = 「キューが満杯ではない」
                while (queue.size() == MAX_SIZE) {
                    // 条件未達成の場合、Condition によって一時的にロックが解放され、スレッドは待機させられる
                    notFull.await();
                }
                // 条件が達成されると、Condition によってロックが再取得され、スレッドの待機は解除される
                queue.add(value);
                System.out.println("生産: " + value);

                // 生産処理に1秒かかるようにする
                Thread.sleep(1000);

                // 次回生産用の値
                value++;

                // キューが空ではなくなったため、signal を発信
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
        }
    }

    // 消費者スレッド
    public void consume() throws InterruptedException {
        // 消費者は消費を続ける
        while (true) {
            lock.lock();
            try {
                // 条件 = 「キューが空ではない」
                while (queue.isEmpty()) {
                    // 条件未達成の場合、Condition によって一時的にロックが解放され、スレッドは待機させられる
                    notEmpty.await();
                }
                // 条件が達成されると、Condition によってロックが再取得され、スレッドの待機は解除される
                int value = queue.poll();
                System.out.println("消費: " + value);

                // 生産処理に1秒かかるようにする
                Thread.sleep(1000);

                // キューが満杯ではなくなったため、signal を発信
                notFull.signal();
            } finally {
                lock.unlock();
            }
        }
    }
}
Main.java
public class Main {
    public static void main(String[] args) {
        ProducerConsumer pc = new ProducerConsumer();

        Thread producerThread = new Thread(() -> {
            try {
                pc.produce();
            } catch (InterruptedException e) {
            }
        });

        Thread consumerThread = new Thread(() -> {
            try {
                pc.consume();
            } catch (InterruptedException e) {
            }
        });

        producerThread.start();
        consumerThread.start();

        // >> 生産: 0
        // >> 生産: 1
        // >> 生産: 2
        // >> 生産: 3
        // >> 生産: 4
        // >> 生産: 5
        // >> 生産: 6
        // >> 生産: 7
        // >> 生産: 8
        // >> 生産: 9
        // >> 消費: 0
        // >> 消費: 1
        // >> 消費: 2
        // >> 消費: 3
        // >> 消費: 4
        // >> 消費: 5
        // >> 消費: 6
        // >> 消費: 7
        // >> 消費: 8
        // >> 消費: 9
        // >> 生産: 10
        // >> 生産: 11
        // >> 生産: 12
        // >> 生産: 13
        // >> 生産: 14
        // >> 生産: 15
        // >> 生産: 16
        // >> 生産: 17
        // >> 生産: 18
        // >> 生産: 19
        // >> 消費: 10
        // >> 消費: 11
        // >> 消費: 12
        // >> 消費: 13
        // >> 消費: 14
        // >> 消費: 15
        // >> 消費: 16
        // >> 消費: 17
        // >> 消費: 18
        // >> 消費: 19
        // >> 生産: 20
        // >> 生産: 21
        .
        .
        .
        
    }
}

volatile

synchronizedと似たものにvolatileと言う修飾子がある。

volatileは「揮発性」を意味し、値が変化しやすいことをコンパイラに通知する修飾子。

volatileで宣言されたフィールドは、コンパイラによって最適化されず、常にメインメモリからアクセスされることが保証される。

メモリの種類

メモリには以下の種類があり、それぞれアクセス速度と容量が異なる。

  • メインメモリ : 補助記憶装置からプログラムが展開される主要なメモリ
  • キャッシュメモリ : メインメモリよりも高速でCPUに近い位置にある
  • レジスタ : 最も高速なメモリでCPU内部にある

アクセス速度順に並べると、

遅い < メインメモリ < キャッシュメモリ < レジスタ < 速い

容量はその逆。

小さい < レジスタ < キャッシュメモリ < メインメモリ < 大きい

という関係になっている。

コンパイル

Javaのプログラムは、実行されるまでに2回のコンパイルを経ることがある。

事前コンパイル

ソースコード(Main.java)から中間バイトコード(Main.class)への静的コンパイルを指す。

「実行前に」 javac によって行われる。

プラットフォーム(OS)に依存しない中間コードが生成される。

実行時コンパイル

JVM によって「実行時に」行われる動的コンパイル。

事前コンパイルされた中間コードの一部が機械語に変換される。

パフォーマンス向上を目的として行われる。

JVM

JVMのJava実行エンジンには インタプリタモードと、 JITコンパイラモード (JIT = Just-In-Time)がある。

インタプリタモード

中間コードを1行づつ解釈して実行する。

JITコンパイラモード

中間コードの一部を、実行時コンパイルによって機械語に変換しながら実行するモード。

機械語はCPUが直接理解することのできる形式のため、中間コードのままインタプリタで実行させるよりも実行速度が向上する。

この処理を最適化と言う。

最適化

JITコンパイラは、実行されるコードのうち頻繁に利用される部分に対して実行時コンパイルを行い、更にキャッシュメモリを利用して 最適化 を行う。

通常フィールドは、メインメモリ上に展開されて、プログラム内で更新されることがある。

一方で、プログラム内で頻繁に利用される一部のフィールドなどは、最適化によってキャッシュメモリ上に展開される。

キャッシュされたフィールドは、メインメモリ上のフィールドと同期されないため、最新の値が反映されず、複数のスレッドが同一のフィールドを更新したときに矛盾が生じることがある。

volatileの効果

複数のスレどから更新される可能性のあるフィールドに対してvolatile修飾子を使用して宣言しておくと、そのフィールドが他のスレッドによって変更される可能性があることをコンパイラに対して示すことになるため、コンパイラがそのフィールドへのアクセスを最適化しないようにすることができる。

volatileを付与したフィールドは、常にメインメモリから読み込まれ、変更が即座にメインメモリ上に反映されることが保証される

volatileで宣言したフィールドは、キャッシュされずに常にメインメモリから読み込まれる

複数のスレッド間で最新の値にアクセスできることから、「変数の可視性が確保される」とも表現される。

ただし、複数のスレッド間からの更新処理の競合を防ぐものではないので、「原子性は確保されない」と表現される。

volatileは競合自体を防ぐものではない

volatileの例
class MyClass {
    // volatileを使ったフィールド
    volatile boolean flag = false;
}

MyClass myClass = new MyClass();

// スレッドAからのアクセス
myClass.flag = true;  // メインメモリ上で更新が即反映される

// スレッドBからのアクセス
if (myClass.flag) {
    // volatileを使用しない場合、キャッシュされていたflag(falseが格納)にアクセスされることがある
    // volatileを使用することで、必ずメインメモリから読み出されるため、矛盾が発生しない
}

synchronzedvolatile

synchronzedvolatileは、どちらもマルチスレッド環境でデータの一貫性を確保するための手段であるが、それぞれ違いがある。

synchronized

排他制御を実現するためにメソッドやブロックに対して使用される。

あるスレッドがsynchronizedブロックに入ると、他のスレッドはロックしたインスタンスやメソッドへアクセスすることができなくなる。

メソッドやブロックに対して使用することができるため、トランザクションのようなある程度まとまった単位で処理の原子性を確保することができる。デッドロックの危険がある。

volatile

複数のスレッドからの可視性を確保するために、変数に対して使用される。volatileを使用した変数はキャッシュメモリではなく、必ずメインメモリを介してアクセスされる。単一な操作に対しては効果があるものの、synchronizedと比べて、局所的な制御という感じで、複数の処理を排他制御するなどはできない。volatileを使用しても、複数のスレッドが同時に同じフィールドを変更しようとする競合は防ぐことができない。

volatile
class MyClass {
    // キャッシュされないようにできる
    volatile boolean flag = false;
}
synchronized
class MyClass {
    private int i1 = 0;
    private int i2 = 0;

    public synchronized void incrementCounters() {
        // 複数のスレッドから呼び出されたとしても、競合しない
        i1++;
        i2++;
    }
}
volatile と synchronized
// 組み合わせて使用することもできる
class MyClass {
    private volatile int i1 = 0;
    private volatile int i2 = 0;

    public void incrementCounters() {
        synchronized (this) {
            i1++;
            i2++;
        }
    }
}

複数の文を排他制御したり、複雑な処理を同期処理したい場合にはsynchronizedを使用し、単に一つの変数の可視性を確保したい場合にはvolatileを使用する。

アトミック変数

java.util.concurrent.atomicパッケージ

スレッドセーフなプログラミング手法には他にも、java.util.concurrent.atomicパッケージに含まれるAtomicBooleanAtomicIntegerなどのクラス群を利用する方法もある。

これらのクラス群を使用した型は、値の取得、演算、再代入までの操作を、他のスレッドに割り込まれることなく実行できる(スレッド同士が競合しない)。これはアトミックな操作と表現とされる(Atomic=原子の、不可分の)。

元々、Javaのlongdouble型以外の基本型(intshortbytecharfloat)はアトミックに操作できるため、AtomicLongAtomicDoubleが主に利用される。

Javaのlongdoubleのデータ型は、64bitのメモリ領域を必要とする。64bitのデータ型は、ハードウェア上では2回にわたる32bitの操作として扱われることがあり、この2回の操作の間に他のスレッドに割り込まれる可能性があるため、アトミックな操作が保証されない。

AtomicInteger
import java.util.concurrent.atomic.AtomicInteger;

AtomicInteger atomicInt = new AtomicInteger(10);

// 値の取得
int currentValue = atomicInt.get();

// 値の設定(インクリメント)
int newValue = atomicInt.incrementAndGet();

// アトミックな更新
// 10と等しい場合、20を代入する。10と等しい場合、メソッドはtrueを返却する
boolean isUpdated = atomicInt.compareAndSet(10, 20);

バリアー

CyclicBarrier

複数のスレッドで実行される処理において、それぞれの実行タイミングを一致させるような制御をスレッドの同期化という。

同期化を行うには、java.util.concurrent.CyclicBarrierクラスを使用する。

CyclicBarrier は同期化支援機能を持つクラスであり、バリアー という概念を扱う。

複数のスレッドが並列実行されているときに、全てのスレッドが特定のポイントに到達するまで待機させるポイントを バリアーポイント と言う。

また全てのスレッドがバリアーポイントに到達した時、バリアーの解除と同時に実行される処理をバリアーアクションと言う。

CompletableFuture.allOf()と混同してしまいがちだが、CompletableFuture が非同期処理の「完了」に焦点を当てた高級なフレームワークであるのに対して、CyclicBarrier はスレッド間の「同期」に焦点を当てた低級な同期プリミティブである。

CompletableFuture では非同期処理の完了結果を次の非同期処理に渡し、複数の非同期処理を連携させることができる。

CyclicBarrier では非同期処理の途中の好きな場所にバリアーポイントを設置し、複数の非同期処理間のタイミングを同期させることができる。

全てのスレッドのデータ集約、ログ出力などの共通処理がバリアーアクションとされることが多い。

cyclic_barrier.png

CyclicBarrierを利用するには、まず初めに同期させるスレッド数、バリアーアクションを指定する必要がある。

バリアーの生成
CyclicBarrier barrier = new CyclicBarrier(int parties, Runnable barrierAction)
// parties = スレッド数

生成したバリアーを利用して、バリアーポイントを作成することができる

バリアーポイントの設置
barrier.await();

以下ではスレッドを3つ作成して実行している。

実行の順番は前述の通りスレッドスケジューラが管理しているため実行する度に変わるが、バリアーポイントを設置したことによって3つのスレッドの間で同期が行われ、バリアーアクションがバリアーポイント通過後に実行されている。

Main.java
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Main {
    public static void main(String[] args) {
        // バリアーの生成
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("バリアーアクションの実行");
        });

        Runnable task = () -> {
            try {
                String threadName = Thread.currentThread().getName();
                System.out.println(threadName + " バリアーポイントへ到達");

                // バリアーポイントで待機
                barrier.await();
                System.out.println(threadName + " バリアー解除");
            } catch (InterruptedException | BrokenBarrierException e) {
            }
        };

        // スレッドの生成
        Thread thread1 = new Thread(task);
        Thread thread2 = new Thread(task);
        Thread thread3 = new Thread(task);

        // スレッドの開始
        thread1.start();
        thread2.start();
        thread3.start();

        // >> Thread-2 バリアーポイントへ到達
        // >> Thread-0 バリアーポイントへ到達
        // >> Thread-1 バリアーポイントへ到達
        // >> バリアーアクションの実行
        // >> Thread-0 バリアー解除
        // >> Thread-2 バリアー解除
        // >> Thread-1 バリアー解除
    }
}

バリアーポイントの設置のために利用したawait()は到着したスレッドのインデックスを返すため、バリアーポイントへの到着順序に応じた処理を実行することもできる。インデックスの最小値は 0、最大値は getParties() - 1

await()の利用
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Main2 {
    public static void main(String[] args) {
        // スレッド数を10に変更
        CyclicBarrier barrier = new CyclicBarrier(10, () -> {
            System.out.println("バリアーアクションの実行");
        });

        Runnable task = () -> {
            try {
                String threadName = Thread.currentThread().getName();
                System.out.println(threadName + " バリアーポイントへ到達");

                // 到着インデックスを取得する
                int index = barrier.await();
                
                if (index == barrier.getParties() - 1) {
                    System.out.println("最初にバリアーポイントへ到着したのは " + threadName + " です");
                } else if (index == 0) {
                    System.out.println("最後にバリアーポイントへ到着したのは " + threadName + " です");
                }
            } catch (InterruptedException | BrokenBarrierException e) {
            }
        };

        Thread thread1 = new Thread(task);
        Thread thread2 = new Thread(task);
        Thread thread3 = new Thread(task);
        Thread thread4 = new Thread(task);
        Thread thread5 = new Thread(task);
        Thread thread6 = new Thread(task);
        Thread thread7 = new Thread(task);
        Thread thread8 = new Thread(task);
        Thread thread9 = new Thread(task);
        Thread thread10 = new Thread(task);

        thread1.start();
        thread2.start();
        thread3.start();
        thread4.start();
        thread5.start();
        thread6.start();
        thread7.start();
        thread8.start();
        thread9.start();
        thread10.start();

        // >> Thread-2 バリアーポイントへ到達
        // >> Thread-6 バリアーポイントへ到達
        // >> Thread-1 バリアーポイントへ到達
        // >> Thread-4 バリアーポイントへ到達
        // >> Thread-3 バリアーポイントへ到達
        // >> Thread-7 バリアーポイントへ到達
        // >> Thread-8 バリアーポイントへ到達
        // >> Thread-9 バリアーポイントへ到達
        // >> Thread-5 バリアーポイントへ到達
        // >> Thread-0 バリアーポイントへ到達
        // >> バリアーアクションの実行
        // >> 最初にバリアーポイントへ到着したのは Thread-2 です
        // >> 最後にバリアーポイントへ到着したのは Thread-7 です
    }
}

CyclicBarrierオブジェクトは再利用可能なオブジェクトなので、バリアーポイントの設置、バリアーアクションの登録は何度でも繰り返し行うことができる。

cyclic_barrier.png

バリアーの再利用
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class Main2 {
    public static void main(String[] args) {
        CyclicBarrier barrier = new CyclicBarrier(3, () -> {
            System.out.println("バリアーアクションの実行");
        });

        Runnable task = () -> {
            try {
                System.out.println(Thread.currentThread().getName() + " バリアーポイントへ到達");
                barrier.await();
            } catch (InterruptedException | BrokenBarrierException e) {
            }
        };

        Thread thread1 = new Thread(task);
        Thread thread2 = new Thread(task);
        Thread thread3 = new Thread(task);
        Thread thread4 = new Thread(task);
        Thread thread5 = new Thread(task);
        Thread thread6 = new Thread(task);

        thread1.start();
        thread2.start();
        thread3.start();

        try {
            // メインメソッドを1秒間待機させることで、thread1~3がバリアーポイントへ到着する時間を与える
            Thread.sleep(1000);
        } catch (InterruptedException e) {
        }

        thread4.start();
        thread5.start();
        thread6.start();

        // >> Thread-2 バリアーポイントへ到達
        // >> Thread-0 バリアーポイントへ到達
        // >> Thread-1 バリアーポイントへ到達
        // >> バリアーアクションの実行

        // >> Thread-3 バリアーポイントへ到達
        // >> Thread-5 バリアーポイントへ到達
        // >> Thread-4 バリアーポイントへ到達
        // >> バリアーアクションの実行
    }
}

InterruptedException

InterruptedExceptionは、スレッドが

  • スリープ(Thead.sleep()
  • 待機(Object.wait()Condition.await()
  • または他のブロッキング操作(Thread.join()

を実行しているときに、他のスレッドによって割り込みが発生したことを示すためにthrowされる検査例外のことを指す。

Threadには以下のメソッドが定義されている。

スレッドに割り込む
public void interrupt()

実行したThreadのスレッドに対して割り込みを発生させる。

現在のアクティブスレッドが割り込まれているかどうかを調べる
public static boolean interrupted()

割り込みステータスと呼ばれる、Threadクラスが内部で保持する自身のスレッドに対して割り込みが発生したかどうかの情報がinterrupted()の実行によって削除される。そのため、2回連続で実行した場合、2回目の実行結果は必ずfalseになる。

このスレッドが割り込まれているかどうかを調べる
public boolean isInterrupted()

割り込みステータスが削除されない。

(この辺りの知識が非常に複雑でうまく理解できなかった。また折を見て内容を更新したい。)

段階的に整理したいので、まずはスレッドが待機している状態を作る。

Thread waitThread = new Thread(() -> {
    try {
        System.out.println("waitThreadを10秒間待機させます...");
        Thread.sleep(10000);
        System.out.println("waitThreadの待機を解除します");
    } catch (InterruptedException e) {
        System.out.println("waitThreadへの割り込みが発生しました");
    }
});

waitThread.start();

// >> waitThreadを10秒間待機させます...
// ----- 10秒間何も出力されない -----
// >> waitThreadの待機を解除します

次に、この待機中のスレッドに割り込みを発生させる。

Thread waitThread = new Thread(() -> {
    try {
        System.out.println("waitThreadを10秒間待機させます...");
        Thread.sleep(10000);
        System.out.println("waitThreadの待機を解除します");
    } catch (InterruptedException e) {
        System.out.println("waitThreadへの割り込みが発生しました");
    }
});

waitThread.start();

try {
    // waitThread が確実に待機状態に入るための時間を与える
    Thread.sleep(5000);

    // メインスレッド上から、現在待機中の waitThread への割り込みを発生させる
    waitThread.interrupt();
} catch (InterruptedException e) {
    // 今回ここはあまり意識しなくて良いところ
    System.out.println("メインスレッドへの割り込みが発生しました");
}

// >> waitThreadを10秒間待機させます...
// ----- 5秒間何も出力されない -----
// >> waitThreadへの割り込みが発生しました

Threadクラスに定義されていたisInterrupted()メソッドの使い方を確認してみる。

Thread waitThread = new Thread(() -> {
    try {
        int i = 0;
        // 割り込みが発生しない限り、ループを抜けない
        while (!Thread.currentThread().isInterrupted()) {
            i++;
            System.out.println(i + "回目のループ");
            Thread.sleep(2000);
        }
        System.out.println("waitThreadの: isInterrupted true");
    } catch (InterruptedException e) {
        System.out.println("waitThreadへの割り込みが発生しました");
    }
});
waitThread.start();

try {
    Thread.sleep(5000);

    System.out.println("interrupt()");
    waitThread.interrupt();

    if (waitThread.isInterrupted()) {
        System.out.println("isInterrupted() : true");
    } else {
        System.out.println("isInterrupted() : false");
    }

    // waitThread の終了を待つ(この処理が無いと、メインスレッドが waitThread より先に終了する)
    waitThread.join();
} catch (InterruptedException e) {
    System.out.println("メインスレッドへの割り込みが発生しました");
}

System.out.println("メインスレッドが終了");

// >> 1回目のループ
// >> 2回目のループ
// >> 3回目のループ
// >> interrupt()
// >> isInterrupted() : true
// >> waitThreadへの割り込みが発生しました
// >> メインスレッドが終了

InterruptedException発生時の処理

Threadは内部で割り込みステータスと呼ばれる、割り込みが発生したかどうかの情報を管理している。

InterruptedExceptionが発生した場合、この割り込みステータスはクリアされるため、もしスレッド生成元に割り込みがあったことを伝えたい場合には、割り込みがあったスレッドからさらに割り込みを行う必要がある。

catch (InterruptedException e) {
    Thread.currentThread().interrupt();
}

InterruptedExceptionが発生すると、割り込みステータスはクリアされる

割り込みを受け取った場合に適切な処理を行います。これは、割り込みフラグをクリアするか(Thread.interrupted()を呼び出す)、例外処理を行うか、必要に応じてリソースを解放するなどの対策を含みます

参考

Head First Java 第2版 ―頭とからだで覚えるJavaの基本
独習Java 新版
徹底攻略Java SE 11 Gold問題集[1Z0-816]対応
マルチスレッド・プログラミングの道具箱

5
12
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
12

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?