Javaの非同期処理
Javaにおいてマルチスレッドとは、一つのプロセス内で同時に実行される複数のスレッドを指す。
プロセスとは、実行中のプログラムのインスタンスを指す。プロセスはOSによって管理されるプログラムの実行単位である。
スレッドもまた、プログラムの実行単位を指す。一つのプロセス内には複数のスレッドが存在することができるため、両者は、プロセスが親、スレッドは子という親子関係にある。
Javaは一つのプロセス内で動作していて、そのプロセス内では複数のスレッドが活用されることで、より効率的に処理が実行されるようになっている。
コンピュータ上で複数のプロセスが起動されている場合、各プロセスは独立したそれぞれのメモリ領域を持ち、通常プロセス同士はそれぞれ隔離された環境を持っている。プロセス内での資源の割り当てやスケジューリングは、OSによって管理されていて、異なるプロセス間ではメモリ領域が共有されない。
「資源の割り当て」とは具体的に、ヒープ、スタックなどのメモリ領域の割り当て(メモリ上のどの番地からどの番地を使うか)や、CPUの使用権利の割り当て、ファイルや入出力デバイスの割り当てなどを指す。異なるプロセス間で情報をやり取りするためには、IPC(Inter Process Communication)やネットワーク通信(ソケット、パイプ)などの特殊な技術を使用する必要がある。
Javaは一つのプロセス内で動作していて、プロセスに割り当てられたメモリ領域を、プロセス内の各スレッドが共有するため、複数のスレッドが同時に同じメモリ領域にアクセスする競合を起こすことがある。
競合を避けるためには、開発者がスレッドセーフなプログラミングを行う必要がある。
スレッドとは
スレッドとは、プログラムの実行単位であり、一つのプロセス内で動作する独立した処理フローのことを指す。同じプロセス内で、複数存在することができる。各スレッドは個別の実行コンテキストを持ち、プロセス内で共有されるメモリ領域へアクセスを行う。
プロセスがOSによって管理されるのと同様に、スレッドもまた、OSによって管理される。具体的には、各スレッドに対してCPUの使用権の割り当て、つまり実行順の管理がOSによって行われる。
Javaでは、スレッドが起動されると、各スレッドに「スタック」と呼ばれるメモリ領域が割り当てられる。スタックとは、スレッドがメソッドを実行した時に、そのメソッド内のローカル変数、ステータス情報などのコンテキストが保持される領域のことを指す。メソッドの実行が終了すると、スタックに積まれたコンテキストは取り除かれる。
プログラムの実行中は、メソッドの実行によってコンテキストが積まれ、またメソッド内から別のメソッドが実行されることで、その上にさらに新たなコンテキストが積まれる。反対に、メソッドが終了するとスタックの一番上に積まれたコンテキストは除去され、これらが繰り返されることで、スレッド(Thread、糸)が形成される。
つまりスレッドとは、物理的な何かを指すものではなく、プログラムの処理の流れを抽象化したものと言うことができる。
(言語は異なりますが、スレッドの概念は下記の記事でもまとめています。異なるプログラミング言語においては、実行モデルが異なることがあるため、ご注意ください。)
public class Main {
public static void main(String[] args) {
fn();
}
public static void fn() {
System.out.println("hello world");
}
}
マルチスレッドと並列処理
プログラムには並行処理と並列処理という概念がある。(詳細は下記の記事にまとめています)
簡単に区別すると、実行中の処理を高速に切り替えることで複数の処理が同時に実行されているように見せるのが並行処理で、実際に複数の処理を物理的に同時実行しているのが並列処理。
物理的な並列処理は、マルチコアプロセッサなどの複数のコアを持つハードウェア上でプログラムを実行することによって実現される。複数のコアに演算処理を分散させ、同時に実行させることで、全体としてプログラムの処理が高速に実行される。
プログラムが、シングルコアのプロセッサ上で動作している場合や、スレッドが競合状態(複数のスレッドが同時にデータにアクセスしようとする状態)に陥っている場合などは、物理的な同時性は実現しない。
マルチコアプロセッサとは、複数のコアを持つCPUのことを指す。コアは、算術論理演算(加算、減算、乗算、除算など)や論理演算(AND、OR、NOTなど)などの基本的な演算を実行する能力を持っている。シングルコアプロセッサは、コアが一つしかないCPU。
マルチスレッド環境で実行されるプログラムは、実際には、必ずしも複数のスレッドが物理的に同時に動いているとは限らない。人間から見ると同時に実行されているように見えても、実際には複数のスレッドが高速に一つのCPUを譲り合って並行処理を進めているだけかもしれない。
並行処理におけるCPUの割り当てはスレッドスケジューラによって行われ、処理中のスレッドはアクティブスレッドとも呼ばれる。スレッドスケジューラとは、実行可能なスレッドの優先度や状態に基づいて、どのスレッドを次に実行するかを決定している。
スレッドスケジューラは、複数のスレッドを物理的に並列処理するためのものではなく、複数のスレッドを効果的に管理し、複数のスレッドの並行処理を効率的に実行するための仕組みである。
つまり、物理的な同時性はハードウェアが提供するものであり、マルチスレッドによるプログラミングが提供できるものではない。
スレッドを生成する
Javaのプログラムは通常、JVMによって自動的に起動されたメインスレッド上で実行される。そのため、開発者は「メインスレッドを起動するためのプログラム」を書かない。
一方で、メインスレッド以外のスレッドは、明示的に生成を行わない限り存在しない(ライブラリやフレームワークが提供する非同期処理や、ガベージコレクションなどのJVMによって自動的に生成されるスレッドは存在する)。
スレッドを明示的に生成しない場合のプログラムの処理は、すべてメインスレッド上で実行される。
スレッドを生成する方法には以下の方法がある。
-
Thread
クラスを継承する(古い) -
Runnable
インターフェースを実装する(新しい)
実際には後述するCompletableFuture
を使用した方が書きやすく便利。ただ、スレッドの原理や仕組みを理解するためにはThread
、Runnable
の理解も重要。
Thread
クラスを継承する
Thread
クラスは、Javaの標準ライブラリ(java.lang
パッケージ)に含まれている。Thread
クラスを利用する場合、Thread
クラスを継承して、非同期的に実行したい処理をオーバーライドしたrun()
メソッド内に記述する必要がある。
public class MyThread extends Thread {
@Override
public void run(){
System.out.println("非同期的に処理を実行します。");
}
}
public class Main {
public static void main(String[] args) {
// スレッドを生成
var thread = new MyThread();
// 生成したスレッド上で、処理を実行
thread.start();
// >> 非同期的に処理を実行します。
}
}
この時、スレッドは以下のような様子になっている。
JavaScriptの非同期処理との違い
JavaScriptはJavaのマルチスレッドと異なり、単一スレッド(シングルスレッド)上で実行され、非同期処理はイベントループと呼ばれる実行モデルによって実現される。
イベントループにおける非同期処理は、コールバック関数としてコールバックキューに格納され、「スタックが空になった」という条件時にスタックに積まれ、実行される。
一方のJavaは、マルチスレッドのプログラミングが可能となっていて、JVMのコンポーネントであるスレッドスケジューラが異なるスレッド間で実行中のスレッド(アクティブスレッド)を切り替える制御を行っている。
そのため、異なるスレッドで実行される処理は、どのような順序で実行されるかがJVMの実装に依存していて、予測することはできない。(開発者がこの制御に影響を与えることはできるが、強制的に特定のスレッドをアクティブスレッドとすることはできない。)マルチスレッドのプログラミングで排他制御が重要な理由はこういった理由によるものである。
スレッドスケジューラ
Javaのマルチスレッド環境における実行中のスレッド(アクティブスレッド)は、JVMのコンポーネントであるスレッドスケジューラが制御している。
正確には、OSのスレッドを、JVMがJavaのスレッドに関連づけることで、JVMによる間接的な制御が行われている。スケジュール機能はOSに依存するが、JVMがOSごとの差異を吸収することで、プラットフォームに依存しない仕組みとなっている。
スレッドには「実行中」、「実行可能」、「ブロック中」の3つの状態が存在し、スレッドの生成直後の状態は実行可能状態であり、生成してすぐに実行中になるわけではない。
実行可能状態のスレッドはスレッドスケジューラによる判断や決定によって、実行中へと昇格する。スレッドが実行中になると、そのスレッドのスタック(コンテキスト)が有効になる。
Thread
クラスのsleep()
、wait()
が実行されると、スレッドはブロック状態になる。また、synchronized
キーワードによって、アクセスしたオブジェクトがロックされていた場合にも、スレッドはブロック状態になる。つまりブロック状態とは、スレッドが実行を一時的に停止した状態を指す。これらの制御がスレッドスケジューラによって行われている。
またコンピュータのコアが1つであれば、同時に二つのスレッドが実行中になることはない。
スレッドスケジューラはJVMのコンポーネントであり、JVMの実装に依存しているため、開発者が直接制御を行うことはできない。そのため、同じマシン上で同じプログラムを動かしたとしても、スレッドスケジューラが全く同じ挙動をすることは保証されない。
スレッドスケジューラの挙動は、開発者(プログラマ)が予測できないことを前提にする
マルチスレッド環境においてはこのことを前提として開発を行う必要がある。
Runnable
インターフェースを実装する
Runnable
インターフェースも、Javaの標準ライブラリであるjava.lang
パッケージに含まれている。
Runnable
インターフェースには、run()
抽象メソッドが定義されている。
Thread
クラスを継承した時と形がよく似ているが、今回はThread
を直接インスタンス化する。
public class MyRunner implements Runnable{
@Override
public void run() {
System.out.println("非同期的に処理を実行します。");
}
}
public class Main {
public static void main(String[] args) {
// スレッドを生成
var thread = new Thread(new MyRunner());
// 生成したスレッド上で、処理を実行
thread.start();
// >> 非同期的に処理を実行します。
}
}
ここで、古い方法と新しい方法の両方で利用したThread
クラスは、Runnable
インターフェースを実装したクラスとなっている。
Threadクラスを継承するでは、Runnable
インターフェースを実装したThread
クラスを継承してさらにオーバーライドしたrun()
メソッドを、ここでは直接Runnable
から実装しているという関係になっている。
匿名クラスを使って記述する
スレッド生成のコードは、匿名クラスを使うことで簡潔に記述できるようになる。
匿名クラスとは
匿名クラスとは、名前のないクラスのインスタンスを生成するための特殊な構文のこと。
普通、Javaのクラスは名前とともに宣言され、宣言された名前とnew
キーワードを使ってインスタンス化が行われる。
new クラス名()
匿名クラスは宣言と同時にインスタンスが生成される。
new インターフェース名() {
ここがクラス宣言部分
インターフェースの実装をここで行う
}
通常インターフェースや抽象クラスを実装する場合、実装用のクラスを宣言する必要がある。ただ、1度しかインスタンス化しないクラスを、わざわざ宣言したくない時などに匿名クラスが有効な手段になる。
public interface MyInterface {
void method();
}
本来はインターフェースを実装するためのクラスを宣言する必要がある(クリックで展開する)
// 匿名クラスを使用することでこのクラス宣言が不要になる
public class MyClass implements MyInterface {
@Override
public void method() {
System.out.println("処理を実行します。");
}
}
public class Main {
public static void main(String[] args){
// インターフェースの名前を利用して、
// インターフェースを実装した、匿名クラスのインスタンスを生成
var instance = new MyInterface(){
@Override
public void method(){
System.out.println("処理を実行します。");
}
};
// 実装した処理を呼び出す
instance.method();
// >> 処理を実行します。
}
}
匿名クラスにはコードが簡潔になるというメリットがある一方で、コンストラクタを持つことができない、継承関係になれないなどの制約がある。
匿名クラスを使って記述する
匿名クラスを利用すると、先ほどのコードは以下のようになる。宣言する必要のあるクラスがそれぞれ一つ減っていることがわかる。
匿名クラスはインターフェースに使用した場合にはインターフェースを「実装(implements
)」したクラスになり、通常のクラスに使用した場合にはそのクラスをスーパークラスとして「継承(extends
)」したサブクラスになる。
Thread
クラスに対して使用したときは継承(extends
)した匿名クラスになり、Runnnable
インターフェースに対して使用したときは実装(implements
)した匿名クラスになる。
public class Main {
public static void main(String[] args) {
// 匿名クラスを使ってスレッドを生成
var thread = new Thread(){
@Override
public void run() {
System.out.println("非同期的に処理を実行します。");
}
};
thread.start();
// >> 非同期的に処理を実行します。
}
}
public class Main {
public static void main(String[] args) {
// 匿名クラスを使ってスレッドを生成
var thread = new Thread(new Runnable(){
@Override
public void run() {
System.out.println("非同期的に処理を実行します。");
}
});
thread.start();
// >> 非同期的に処理を実行します。
}
}
ラムダ式を使って記述する
Runnable
インターフェースは関数型インターフェースでもあるため、ラムダ式を利用することでさらに簡潔にできる。
ラムダ式とは
Java8から導入された関数型インターフェースのインスタンスを簡易な記述で生成するための構文。
ラムダ式を理解するには、関数型インターフェースについて理解する必要がある。
ラムダ式は関数型インターフェースのインスタンスを生成するための構文
関数型インターフェースとは
関数型インターフェースは、抽象メソッドを一つだけ持つという条件を満たすインターフェースのこと。SAM(Single Abstract Method)と呼ばれることもある。
関数型インターフェースは抽象メソッドを1つだけ持つインターフェースのこと
関数型インターフェースは、前述のrun()
メソッドを格納するための型として機能する。run()
メソッドは戻り値を持たず引数も無いメソッドだが、戻り値を持つメソッド、引数を持つメソッドも当然存在する。
関数型インターフェースにおいては、メソッドの「引数」、「戻り値」の組み合わせが型としての表現になる。
標準ライブラリのjava.util.function
パッケージには、引数、戻り値のそれぞれ異なるFunction
、Predicate
、Consumer
といった関数型インターフェースがあらかじめ用意されているため、自分で関数型インターフェースを宣言して作成する機会はあまりないかもしれない。
java.util.functionパッケージの関数型インターフェース
関数型インターフェース | 定義された抽象メソッド | 補足 |
---|---|---|
Supplier<T> |
T get(); |
|
Consumer<T> |
void accept(T t); |
|
BiConsumer<T, U> |
void accept(T t, U u); |
|
Predicate<T> |
bolean test(T t); |
|
BiPredicate<T, U> |
boolean test(T t, U u); |
|
Function<T> |
R apply(T t); |
|
BiFunction<T, U> |
R apply(T t, U u); |
|
UnaryOperator<T> |
T apply(T t); |
|
BinaryOperator<T> |
T apply(T t1, T t2); |
BiFunction<T, U> のT とU が同じパターン。BiFunction<T, U> の特殊系と言う位置づけ。 |
Runnabale |
void run(); |
|
Callable<V> |
V call(); |
また通常、インターフェースにはdefault
メソッドや、Object
クラスのメソッド(toString()
、equals()
など)をオーバーライドしたメソッドも宣言できるため、これらが含まれるインターフェースでは、関数型インターフェースかどうかがわかりづらいことがある。そのような場合には、@FunctioalInterface
アノテーションを使用すると、仮に関数型インターフェースの条件を満たさない宣言をしてしまった場合でも、コンパイルエラーによって知らせてくれるようになる。
// 関数型インターフェース
@FunctionalInterface
public interface MyInterface {
void method();
}
// これも関数型インターフェース
@FunctionalInterface
public interface MyInterface2 {
void method();
default void defaultMethod() {
System.out.println("デフォルトの処理です。");
}
String toString();
}
// これは抽象メソッドが2つあるため関数型インターフェースではない
// @FunctionalInterfaceがコンパイルエラーを起こして知らせてくれる
@FunctionalInterface
public interface MyInterface {
void method1();
void method2();
}
基本構文
ラムダ式は、関数型インターフェースの実装を定義するための構文。ラムダ式を利用することで、関数型インターフェースが持つメソッドの型を「式」によって表現することができる。またメソッドの引数として渡したり受け取ったりすることで、Javascriptのコールバック関数のような機能を実現できる。
() -> {
---処理内容---
}
ラムダ式は、関数型インターフェースの実装として利用される。
関数型インターフェースは実装すべき抽象メソッドを一つしか持たないため、ラムダ式を使用して実装する際には、実装するメソッド名を記述する必要がない。
(個人的に、「ラムダ式は、関数型インターフェースの実装として利用される」の意味が、最初全く理解できなかった。コールバック関数を代入した変数をなぜ()
で実行しないのかがわからなかったが、ここでのラムダ式は、コールバック関数としてのラムダ式ではないと理解したことで納得できた。「=」が何かを代入しているように見えるのでわかりづらいが、ラムダ式はコールバック関数としても利用できるし、関数型インターフェースの実装としても利用できる。「=」を「代入」と捉えると頭が混乱してしまうが、匿名メソッドを「定義」している、と捉えることで納得できた。「コールバック関数を代入している」のではなく、あくまでも、「インターフェースでオーバーライドすべきメソッドを実装している」だけ。つまり、ラムダ式は、関数型インターフェースの実装として利用される。)
// 関数型インターフェース
@FunctionalInterface
public interface MyInterface {
void method();
}
public class Main {
public static void main(String[] args) {
// ラムダ式が関数型インターフェースを実装する
MyInterface instance = () -> {
System.out.println("hello world");
};
instance.method();
// >> hello world
}
}
{}
と return
は省略できる。
ラムダ式の処理が1行で表現できる場合、{}
ブロックとreturn
キーワードは省略することができる。
MyInterface instance = () -> System.out.println("hello world");
引数ありの構文
(引数の型 仮引数) -> {
---処理内容---
}
@FunctionalInterface
public interface MyInterface {
void method(int i);
}
public class Main {
public static void main(String[] args) {
// 引数ありのラムダ式
MyInterface instance = (int index) -> System.out.println(index + " hello world");
instance.method(100);
// >> 100 hello world
}
}
引数の型は省略できる。
ラムダ式の引数の型は、暗黙的に推論されるため、省略できる。
MyInterface instance = (index) -> System.out.println(index + " hello world");
引数が一つの場合、()
が省略できる。
MyInterface instance = index -> System.out.println(index + " hello world");
ただし、引数がない場合の()
は省略できない。
メソッド参照
メソッド参照とは、ラムダ式を簡潔するための構文のことを指す。
メソッドへの参照を表現することができ、主に関数型インターフェースの実装として利用される。3種類の構文がある。
クラス名::静的メソッド名
インスタンス変数::メソッド名
クラス名::new
@FunctionalInterface
public interface MyInterface {
void method();
}
public class MyClass {
// 静的メソッド
public static void staticMethod() {
System.out.println("静的メソッドです。");
}
// インスタンスメソッド
public void instanceMethod() {
System.out.println("インスタンスメソッドです。");
}
// コンストラクタ
public MyClass() {
System.out.println("コンストラクタです。");
}
}
下の3つは全てインターフェースMyInterface
を、ラムダ式を簡略化したメソッド参照によって実装している。
public class Main {
public static void main(String[] args) {
// クラス名::静的メソッド名
// ラムダ式によってここで実装を行うのではなく、既存のメソッドをインターフェースの実装として利用する
MyInterface staticMethodReference = MyClass::staticMethod;
staticMethodReference.method();
// >> 静的メソッドです。
// インスタンス変数::メソッド名
var myClass = new MyClass();
MyInterface instanceMethodReference = myClass::instanceMethod;
instanceMethodReference.method();
// >> コンストラクタです。
// >> インスタンスメソッドです。
// クラス名::new
MyInterface constructorReference = MyClass::new;
constructorReference.method();
// >> コンストラクタです。
}
}
default
メソッドとは
Java8で導入された修飾子。インターフェース内でdefault
修飾子をつけて宣言したメソッドは、実装するクラスでオーバーライドしなくても良い。
public interface MyInterface {
// defaultメソッド
default void method(){
System.out.println("デフォルトの処理です。");
}
}
public class MyClass implements MyInterface {
// defaultメソッドはオーバーライドしなくてもOK(もちろん、オーバーライドしてもOK)
}
public class Main {
public static void main(String[] args) {
var instance = new MyClass();
instance.method();
// >> デフォルトの処理です。
}
}
さらに、default
メソッドはsuper
キーワードを使用することで明示的に呼び出すこともできる。
public interface MyInterface {
// defaultメソッド
default void method(){
System.out.println("デフォルトの処理です。");
}
}
public class MyClass implements MyInterface {
@Override
public void method(){
// defaultメソッドを明示的に呼び出すこともできる
MyInterface.super.method();
System.out.println("実装クラスの処理です。");
}
}
public class Main {
public static void main(String[] args) {
var instance = new MyClass();
instance.method();
// >> デフォルトの処理です。
// >> 実装クラスの処理です。
}
}
ラムダ式を使ってRunnable
を実装する
Runnable
インターフェースは関数型インターフェースでもあるため、ラムダ式を利用することができる。
public class Main {
public static void main(String[] args) {
// ラムダ式を使ってスレッドを生成
var thread = new Thread(() -> System.out.println("非同期的に処理を実行します。"));
thread.start();
// >> 非同期的に処理を実行します。
}
}
スレッドプール
スレッドはプロセスよりも軽量であるものの、数が増えればパフォーマンスに影響を与えてしまうことがある。これを防ぐためにスレッドプールという仕組みがある。
スレッドプールでは使用するスレッドがあらかじめ複数用意される。スレッドは必要になったときに取り出され、使用後は再びプールに戻される。こうすることで、プールされたスレッドが何度も再利用できるようになっている。
スレッドプールを利用すると、スレッドの新規生成による負荷が極力抑えられるため、スレッド生成によるオーバーヘッドが減少し、結果としてパフォーマンスを向上させることができる。
スレッドプールの生成
スレッドプールを生成するには、java.util.concurrent
パッケージに含まれるExecutors
クラスを利用する。
Executors
クラスには、同じくjava.util.concurrent
パッケージのインターフェースExecutor
、ExcutorService
、ScheduledExcutorService
を実装したクラスのインスタンスを提供するための静的メソッド(ファクトリメソッド)が用意されている。
Executor
はインターフェースで、Executors
はクラス。これらの関係性を把握しておくと各種メソッドなどがどのクラス、インターフェースで定義されているかがわかり、頭の中で体系立てて整理することができる(と個人的に思っている)。
Executors.newFixedThreadPool()
指定された数のスレッドを持つ固定スレッドプールを生成することができる。
public static ExecutorService newFixedThreadPool(int nThreads)
ExecutorService executor = Executors.newFixedThreadPool(5);
Executors
クラスの静的メソッド`newFixedThreadPool()
Executors.newSingleThreadExecutor()
1つのスレッド(シングルスレッド)でタスクを順次実行するスレッドプールを生成することができる。
ExecutorService executor = Executors.newSingleThreadExecutor();
Executors.newCachedThreadPool()
過去に作成されたスレッドを再利用するキャッシュ型スレッドプールを作成することができる。
ExecutorService executor = Executors.newCachedThreadPool();
Executors.newScheduledThreadPool()
定期的に、または遅延して実行されるタスクを事前にスケジューリングできるスレッドプールを生成することができる。
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(5);
Executors.newSingleThreadScheduledExecutor()
1つのスレッドでタスクを順次実行するスケジューリングスレッドプールを作成することができる。
ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
非同期処理の実行
生成したスレッドプールに対してタスクを渡すことで、非同期処理を実行することができる。ここでいうタスクとはRunnable
インターフェースの実装のことを指す。
タスクを渡すためのメソッドには execute()
と submit()
がある。
execute()
void execute(Runnable command)
Executor
インターフェースで定義された非同期処理を実行するためのメソッド。
Thread(new(RunnableTask())).start()
に相当する。
ExecutorService executor = Executors.newFixedThreadPool(10);
Runnable task = () -> System.out.println("hello world");
executor.execute(task);
後述するsubmit()
と異なり、戻り値を持たない。
またexecute()
を使用した場合、生成したスレッド上で実行させた処理中に発生した例外はスレッドプール内で処理されてしまうため、キャッチすることができない。一方、submit()
を使用した場合はFuture
を介して例外をキャッチすることができる。
execute()
は戻り値を持たない
submit()
<T> Future<T> submit(Callable<T> task)
ExecutorServiceインターフェース
で定義された非同期処理実行のためのメソッド。非同期処理の結果をFuture
オブジェクトでラップした状態(Future<T>
)で返す。
ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<String> task = () -> "hello world";
Future<String> future = executor.submit(task);
非同期処理に戻り値がある場合のタスクは、Runnable
インターフェースではなく Callable<T>
インターフェースを使用する。Callable<T>
インターフェースは関数型インターフェースの一つである。execute()
が例外処理を行えないのは、Runnable
とCallable
の違いによるものでもある。
Runnable
を渡して取得したFuture
は、非同期処理が完了した時get()
がnull
を返す。
Future<?> submit(Runnable task)
submit()
は戻り値を持つ
Future
Future
インフターフェースは非同期処理のハンドリングをしやすくするためのものであり、処理の成否や処理結果を取得するために利用される。
Future<T>
には非同期処理の結果を取得することができるget()
が定義されている。
V get() throws InterruptedException, ExecutionException
ExecutorService executor = Executors.newFixedThreadPool(10);
Callable<String> task = () -> "hello world";
Future<String> future = executor.submit(task);
try {
String result = future.get();
System.out.println(result);
// >> hello world
} catch (InterruptedException e) {
} catch (ExecutionException e) {
}
get()
メソッドは非同期処理の結果を取得するメソッドだが、非同期処理の完了を待つためにも使用される。非同期処理を待つメソッドには、他にThread
インターフェースのjoin()
がある。どちらも非同期処理の完了を待つメソッドだが、get()
がInterruptedException
やExecutionException
をスローする可能性があるのに対して、join()
はそのようなチェック例外(try-catch
が必要な例外)をスローしないという違いがある。
join()
を使う際には、非同期処理が正常に完了することが保証された場合に使用し、非同期処理が例外をスローしたり、キャンセルされる場合などの例外処理が必要な場合はget()
を使う。
Thread
のjoin()
は例外処理が行えない
Future
のget()
は例外処理を行える
非同期処理が戻り値を返さない例
public class Task implements Runnable {
private int id;
public Task(int id) {
this.id = id;
}
@Override
public void run() {
System.out.println("タスクを実行します" + " id: " + id);
}
}
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
// 10個のスレッドから構成されるスレッドプールを生成
ExecutorService executor = Executors.newFixedThreadPool(10);
// タスクをスレッドプールに送信
for (int i = 0; i < 10; i++) {
Runnable task = new Task(i);
// 戻り値がない場合、executor.execute()としてもOK
executor.submit(task);
}
// >> タスクを実行します id: 5
// >> タスクを実行します id: 8
// >> タスクを実行します id: 3
// >> タスクを実行します id: 9
// >> タスクを実行します id: 0
// >> タスクを実行します id: 2
// >> タスクを実行します id: 6
// >> タスクを実行します id: 4
// >> タスクを実行します id: 7
// >> タスクを実行します id: 1
// スレッドプールをシャットダウン(新しいタスクの送信を受け入れ停止し、既存のタスク実行が終了したらスレッドプールを破棄する)
executor.shutdown();
}
}
非同期処理が戻り値を返す例
import java.util.concurrent.Callable;
// 戻り値がある場合、Runnable ではなく Callble<T> を使用する(Tは戻り値の型)
public class Task implements Callable<Integer> {
private int id;
public Task(int id) {
this.id = id;
}
// Callble では call() をオーバーライドする
@Override
public Integer call() throws Exception {
return id;
}
}
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) {
// 10個のスレッドから構成されるスレッドプールを生成
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
Callable task = new Task(i);
// タスクをスレッドプールに送信
Future<Integer> future = executor.submit(task);
try {
// 結果を取得する
Integer taskId = future.get();
System.out.println("タスクを実行します" + " id: " + taskId);
} catch (Exception e) {
// 例外処理
}
}
// >> タスクを実行します id: 0
// >> タスクを実行します id: 1
// >> タスクを実行します id: 2
// >> タスクを実行します id: 3
// >> タスクを実行します id: 4
// >> タスクを実行します id: 5
// >> タスクを実行します id: 6
// >> タスクを実行します id: 7
// >> タスクを実行します id: 8
// >> タスクを実行します id: 9
// スレッドプールをシャットダウン(新しいタスクの送信を受け入れない)
executor.shutdown();
}
}
非同期処理が戻り値を返す例(ラムダ式)
ラムダ式を使用してCallable<T>
を実装した場合、戻り値はsubmit()
の第2引数に渡すことで、Future<T>
オグジェクトから取得する事ができる。
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
// ラムダ式を使ってCallabel<T>を実装する
Future<Integer> future = executor.submit(() -> {
// 処理
}, i); // submit()の第2引数 i が戻り値になる
try {
// ここで i を取得している
Integer taskId = future.get();
System.out.println("タスクを実行します" + " id: " + taskId);
} catch (Exception e) {
// 例外処理
}
}
// >> タスクを実行します id: 0
// >> タスクを実行します id: 1
// >> タスクを実行します id: 2
// >> タスクを実行します id: 3
// >> タスクを実行します id: 4
// >> タスクを実行します id: 5
// >> タスクを実行します id: 6
// >> タスクを実行します id: 7
// >> タスクを実行します id: 8
// >> タスクを実行します id: 9
// スレッドプールをシャットダウン(新しいタスクの送信を受け入れない)
executor.shutdown();
}
}
複数の非同期処理のハンドリング
CompletableFuture
非同期処理を扱うときに、
ある非同期処理(A)が成功したら、その実行結果(A)を利用して、また別の処理(B)を実行したい
という場合がある。
複数の同期処理、非同期処理の実行順を制御したい場合には、CompletableFuture
を利用する。
CompletableFuture
はJava8以降のjava.util.concurrent
パッケージに含まれる。
まず、最初に実行する非同期処理(コールバック関数)をCompletableFuture
クラスの静的メソッドCompletableFuture.supplyAsync()
に渡す必要がある。
ここでのポイントは、Runnable
やCallable
を使用したときのように、開発者が明示的にスレッドを生成したりしなくても非同期処理が実現できること。
CompletableFuture
を使うと、スレッドの生成処理を記述しなくて良い
CompletableFuture.supplyAsync()
に渡したコールバック関数は、JavaのForkJoinPool
と呼ばれるデフォルトのスレッドプールが管理してくれる。
非同期処理の管理、スレッドの再利用、効率的なスケジューリングなどをForkJoinPool
が行ってくれることにより、開発者は手動でスレッドの生成や管理をする必要がなくなり、より直感的で簡単な非同期プログラミングが可能になる。
直列実行
CompletableFuture.supplyAsync()
はCompletableFuture<T>
型のオブジェクトを返却する。
返却されたオブジェクトには、thenAccept()
やthenAcceptAsync()
といったメソッドが用意されているため、これらのメソッドにさらにコールバック関数を渡すことで、後続の処理を登録していくことができる。
thenAccept()
は同期処理をコールバック関数として登録する際に使用するのに対して、thenAcceptAsync()
は非同期処理をコールバック関数をして登録する際に使用する。
thenAccept()
に登録した処理は、CompletableFuture.supplyAsync()
に登録した非同期処理と同じスレッド上で実行されるのに対して、thenAcceptAsync()
では、ForkJoinPool
によって、プラットフォームや環境によっては新しいスレッドがスレッドプールから割り当てられる可能性がある。そのため、非同期処理の後続に、さらに別の非同期処理が続く場合にはthenAcceptAsync
を使用する。
thenAccept()
やthenAcceptAsync()
が戻り値を持たないのに対して、処理をさらにつなげたい場合には、CompletableFuture<T>
型を戻り値に持つthenApply()
、thenApplyAsync()
を使用する。イメージとしては下の図のような形。
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
System.out.println("Task1を実行します。");
return "[Task1]";
}).thenApplyAsync((result) -> {
System.out.println("Task2を実行します。");
return result + "[Task2]";
}).thenApplyAsync((result) -> {
System.out.println("Task3を実行します。");
return result + "[Task3]";
}).thenAcceptAsync((result) -> {
System.out.println("完了したタスク:" + result);
});
// メインスレッドが終了しないように、非同期処理の完了を待つ
future.join();
// >> Task1を実行します。
// >> Task2を実行します。
// >> Task3を実行します。
// >> 完了したタスク:[Task1][Task2][Task3]
}
}
並列実行
CompletableFuture
には、複数の非同期処理を並列で実行するためのメソッドallOf()
がある。allOf()
を使用することで、複数の非同期処理が全て完了したときの処理を登録することができる。
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("Task1を実行します。");
return "[Task1]";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Task2を実行します。");
return "[Task2]";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("Task3を実行します。");
return "[Task3]";
});
CompletableFuture.allOf(future1, future2, future3);
// 全ての非同期処理の完了を待つ
futures.join();
String result1 = future1.join();
String result2 = future2.join();
String result3 = future3.join();
System.out.println("完了したタスク:" + result1 + result2 + result3);
// ⭐️並列実行なので、タスクの実行順は毎回変わる
// >> Task1を実行します。
// >> Task3を実行します。
// >> Task2を実行します。
// >> 完了したタスク:[Task1][Task2][Task3]
}
}
allOf()
はCompletableFuture<T>
を返すので、さらにその後に直列処理をつなげることもできる。下の図のようなイメージ。
import java.util.concurrent.CompletableFuture;
public class Main {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
System.out.println("Task1を実行します。");
return "[Task1]";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
System.out.println("Task2を実行します。");
return "[Task2]";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
System.out.println("Task3を実行します。");
return "[Task3]";
});
CompletableFuture.allOf(future1, future2, future3)
.whenCompleteAsync((voidResult, ex) -> {
if (ex == null) {
System.out.println("Task1、Task2、Tsk3が完了しました。");
} else {
// 例外処理
}
}).thenApplyAsync((result) -> {
System.out.println("後続処理を実行します。");
return "[後続処理]";
}).thenAcceptAsync((result) -> {
System.out.println("最後の処理を実行します。");
});
// ⭐️並列実行なので、タスクの実行順は毎回変わる
// >> Task2を実行します。
// >> Task3を実行します。
// >> Task1を実行します。
// >> Task1、Task2、Tsk3が完了しました。
// >> 後続処理を実行します。
// >> 最後の処理を実行します。
}
}
同期プリミティブ
マルチスレッドプログラミングにおいて、複数のスレッド間でデータの一貫性を保持したり、または実行のタイミングを同期したりするための基本的なツールや仕組みのことを同期プリミティブと言う。
スレッドは一つのプロセス内で複数存在し、それぞれのスレッドはプロセスに割り当てられたメモリ領域を共有するため、一つのリソースへ複数のスレッドが同時にアクセスしたり、値を更新しようとするとデータに不整合が発生することがある。この複数のスレッドが同時に一つの対象へアクセスを行うことを競合と言う。
セマフォ
Semaphore
共有リソースに対するアクセス権を、複数のスレッド間で一定数に制限、管理する仕組みのことをセマフォと言う。
データベースへの接続やファイルへのアクセスなど、限られたリソースに対する同時アクセス数を制限したい場合に利用される。
ここでセマフォ内部の機能について着目してみると、セマフォはリソースへのアクセス「権利数」を管理するカウンターとしての役目を果たしている。
リソースへのアクセス権には最大許可数があり、全てのスレッドがアクセス権を取得していない状態であれば、「取得可能数 = 最大許可数 」となる。
アクセス権が1つ取得された場合、セマフォは取得可能数のカウンターを1だけ減らす。反対にリソースへのアクセス権が1つ解放された場合、セマフォはカウンターを1だけ増やす。
このようなカウンター機能を内部に持つセマフォは、複数のスレッドからアクセス権の取得要求を受け付け、取得可能であれば権利を付与し、付与可能なアクセス権が無ければスレッドを待機させることができる。
Semaphore
java.util.concurrent.Semaphore
クラスを利用することでセマフォを実装することができる。
Semaphore
クラスの初期化では、セマフォが管理するアクセス権の最大許可数を指定する必要がある。
Semaphore semaphore = new Semaphore(最大許可数);
共有リソースへのアクセス権要求はacquire()
によって行われる。
semaphore.acquire()
// ⬇️ ここからクリティカルセクション ⬇️
クリティカルセクションにて、共有リソースを使った処理が終了したら、release()
によって使用していたアクセス権を解放することができる。
// ⬆️ ここまでクリティカルセクション ⬆️
semaphore.release();
このSemaphore
クラスのメソッドを使用すると、セマフォ機能を持つMySemaphore
を作成することができる。
import java.util.concurrent.Semaphore;
public class MySemaphore {
// 最大アクセス権許可数: 3
private final Semaphore semaphore = new Semaphore(3);
public void accessResourceAndDoSomething() throws InterruptedException {
String threadName = Thread.currentThread().getName();
// アクセス権をリクエスト(取得可能数が無ければここで待機)
System.out.println("【リクエスト】" + threadName + " (現在の取得可能数: " + semaphore.availablePermits() + ")");
semaphore.acquire();
// アクセス権が取得される
System.out.println("【取得】" + threadName + " (現在の取得可能数: " + semaphore.availablePermits() + ")");
try {
// クリティカルセクション
System.out.println(threadName + " :共有リソースを使った処理を実行(クリティカルセクション)");
// 擬似的にリソースを使った処理を挿入(これがないとクリティカルセクションが早く終了しすぎてログが見づらくなるので)
Thread.sleep(1000);
} finally {
// アクセス権を解放する
semaphore.release();
}
}
}
public class Main {
public static void main(String[] args) {
// セマフォは1つだけ用意する
final MySemaphore mySemaphore = new MySemaphore();
Runnable task = () -> {
try {
mySemaphore.accessResourceAndDoSomething();
} catch (InterruptedException e) {
}
};
// スレッドは10個用意する
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
Thread thread3 = new Thread(task);
Thread thread4 = new Thread(task);
Thread thread5 = new Thread(task);
Thread thread6 = new Thread(task);
Thread thread7 = new Thread(task);
Thread thread8 = new Thread(task);
Thread thread9 = new Thread(task);
Thread thread10 = new Thread(task);
thread1.start();
thread2.start();
thread3.start();
thread4.start();
thread5.start();
thread6.start();
thread7.start();
thread8.start();
thread9.start();
thread10.start();
// >> 【リクエスト】Thread-5 (現在の取得可能数: 3)
// >> 【リクエスト】Thread-8 (現在の取得可能数: 3)
// >> 【リクエスト】Thread-0 (現在の取得可能数: 3)
// >> 【リクエスト】Thread-9 (現在の取得可能数: 3)
// >> 【リクエスト】Thread-4 (現在の取得可能数: 3)
// >> 【リクエスト】Thread-3 (現在の取得可能数: 3)
// >> 【リクエスト】Thread-1 (現在の取得可能数: 3)
// >> 【リクエスト】Thread-7 (現在の取得可能数: 3)
// >> 【リクエスト】Thread-2 (現在の取得可能数: 3)
// >> 【リクエスト】Thread-6 (現在の取得可能数: 3)
// 以降の出力結果はJVMのスレッドスケジューラに依存するため実行する度に変わる
// (セマフォが同時に3つのスレッドにしかアクセス権を付与しないというのは毎回同じ)
// >> 【取得】Thread-1 (現在の取得可能数: 0)
// >> 【取得】Thread-3 (現在の取得可能数: 1)
// >> 【取得】Thread-5 (現在の取得可能数: 2)
// >> Thread-1 :共有リソースを使った処理を実行(クリティカルセクション)
// >> Thread-5 :共有リソースを使った処理を実行(クリティカルセクション)
// >> Thread-3 :共有リソースを使った処理を実行(クリティカルセクション)
// >> 【取得】Thread-6 (現在の取得可能数: 0)
// >> 【取得】Thread-7 (現在の取得可能数: 1)
// >> 【取得】Thread-4 (現在の取得可能数: 2)
// >> Thread-7 :共有リソースを使った処理を実行(クリティカルセクション)
// >> Thread-6 :共有リソースを使った処理を実行(クリティカルセクション)
// >> Thread-4 :共有リソースを使った処理を実行(クリティカルセクション)
// >> 【取得】Thread-8 (現在の取得可能数: 0)
// >> 【取得】Thread-0 (現在の取得可能数: 0)
// >> 【取得】Thread-9 (現在の取得可能数: 1)
// >> Thread-0 :共有リソースを使った処理を実行(クリティカルセクション)
// >> Thread-8 :共有リソースを使った処理を実行(クリティカルセクション)
// >> Thread-9 :共有リソースを使った処理を実行(クリティカルセクション)
// >> 【取得】Thread-2 (現在の取得可能数: 0)
// >> Thread-2 :共有リソースを使った処理を実行(クリティカルセクション)
}
}
出力結果を見てみると、クリティカルセクションに侵入可能なスレッド数がMySemaphore
によって3つに制限されていることがわかる。
取得可能数が
// >> 【取得】Thread-8 (現在の取得可能数: 0)
// >> 【取得】Thread-0 (現在の取得可能数: 0)
// >> 【取得】Thread-9 (現在の取得可能数: 1)
のようになっているのは、JVMが高速にアクティブスレッドを高速に切り替えることで並行処理を行い、Thread-7
、Thread-6
、Thread-4
のスレッドのアクセス権解放の直後、ログ出力前にアクティブスレッドが切り替わり、Thread-8
、Thread-0
、Thread-9
がアクセス権を取得しているためと思われる。
ミューテックス(排他制御)
Mutual Exclusion
同時に1つのスレッドのみが共有リソースへのアクセス権を取得できるようにした排他制御の仕組みをミューテクスと言う。
排他制御=Exlusive Control
1つしか用意されていないアクセス権をロックと表現し、またそのアクセス権を取得することをロックの獲得と言う。
共有リソースへのアクセス権取得はロックと表現される
排他制御は競合を防ぐ一方で、対象範囲をむやみに広げるとパフォーマンスに悪影響を与えるため、安全性とパフォーマンスとのバランスを考えながら使用する必要がある。
ReentrantLock
java.util.concurrent.locks
パッケージには、複数のスレッドからリソースへのアクセスを排他制御することが可能なReentrantLock
クラスがある。
ReentrantLock
クラスはLock
インターフェースを実装しているため、lock()
メソッドが定義されている。これにより共有リソースへのロックを行うことができる。
Lock lock = new ReentrantLock();
lock.lock();
取得したロックは必ずunlock()
によって解放する必要がある。ロック解放にはtry-finally
構文を利用する。
lock.unlock();
1度unlock()
を実行したスレッドが、再びロックを取得することもできる。
reentrant=再入可能
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyLock {
private final Lock lock = new ReentrantLock();
public void method() {
// ロックの取得
lock.lock();
try {
// クリティカルセクション:ロックされた領域での処理
} finally {
// ロックの解放
lock.unlock();
}
}
}
ロックを取得できなかった場合の処理、ロックを指定時間内に取得できなかった場合の処理なども設定できる。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyLock {
private final Lock lock = new ReentrantLock();
public void method() {
if (lock.tryLock()) {
try {
// ロックに成功した場合の処理
} finally {
lock.unlock();
}
} else {
// ロックに失敗した場合の処理
}
}
}
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class MyLock {
private final Lock lock = new ReentrantLock();
public void method() throws InterruptedException {
if (lock.tryLock(500, TimeUnit.MILLISECONDS)) {
try {
// 指定時間内にロックが取得できた場合の処理
} finally {
lock.unlock();
}
} else {
// 指定時間内にロックが取得できなかった場合の処理
}
}
}
tryLock()
がthrow
する可能性のあるInterruptedException
は、スレッドが中断されたときに発生する。
具体的には、スレッドがsleep()
、wait()
、join()
などによってブロック状態にあるとき、外部からそのスレッドに対してinterrupt()
が呼ばれると、スレッドは中断状態になり、InterruptedException
が発生する。
ReentrantReadWriteLock
java.util.concurrent.locks.ReentrantReadWriteLock
を使用すると、共有リソースに対して読み取り専用のロック(Read Lock)と書き込み専用のロック(Write Lock)を分けて管理することができる。
Read Lock は、複数のスレッドが同時に取得できる。
Write Lock は、1つのスレッドのみが取得することができ、また Write Lock を取得したスレッドはリソースへのアクセスを独占する(排他的)。つまり、Write Lock が取得されると、読み取り、書き込みを行おうとするスレッドは待機させられることになる。
このようにReadWriteLock
は、Write Lock がロックを独占することから読み取りが頻繁に行われるが、書き込み頻度は少ないと言うシチュエーションに適している。
ReentrantReadWriteLock
クラスはReadWriteLock
インターフェースを実装しているため、readLock()
、writeLock()
メソッドが定義されている。これらメソッドはいずれもLock
インターフェース型を返却する。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class MyLock {
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
private int value = 0;
public int readValue() {
// Read Lock の取得
Lock lock = rwLock.readLock();
lock.lock();
try {
// クリティカルセクション:ロックされた領域での処理
return value;
} finally {
// ロックの解放
lock.unlock();
}
}
public void writeValue(int value) {
// Write Lock の取得
Lock lock = rwLock.writeLock();
lock.lock();
try {
// クリティカルセクション:ロックされた領域での処理
this.value = value;
} finally {
// ロックの解放
lock.unlock();
}
}
}
デッドロック
デッドロックとは、以下の状態に陥ることを指す。
-
A
、B
という2つのリソースが存在する - スレッドXは、
A → B
という順番でリソースに対するロックを獲得する - スレッドYは、
B → A
という順番でリソースに対するロックを獲得する
この時スレッドXがA
のロックを獲得した直後に、スレッドYがB
に対するロックを獲得したとする。
この状況に陥ると、スレッドXとスレッドYがお互いのロック解除を永遠に待つこととなる。この状態をデッドロックという。
ReentrantLock
、ReentrantReadWriteLock
を使用する場合、デッドロックに気をつける必要がある。
モニター
ミューテックスの機能に加えて、スレッド間の通信や同期をするための高レベルの同期プリミティブとしてモニターがある。
モニターは、1つ以上のミューテックスと条件変数から構成される。モニター内部のクリティカルセクションは、同時に1つのスレッドしか実行できない(ミューテックス)。
またモニター内ではロックの取得と解放が暗黙的に行われる。
synchronized
Javaのモニターはオブジェクト単位で提供され、オブジェクトのsynchronized
メソッドやsynchronized
ブロックを使うことによって、スレッドはモニターを取得できる。
「モニターを取得する」とは、「オブジェクトに対する排他的なロックを取得する」ことを意味し、具体的な効果としては、そのオブジェクトに対する同時アクセスを一つのスレッドに限定することができる。
synchronized
によってモニターは取得できる。
モニターが取得されると、暗黙的なロックが行われる。
モニターを取得したスレッドだけが、synchronized
ブロック内のコードを実行することができる。
反対に、synchronized
ブロックを抜けるときに、モニターは解放される。
モニターを取得している間に実行されるコード範囲をクリティカルセクションという。クリティカルセクション内のコードは、同時に一つのスレッドしか実行することができない。
synchronized
修飾子
synchronized void method() {
// クリティカルセクション:ロックされた領域での処理
}
メソッドに対して修飾子として使用した場合、取得できるのはthis
オブジェクトに対するモニターになる。
public class MyClass {
// synchronizedMethod()が呼ばれると、オブジェクト(this)のモニター(ロック)がスレッドによって取得される
public synchronized void synchronizedMethod() {
// クリティカルセクション
} // ブロックを抜けると、モニターが解放される
}
ここで取得されるのはthis
のモニターであるため、インスタンスをまたがる排他制御は行われないないことに注意(MyClass
からインスタンスAとBが作成されている時、AとBの間では排他制御が行われない)。
synchronized
ブロック
取得したいモニターが自身のオブジェクト(this
)ではない場合、synchronized
ブロックを使用して取得する。
オブジェクトに対して使用する際、Javaでは慣例的にロック対象のオブジェクトの変数名をlock
にすることが多い。
synchronized(ロック対象のオブジェクト) {
// クリティカルセクション:ロックされた領域での処理
}
public class MyClass {
// モニターを取得したいオブジェクト(MyClass(this)ではない点に注目)
private final Object lock = new Object();
public void method() {
// このブロックに入るときに、lockオブジェクトのモニター(ロック)が取得される
synchronized (lock) {
// クリティカルセクション
} // ブロックを抜けると、モニターが解放される
}
}
synchronized
の利用
synchronized
を使用することで、共有リソースに対する複数スレッドからのアクセスを簡単に制御することができる。
public class Counter {
private int count = 0;
public synchronized void increment() {
count++;
}
public synchronized int getCount() {
return count;
}
}
条件変数
Condition Variable
スレッドが特定の条件を満たすまで待機したり、条件が満たされたことを他のスレッドに通知するための仕組みを条件変数と言う。
(個人的に初めここがよく理解できなかった。ある「変数」のことを条件変数と言っているのかと思っていたが、そうではなく、仕組み全体を条件変数と呼んでいる。)
java.lang.Object
クラスのwait()
、notify()
、notifyAll()
メソッドから構成される。
wait()
wait()
は、スレッドがモニターが取得した状態で実行されなればならない。モニターが取得されない状態でwait()
が実行されると、IllegalMonitorStateException
が発生する。
スレッドからwait()
が実行されると、スレッドは一時的にオブジェクトのモニターを解放し、指定された条件が満たされるまでスレッドをその場で待機させる。
スレッドは再びモニターのロックを取得するまで待機し続け、ロックが再取得された時に待機を解除する。
// 条件(ture=達成、 false=未達成)
boolean condition;
// モニターを取得
synchronized(lock) {
// 条件を確認
while (!condition) {
// 条件未達成なら、スレッドはモニターを解放し、条件達成まで待機する
lock.wait();
}
// 条件が達成されると、再びモニターを取得してスレッドの待機を解除する
}
notify()
wait()
によって待機しているスレッドのうち、1つのスレッドを再開させる。どのスレッドを再開させるかはJVMによって決定される。
モニターを取得したスレッドから実行しないとIllegalMonitorStateException
が発生する。
synchronized(lock) {
condition = true;
lock.notify();
}
notifyAll()
wait()
によって待機している全てのスレッドを再開させる。
モニターを取得したスレッドから実行しないとIllegalMonitorStateException
が発生する。
synchronized(lock) {
condition = true;
lock.notifyAll();
}
条件変数を実装する
public class MyConditionVariable {
private final Object lock = new Object();
// 条件(true:達成 false:未達成)
private boolean condition = false;
// 条件達成時のみ処理を実行し、条件未達時はスレッドを待機させる
public void awaitCondition() throws InterruptedException {
synchronized (lock) {
String threadName = Thread.currentThread().getName();
while (!condition) {
System.out.println(threadName + ": 条件未達成のため、待機します...");
lock.wait();
}
}
}
// 条件達成を待機中のうちの1つのスレッドに知らせる
public void signalCondition() {
synchronized (lock) {
condition = true;
lock.notify(); // 待機中のスレッドに通知
System.out.println(Thread.currentThread().getName() + ": 条件が達成されたため、待機中のスレッドがいれば通知します");
}
}
// 条件達成を待機中の全てのスレッドに通知する
public void signalConditionAll() {
synchronized (lock) {
condition = true;
System.out.println(Thread.currentThread().getName() + ": 条件が達成されたため、待機中のスレッドがいれば通知します");
lock.notifyAll(); // 待機中のスレッドに通知
}
}
}
public class Main {
public static void main(String[] args) {
MyConditionVariable myConditionVariable = new MyConditionVariable();
Runnable waitAndDoTask = () -> {
String threadName = Thread.currentThread().getName();
try {
System.out.println(threadName + ": 条件が達成されていれば、処理を実行したいです");
myConditionVariable.awaitCondition();
} catch (InterruptedException e) {
}
// 条件が満たされた後の処理
System.out.println(threadName + ": 条件が達成されたようなので、待機を解除し、処理を実行します");
};
Thread waitThread1 = new Thread(waitAndDoTask);
Thread waitThread2 = new Thread(waitAndDoTask);
waitThread1.start();
waitThread2.start();
try {
// ログの出力をわかりやすくするため、メインスレッドを1秒間待機させる
Thread.sleep(1000); // 3000ミリ秒 = 3秒
} catch (InterruptedException e) {
}
Runnable signalTask = () -> {
try {
System.out.println(Thread.currentThread().getName() + ": 条件達成のための処理をこれから実行します!(2秒かかる)");
// 条件達成までにかかる処理時間
Thread.sleep(2000);
myConditionVariable.signalConditionAll();
} catch (InterruptedException e) {
}
};
Thread signalThread = new Thread(signalTask);
signalThread.start();
// >> Thread-0: 条件が達成されていれば、処理を実行したいです
// >> Thread-1: 条件が達成されていれば、処理を実行したいです
// >> Thread-0: 条件未達成のため、待機します...
// >> Thread-1: 条件未達成のため、待機します...
// >> Thread-2: 条件達成のための処理をこれから実行します!(2秒かかる)
// >> Thread-2: 条件が達成されたため、待機中のスレッドがいれば通知します
// >> Thread-1: 条件が達成されたようなので、待機を解除し、処理を実行します
// >> Thread-0: 条件が達成されたようなので、待機を解除し、処理を実行します
}
}
Condition
より高機能な条件変数の機能を提供するものに、java.util.concurrent.locks.Condition
クラスがある。
Condition
クラスはLock
から取得することができ、2つは組み合わせて利用する。
Lock lock = new ReentrantLock();
Condition condition = lock.newCondition();
Condition
とLock
の組み合わせにより条件変数を実装すると、synchronized
では開発者が意識することのなかったロックの取得と解放を、より低級な記述によってカスタマイズしながら実装することができる。synchronized
の役割をLock
が代わりに果たし、Object
の待機、通知機能をCondition
が代わりに果たす。
await()
Object
のwait()
に相当するメソッド。
await()
を実行したCondition
がsignal()
またはsignalAll()
により signal を発信するまでスレッドを待機させる。
signal()
Object
のnotify()
に相当するメソッド。
ロックを取得した状態で実行する必要がない。
signalAll()
Object
のnotifyAll()
に相当するメソッド。
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
public class MyConditionVariable {
private final Lock lock = new ReentrantLock();
// Lock と紐づく Condition
private final Condition condition = lock.newCondition();
private boolean conditionMet = false; // met: 「meet=会う、(条件を)満たす」の過去形
public void awaitCondition() throws InterruptedException {
lock.lock();
try {
while (!conditionMet) {
// 条件達成までここで待機
condition.await();
}
// 条件達成時の処理
} finally {
lock.unlock();
}
}
public void signalCondition() {
lock.lock();
try {
conditionMet = true; // 条件達成
// 1つの待機中のスレッドに通知
condition.signal();
} finally {
lock.unlock();
}
}
public void signalAllConditions() {
lock.lock();
try {
conditionMet = true; // 条件達成
// すべての待機中のスレッドに通知
condition.signalAll();
} finally {
lock.unlock();
}
}
}
生産者/消費者問題
Condition
オブジェクトは一つのロックに対して複数の条件を設定することができる。
条件変数が複数の条件を管理する例として、生産者/消費者問題(Producer–consumer problem)と呼ばれる古典的な問題がある。
生産者/消費者問題では、キューに対してデータを生成して格納するスレッド(生産者、Producer)とキューからデータを取り出して利用するスレッド(消費者、Consumer)について考える。
キューには容量があるので、もし空きが無ければ生産者はデータを入れることができない。このとき生産者スレッドは空きができるまで待機することになるため、Condition
が管理する条件は「キューに空きができること」になる。条件が達成されたとき、待機していたスレッドは再開する。
一方、消費者はキューにデータが存在しない場合、データを取り出すことができない。生産者と同様に、消費者のスレッドもデータがキューに格納されるまで待機する。Condition
が管理する条件は「キューにデータが格納されていること」になる。
Condition
を利用することで、このような複数の条件に対応する待機スレッドを、条件ごとに管理することができる。
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProducerConsumer {
// キュー
private final Queue<Integer> queue = new LinkedList<>();
// キューの容量
private final int MAX_SIZE = 10;
private final Lock lock = new ReentrantLock();
// 一つの Lock に複数の Condition が紐づいている
// 「キューが満杯ではない」という条件
private final Condition notFull = lock.newCondition();
// 「キューが空ではない」という条件
private final Condition notEmpty = lock.newCondition();
// 生産者スレッド
public void produce() throws InterruptedException {
int value = 0;
// 生産者は生産を続ける
while (true) {
lock.lock();
try {
// 条件 = 「キューが満杯ではない」
while (queue.size() == MAX_SIZE) {
// 条件未達成の場合、Condition によって一時的にロックが解放され、スレッドは待機させられる
notFull.await();
}
// 条件が達成されると、Condition によってロックが再取得され、スレッドの待機は解除される
queue.add(value);
System.out.println("生産: " + value);
// 生産処理に1秒かかるようにする
Thread.sleep(1000);
// 次回生産用の値
value++;
// キューが空ではなくなったため、signal を発信
notEmpty.signal();
} finally {
lock.unlock();
}
}
}
// 消費者スレッド
public void consume() throws InterruptedException {
// 消費者は消費を続ける
while (true) {
lock.lock();
try {
// 条件 = 「キューが空ではない」
while (queue.isEmpty()) {
// 条件未達成の場合、Condition によって一時的にロックが解放され、スレッドは待機させられる
notEmpty.await();
}
// 条件が達成されると、Condition によってロックが再取得され、スレッドの待機は解除される
int value = queue.poll();
System.out.println("消費: " + value);
// 生産処理に1秒かかるようにする
Thread.sleep(1000);
// キューが満杯ではなくなったため、signal を発信
notFull.signal();
} finally {
lock.unlock();
}
}
}
}
public class Main {
public static void main(String[] args) {
ProducerConsumer pc = new ProducerConsumer();
Thread producerThread = new Thread(() -> {
try {
pc.produce();
} catch (InterruptedException e) {
}
});
Thread consumerThread = new Thread(() -> {
try {
pc.consume();
} catch (InterruptedException e) {
}
});
producerThread.start();
consumerThread.start();
// >> 生産: 0
// >> 生産: 1
// >> 生産: 2
// >> 生産: 3
// >> 生産: 4
// >> 生産: 5
// >> 生産: 6
// >> 生産: 7
// >> 生産: 8
// >> 生産: 9
// >> 消費: 0
// >> 消費: 1
// >> 消費: 2
// >> 消費: 3
// >> 消費: 4
// >> 消費: 5
// >> 消費: 6
// >> 消費: 7
// >> 消費: 8
// >> 消費: 9
// >> 生産: 10
// >> 生産: 11
// >> 生産: 12
// >> 生産: 13
// >> 生産: 14
// >> 生産: 15
// >> 生産: 16
// >> 生産: 17
// >> 生産: 18
// >> 生産: 19
// >> 消費: 10
// >> 消費: 11
// >> 消費: 12
// >> 消費: 13
// >> 消費: 14
// >> 消費: 15
// >> 消費: 16
// >> 消費: 17
// >> 消費: 18
// >> 消費: 19
// >> 生産: 20
// >> 生産: 21
.
.
.
}
}
volatile
synchronized
と似たものにvolatile
と言う修飾子がある。
volatile
は「揮発性」を意味し、値が変化しやすいことをコンパイラに通知する修飾子。
volatile
で宣言されたフィールドは、コンパイラによって最適化されず、常にメインメモリからアクセスされることが保証される。
メモリの種類
メモリには以下の種類があり、それぞれアクセス速度と容量が異なる。
- メインメモリ : 補助記憶装置からプログラムが展開される主要なメモリ
- キャッシュメモリ : メインメモリよりも高速でCPUに近い位置にある
- レジスタ : 最も高速なメモリでCPU内部にある
アクセス速度順に並べると、
遅い < メインメモリ < キャッシュメモリ < レジスタ < 速い
容量はその逆。
小さい < レジスタ < キャッシュメモリ < メインメモリ < 大きい
という関係になっている。
コンパイル
Javaのプログラムは、実行されるまでに2回のコンパイルを経ることがある。
事前コンパイル
ソースコード(Main.java
)から中間バイトコード(Main.class
)への静的コンパイルを指す。
「実行前に」 javac によって行われる。
プラットフォーム(OS)に依存しない中間コードが生成される。
実行時コンパイル
JVM によって「実行時に」行われる動的コンパイル。
事前コンパイルされた中間コードの一部が機械語に変換される。
パフォーマンス向上を目的として行われる。
JVM
JVMのJava実行エンジンには インタプリタモードと、 JITコンパイラモード (JIT = Just-In-Time)がある。
インタプリタモード
中間コードを1行づつ解釈して実行する。
JITコンパイラモード
中間コードの一部を、実行時コンパイルによって機械語に変換しながら実行するモード。
機械語はCPUが直接理解することのできる形式のため、中間コードのままインタプリタで実行させるよりも実行速度が向上する。
この処理を最適化と言う。
最適化
JITコンパイラは、実行されるコードのうち頻繁に利用される部分に対して実行時コンパイルを行い、更にキャッシュメモリを利用して 最適化 を行う。
通常フィールドは、メインメモリ上に展開されて、プログラム内で更新されることがある。
一方で、プログラム内で頻繁に利用される一部のフィールドなどは、最適化によってキャッシュメモリ上に展開される。
キャッシュされたフィールドは、メインメモリ上のフィールドと同期されないため、最新の値が反映されず、複数のスレッドが同一のフィールドを更新したときに矛盾が生じることがある。
volatile
の効果
複数のスレどから更新される可能性のあるフィールドに対してvolatile
修飾子を使用して宣言しておくと、そのフィールドが他のスレッドによって変更される可能性があることをコンパイラに対して示すことになるため、コンパイラがそのフィールドへのアクセスを最適化しないようにすることができる。
volatile
を付与したフィールドは、常にメインメモリから読み込まれ、変更が即座にメインメモリ上に反映されることが保証される。
volatile
で宣言したフィールドは、キャッシュされずに常にメインメモリから読み込まれる
複数のスレッド間で最新の値にアクセスできることから、「変数の可視性が確保される」とも表現される。
ただし、複数のスレッド間からの更新処理の競合を防ぐものではないので、「原子性は確保されない」と表現される。
volatile
は競合自体を防ぐものではない
class MyClass {
// volatileを使ったフィールド
volatile boolean flag = false;
}
MyClass myClass = new MyClass();
// スレッドAからのアクセス
myClass.flag = true; // メインメモリ上で更新が即反映される
// スレッドBからのアクセス
if (myClass.flag) {
// volatileを使用しない場合、キャッシュされていたflag(falseが格納)にアクセスされることがある
// volatileを使用することで、必ずメインメモリから読み出されるため、矛盾が発生しない
}
synchronzed
とvolatile
synchronzed
とvolatile
は、どちらもマルチスレッド環境でデータの一貫性を確保するための手段であるが、それぞれ違いがある。
synchronized
排他制御を実現するためにメソッドやブロックに対して使用される。
あるスレッドがsynchronized
ブロックに入ると、他のスレッドはロックしたインスタンスやメソッドへアクセスすることができなくなる。
メソッドやブロックに対して使用することができるため、トランザクションのようなある程度まとまった単位で処理の原子性を確保することができる。デッドロックの危険がある。
volatile
複数のスレッドからの可視性を確保するために、変数に対して使用される。volatile
を使用した変数はキャッシュメモリではなく、必ずメインメモリを介してアクセスされる。単一な操作に対しては効果があるものの、synchronized
と比べて、局所的な制御という感じで、複数の処理を排他制御するなどはできない。volatile
を使用しても、複数のスレッドが同時に同じフィールドを変更しようとする競合は防ぐことができない。
class MyClass {
// キャッシュされないようにできる
volatile boolean flag = false;
}
class MyClass {
private int i1 = 0;
private int i2 = 0;
public synchronized void incrementCounters() {
// 複数のスレッドから呼び出されたとしても、競合しない
i1++;
i2++;
}
}
// 組み合わせて使用することもできる
class MyClass {
private volatile int i1 = 0;
private volatile int i2 = 0;
public void incrementCounters() {
synchronized (this) {
i1++;
i2++;
}
}
}
複数の文を排他制御したり、複雑な処理を同期処理したい場合にはsynchronized
を使用し、単に一つの変数の可視性を確保したい場合にはvolatile
を使用する。
アトミック変数
java.util.concurrent.atomic
パッケージ
スレッドセーフなプログラミング手法には他にも、java.util.concurrent.atomic
パッケージに含まれるAtomicBoolean
やAtomicInteger
などのクラス群を利用する方法もある。
これらのクラス群を使用した型は、値の取得、演算、再代入までの操作を、他のスレッドに割り込まれることなく実行できる(スレッド同士が競合しない)。これはアトミックな操作と表現とされる(Atomic=原子の、不可分の)。
元々、Javaのlong
、double
型以外の基本型(int
、short
、byte
、char
、float
)はアトミックに操作できるため、AtomicLong
、AtomicDouble
が主に利用される。
Javaのlong
、double
のデータ型は、64bitのメモリ領域を必要とする。64bitのデータ型は、ハードウェア上では2回にわたる32bitの操作として扱われることがあり、この2回の操作の間に他のスレッドに割り込まれる可能性があるため、アトミックな操作が保証されない。
import java.util.concurrent.atomic.AtomicInteger;
AtomicInteger atomicInt = new AtomicInteger(10);
// 値の取得
int currentValue = atomicInt.get();
// 値の設定(インクリメント)
int newValue = atomicInt.incrementAndGet();
// アトミックな更新
// 10と等しい場合、20を代入する。10と等しい場合、メソッドはtrueを返却する
boolean isUpdated = atomicInt.compareAndSet(10, 20);
バリアー
CyclicBarrier
複数のスレッドで実行される処理において、それぞれの実行タイミングを一致させるような制御をスレッドの同期化という。
同期化を行うには、java.util.concurrent.CyclicBarrier
クラスを使用する。
CyclicBarrier
は同期化支援機能を持つクラスであり、バリアー という概念を扱う。
複数のスレッドが並列実行されているときに、全てのスレッドが特定のポイントに到達するまで待機させるポイントを バリアーポイント と言う。
また全てのスレッドがバリアーポイントに到達した時、バリアーの解除と同時に実行される処理をバリアーアクションと言う。
CompletableFuture.allOf()
と混同してしまいがちだが、CompletableFuture
が非同期処理の「完了」に焦点を当てた高級なフレームワークであるのに対して、CyclicBarrier
はスレッド間の「同期」に焦点を当てた低級な同期プリミティブである。
CompletableFuture
では非同期処理の完了結果を次の非同期処理に渡し、複数の非同期処理を連携させることができる。
CyclicBarrier
では非同期処理の途中の好きな場所にバリアーポイントを設置し、複数の非同期処理間のタイミングを同期させることができる。
全てのスレッドのデータ集約、ログ出力などの共通処理がバリアーアクションとされることが多い。
CyclicBarrier
を利用するには、まず初めに同期させるスレッド数、バリアーアクションを指定する必要がある。
CyclicBarrier barrier = new CyclicBarrier(int parties, Runnable barrierAction)
// parties = スレッド数
生成したバリアーを利用して、バリアーポイントを作成することができる
barrier.await();
以下ではスレッドを3つ作成して実行している。
実行の順番は前述の通りスレッドスケジューラが管理しているため実行する度に変わるが、バリアーポイントを設置したことによって3つのスレッドの間で同期が行われ、バリアーアクションがバリアーポイント通過後に実行されている。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Main {
public static void main(String[] args) {
// バリアーの生成
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("バリアーアクションの実行");
});
Runnable task = () -> {
try {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " バリアーポイントへ到達");
// バリアーポイントで待機
barrier.await();
System.out.println(threadName + " バリアー解除");
} catch (InterruptedException | BrokenBarrierException e) {
}
};
// スレッドの生成
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
Thread thread3 = new Thread(task);
// スレッドの開始
thread1.start();
thread2.start();
thread3.start();
// >> Thread-2 バリアーポイントへ到達
// >> Thread-0 バリアーポイントへ到達
// >> Thread-1 バリアーポイントへ到達
// >> バリアーアクションの実行
// >> Thread-0 バリアー解除
// >> Thread-2 バリアー解除
// >> Thread-1 バリアー解除
}
}
バリアーポイントの設置のために利用したawait()
は到着したスレッドのインデックスを返すため、バリアーポイントへの到着順序に応じた処理を実行することもできる。インデックスの最小値は 0
、最大値は getParties() - 1
。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Main2 {
public static void main(String[] args) {
// スレッド数を10に変更
CyclicBarrier barrier = new CyclicBarrier(10, () -> {
System.out.println("バリアーアクションの実行");
});
Runnable task = () -> {
try {
String threadName = Thread.currentThread().getName();
System.out.println(threadName + " バリアーポイントへ到達");
// 到着インデックスを取得する
int index = barrier.await();
if (index == barrier.getParties() - 1) {
System.out.println("最初にバリアーポイントへ到着したのは " + threadName + " です");
} else if (index == 0) {
System.out.println("最後にバリアーポイントへ到着したのは " + threadName + " です");
}
} catch (InterruptedException | BrokenBarrierException e) {
}
};
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
Thread thread3 = new Thread(task);
Thread thread4 = new Thread(task);
Thread thread5 = new Thread(task);
Thread thread6 = new Thread(task);
Thread thread7 = new Thread(task);
Thread thread8 = new Thread(task);
Thread thread9 = new Thread(task);
Thread thread10 = new Thread(task);
thread1.start();
thread2.start();
thread3.start();
thread4.start();
thread5.start();
thread6.start();
thread7.start();
thread8.start();
thread9.start();
thread10.start();
// >> Thread-2 バリアーポイントへ到達
// >> Thread-6 バリアーポイントへ到達
// >> Thread-1 バリアーポイントへ到達
// >> Thread-4 バリアーポイントへ到達
// >> Thread-3 バリアーポイントへ到達
// >> Thread-7 バリアーポイントへ到達
// >> Thread-8 バリアーポイントへ到達
// >> Thread-9 バリアーポイントへ到達
// >> Thread-5 バリアーポイントへ到達
// >> Thread-0 バリアーポイントへ到達
// >> バリアーアクションの実行
// >> 最初にバリアーポイントへ到着したのは Thread-2 です
// >> 最後にバリアーポイントへ到着したのは Thread-7 です
}
}
CyclicBarrier
オブジェクトは再利用可能なオブジェクトなので、バリアーポイントの設置、バリアーアクションの登録は何度でも繰り返し行うことができる。
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
public class Main2 {
public static void main(String[] args) {
CyclicBarrier barrier = new CyclicBarrier(3, () -> {
System.out.println("バリアーアクションの実行");
});
Runnable task = () -> {
try {
System.out.println(Thread.currentThread().getName() + " バリアーポイントへ到達");
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
}
};
Thread thread1 = new Thread(task);
Thread thread2 = new Thread(task);
Thread thread3 = new Thread(task);
Thread thread4 = new Thread(task);
Thread thread5 = new Thread(task);
Thread thread6 = new Thread(task);
thread1.start();
thread2.start();
thread3.start();
try {
// メインメソッドを1秒間待機させることで、thread1~3がバリアーポイントへ到着する時間を与える
Thread.sleep(1000);
} catch (InterruptedException e) {
}
thread4.start();
thread5.start();
thread6.start();
// >> Thread-2 バリアーポイントへ到達
// >> Thread-0 バリアーポイントへ到達
// >> Thread-1 バリアーポイントへ到達
// >> バリアーアクションの実行
// >> Thread-3 バリアーポイントへ到達
// >> Thread-5 バリアーポイントへ到達
// >> Thread-4 バリアーポイントへ到達
// >> バリアーアクションの実行
}
}
InterruptedException
InterruptedException
は、スレッドが
- スリープ(
Thead.sleep()
) - 待機(
Object.wait()
、Condition.await()
) - または他のブロッキング操作(
Thread.join()
)
を実行しているときに、他のスレッドによって割り込みが発生したことを示すためにthrow
される検査例外のことを指す。
Thread
には以下のメソッドが定義されている。
public void interrupt()
実行したThread
のスレッドに対して割り込みを発生させる。
public static boolean interrupted()
割り込みステータスと呼ばれる、Thread
クラスが内部で保持する自身のスレッドに対して割り込みが発生したかどうかの情報がinterrupted()
の実行によって削除される。そのため、2回連続で実行した場合、2回目の実行結果は必ずfalse
になる。
public boolean isInterrupted()
割り込みステータスが削除されない。
(この辺りの知識が非常に複雑でうまく理解できなかった。また折を見て内容を更新したい。)
段階的に整理したいので、まずはスレッドが待機している状態を作る。
Thread waitThread = new Thread(() -> {
try {
System.out.println("waitThreadを10秒間待機させます...");
Thread.sleep(10000);
System.out.println("waitThreadの待機を解除します");
} catch (InterruptedException e) {
System.out.println("waitThreadへの割り込みが発生しました");
}
});
waitThread.start();
// >> waitThreadを10秒間待機させます...
// ----- 10秒間何も出力されない -----
// >> waitThreadの待機を解除します
次に、この待機中のスレッドに割り込みを発生させる。
Thread waitThread = new Thread(() -> {
try {
System.out.println("waitThreadを10秒間待機させます...");
Thread.sleep(10000);
System.out.println("waitThreadの待機を解除します");
} catch (InterruptedException e) {
System.out.println("waitThreadへの割り込みが発生しました");
}
});
waitThread.start();
try {
// waitThread が確実に待機状態に入るための時間を与える
Thread.sleep(5000);
// メインスレッド上から、現在待機中の waitThread への割り込みを発生させる
waitThread.interrupt();
} catch (InterruptedException e) {
// 今回ここはあまり意識しなくて良いところ
System.out.println("メインスレッドへの割り込みが発生しました");
}
// >> waitThreadを10秒間待機させます...
// ----- 5秒間何も出力されない -----
// >> waitThreadへの割り込みが発生しました
Thread
クラスに定義されていたisInterrupted()
メソッドの使い方を確認してみる。
Thread waitThread = new Thread(() -> {
try {
int i = 0;
// 割り込みが発生しない限り、ループを抜けない
while (!Thread.currentThread().isInterrupted()) {
i++;
System.out.println(i + "回目のループ");
Thread.sleep(2000);
}
System.out.println("waitThreadの: isInterrupted true");
} catch (InterruptedException e) {
System.out.println("waitThreadへの割り込みが発生しました");
}
});
waitThread.start();
try {
Thread.sleep(5000);
System.out.println("interrupt()");
waitThread.interrupt();
if (waitThread.isInterrupted()) {
System.out.println("isInterrupted() : true");
} else {
System.out.println("isInterrupted() : false");
}
// waitThread の終了を待つ(この処理が無いと、メインスレッドが waitThread より先に終了する)
waitThread.join();
} catch (InterruptedException e) {
System.out.println("メインスレッドへの割り込みが発生しました");
}
System.out.println("メインスレッドが終了");
// >> 1回目のループ
// >> 2回目のループ
// >> 3回目のループ
// >> interrupt()
// >> isInterrupted() : true
// >> waitThreadへの割り込みが発生しました
// >> メインスレッドが終了
InterruptedException
発生時の処理
Thread
は内部で割り込みステータスと呼ばれる、割り込みが発生したかどうかの情報を管理している。
InterruptedException
が発生した場合、この割り込みステータスはクリアされるため、もしスレッド生成元に割り込みがあったことを伝えたい場合には、割り込みがあったスレッドからさらに割り込みを行う必要がある。
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
InterruptedException
が発生すると、割り込みステータスはクリアされる
割り込みを受け取った場合に適切な処理を行います。これは、割り込みフラグをクリアするか(Thread.interrupted()を呼び出す)、例外処理を行うか、必要に応じてリソースを解放するなどの対策を含みます
参考
・Head First Java 第2版 ―頭とからだで覚えるJavaの基本
・独習Java 新版
・徹底攻略Java SE 11 Gold問題集[1Z0-816]対応
・マルチスレッド・プログラミングの道具箱