シンクロナイザとは
自身の状態を使ってスレッドの進行を制御(調停)するオブジェクトのことをシンクロナイザ (synchronizer) と呼ぶ。
例えば、次のような実装があったとする。
package synchronizer;
import java.util.concurrent.TimeUnit;
public class SynchronizerTest {
public static void main(String[] args) {
runThread(() -> {
System.out.println("Thread 1 start");
System.out.println("Thread 1 end");
});
runThread(() -> {
System.out.println("Thread 2 start");
System.out.println("Thread 2 sleeping...");
TimeUnit.SECONDS.sleep(3);
System.out.println("Thread 2 awake");
System.out.println("Thread 2 end");
});
}
public static Thread runThread(ThrowingRunnable process) {
final Thread thread = new Thread(() -> {
try {
process.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
});
thread.start();
return thread;
}
interface ThrowingRunnable {
void run() throws Exception;
}
}
※以後、 runThread
と ThrowingRunnable
の記載は省略する
- 2つのスレッドを起動している
- スレッド1は標準出力に開始と終了のメッセージを書き出しているだけ
- スレッド2は途中で3秒間のスリープを入れている
Thread 1 start
Thread 1 end
Thread 2 start
Thread 2 sleeping...
Thread 2 awake
Thread 2 end
スレッド2がスリープしている間もスレッド1は普通に処理が流れるので、スレッド1が先に終わる形でメッセージが出力される
ここに、シンクロナイザの1つである CyclicBarrier
を以下のように導入してみる。
package synchronizer;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
public class SynchronizerTest {
public static void main(String[] args) {
final CyclicBarrier barrier = new CyclicBarrier(2);
runThread(() -> {
System.out.println("Thread 1 start");
barrier.await(); // ここで待機
System.out.println("Thread 1 end");
});
runThread(() -> {
System.out.println("Thread 2 start");
System.out.println("Thread 2 sleeping...");
TimeUnit.SECONDS.sleep(3);
System.out.println("Thread 2 awake");
barrier.await(); // ここで待機
System.out.println("Thread 2 end");
});
}
...
}
-
CyclicBarrier
を2で初期化してインスタンスを生成し、各スレッドの途中でawait
メソッドを呼び出すようにしている
Thread 1 start
Thread 2 start
Thread 2 sleeping...
Thread 2 awake
Thread 2 end
Thread 1 end
- 今度はスレッド1がスレッド2より後に終了している
-
CyclicBarrier
のawait
メソッドは、それを呼び出したスレッドをブロックする(処理が停止する) - そして、コンストラクタで指定された数と同じ回数
await
メソッドが呼ばれると、ブロックは解放されて処理が再開される- つまり、上記実装の場合は
await
が2回呼ばれるとブロックが解放される
- つまり、上記実装の場合は
- したがって、スレッド1はスレッド2のスリープが終わって2回目の
await
が呼ばれるまで処理がブロックされていたことになる - この結果、スレッド1の終了メッセージはスレッド2のスリープ終了より後に書き出されることになった
- スレッド2の終了メッセージより後になるか前になるかはタイミング次第
CyclicBarrier
のように、自身の状態(CyclicBarrier
の場合は await
が呼ばれた回数)を用いて複数のスレッドの実行を制御する機能を提供するオブジェクトをシンクロナイザと呼ぶ。
シンクロナイザを利用すると、複数のスレッドの実行タイミングを細かく制御できるようになる。
Java の標準APIでは、以下の5つのシンクロナイザが提供されている。
ここでは、これらシンクロナイザの使い方をメモする。
Semaphore
Semaphore (セマフォ)は、数に制限のあるリソースのスレッドへの払い出しを制御するような場面で利用できる。
package synchronizer;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
public static void main(String[] args) {
Semaphore semaphore = new Semaphore(2);
for (int i=0; i<10; i++) {
String name = "Thread[" + i + "]";
runThread(() -> {
System.out.println(name + " start");
semaphore.acquire();
System.out.println(name + " acquire");
TimeUnit.SECONDS.sleep(1);
System.out.println(name + " release");
semaphore.release();
});
}
}
}
-
Semaphore
を2で初期化している -
acquire
メソッドを呼び出した後に1秒スリープしてからrelease
メソッドを呼ぶスレッドを同時に10個起動している - この結果は以下のようになる
Thread[4] start
Thread[0] start
Thread[0] acquire
Thread[2] start
Thread[4] acquire
Thread[7] start
Thread[3] start
Thread[9] start
Thread[6] start
Thread[8] start
Thread[1] start
Thread[5] start
Thread[4] release
Thread[0] release
Thread[1] acquire
Thread[6] acquire
Thread[6] release
Thread[1] release
Thread[2] acquire
Thread[8] acquire
Thread[2] release
Thread[8] release
Thread[3] acquire
Thread[5] acquire
Thread[3] release
Thread[5] release
Thread[7] acquire
Thread[9] acquire
Thread[7] release
Thread[9] release
分かりやすいように図に整理すると以下のようになる。
よく見ると、黄色の部分(Semaphore
の acquire
を呼び出してから release
を呼び出すまでの区間)は、常に最大でも2つのスレッド分しか存在していないことが分かる
-
Semaphore
は、同時に利用可能なリソースの数を制御するときに利用できる - コンストラクタの引数で同時利用可能な許可の数(パーミット)を指定する
- 上記例では、パーミットには2を指定している
- パーミットの数が余っている状態で
acquire
メソッドを呼ぶと、パーミットが払い出される - パーミットの数に空きがない状態で
acquire
メソッドを呼ぶと、パーミットが解放されるまで呼び出し元のスレッドはブロックされる -
release
メソッドを呼ぶと、パーミットが解放される - つまり、データベースのコネクションプールのようなことを再現できる
InterruptedException をスローしない acquire
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
final Thread thread = runThread(() -> {
Thread th = Thread.currentThread();
System.out.println("[Thread] before acquireUninterruptibly (interrupted=" + th.isInterrupted() + ")");
semaphore.acquireUninterruptibly();
System.out.println("[Thread] after acquireUninterruptibly (interrupted=" + th.isInterrupted() + ")");
});
Thread.sleep(1000);
System.out.println("[main] interrupt");
thread.interrupt();
Thread.sleep(1000);
System.out.println("[main] release");
semaphore.release();
}
}
- パーミット数1で
Semaphore
を生成している- メインスレッドの方ですぐに
acquire
を呼んでパーミットの空きをなくしている
- メインスレッドの方ですぐに
- 別スレッドを起動して、
acquireUninterruptibly
でパーミットが空くのを待機する - メインスレッドの方で少し待ってから別スレッドに割り込みを行い、
Semaphore
を解放する
[Thread] before acquireUninterruptibly (interrupted=false)
[main] interrupt
[main] release
[Thread] after acquireUninterruptibly (interrupted=true)
-
acquire
メソッドはthrows InterruptedException
で宣言されているため、呼び出し元でInterruptedException
をハンドリングしなければならない- 実装例では
throws Exception
を宣言したThrowingThrowable
を使っているので省略できているが、実際はInterruptedException
のcatch
が必要になる
- 実装例では
- 一方で、
acquireUninterruptibly
にはthrows
句が宣言されておらず、呼び出し元でInterruptedException
のハンドリングを書かなくてもいい - パーミットに空きがない場合の挙動は
acquire
と同じで、呼び出し元のスレッドはブロックされる - しかし、スレッドが割り込まれてもブロックは継続される
- パーミットに空きができるとブロックは解放され処理は再開される
- このとき、
Thread
には割り込みされたことが記録されるので、必要であればisInterrupted
などで状態を参照することでハンドリングができる
パーミットに空きがない場合も待機しないようにする
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
runThread(() -> {
System.out.println("[thread] tryAcquire");
System.out.println("[thread] value = " + semaphore.tryAcquire());
});
System.out.println("[main] sleep");
Thread.sleep(1000);
System.out.println("[main] release");
semaphore.release();
}
}
- パーミット数1で
Semaphore
を生成している - メインスレッドで即座に
acquire
を呼び、パーミットの空きをなくしている - 別スレッドを起動して、
tryAcquire
でパーミットの取得を試みている - メインスレッドでは1秒スリープしてから、最初に取得したパーミットを解放している
[main] sleep
[thread] tryAcquire
value = false
[main] release
- パーミットに空きがない状態で
acquire
を呼んだ場合、呼び出し元のスレッドはブロックされる - しかし
tryAcquire
の場合、パーミットに空きがない場合はすぐにメソッドが戻される- このとき、戻り値として
false
が返される - パーミットに空きがあり取得できた場合は
true
が返される
- このとき、戻り値として
- tryAcquire(long, TimeUnit) を使えば、待機時間に上限を設けることもできる
複数のパーミットを同時に取得する
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(4);
for (int i=0; i<4; i++) {
int n = i+1;
runThread(() -> {
System.out.println("[" + n + "] start");
semaphore.acquire(n);
System.out.println("[" + n + "] acquire(" + n + ")");
System.out.println("[" + n + "] sleep");
Thread.sleep(100);
System.out.println("[" + n + "] awake");
System.out.println("[" + n + "] release(" + n + ")");
semaphore.release(n);
});
}
}
}
- パーミット数4で
Semaphore
を生成している - 4つのスレッドを起動し、それぞれパーミット数1,2,3,4で
acquire
とrelease
を行うようにしている
[3] start
[4] start
[1] start
[2] start
[3] acquire(3)
[1] acquire(1)
[3] sleep
[1] sleep
[1] awake
[3] awake
[1] release(1)
[3] release(3)
[4] acquire(4)
[4] sleep
[4] awake
[4] release(4)
[2] acquire(2)
[2] sleep
[2] awake
[2] release(2)
図で表現すると以下のようになる。
-
acquire(int)
およびrelease(int)
を使用すると、一度に複数のパーミットの取得や解放ができる
コンストラクタで指定したパーミット数より多い数で acquire
するとどうなるか?
実際に試すと、以下のようになる。
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(2);
System.out.println("before acquire(3)");
semaphore.acquire(3);
System.out.println("after acquire(3)");
}
}
before acquire(3)
特にエラーになることはなく、 acquire
はパーミットの空きが3になるのを待ち続ける。
ということは、ブロックは永久に解除されない状態になってしまうのかというとそうではなく、 release
を3回やれば動き出す。
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(2);
runThread(() -> {
semaphore.release(3);
});
System.out.println("before acquire(3)");
semaphore.acquire(3);
System.out.println("after acquire(3)");
}
}
before acquire(3)
after acquire(3)
コンストラクタでパーミット数を指定しているが、別にその値を上限として acquire
と release
の数が厳密にチェックされているわけではなさそう。
パーミット数に関係なく好きなだけ acquire
や release
ができるようになっている。
Semaphore
の Javadoc には以下のように記載されている。
~セマフォの適切な使用法は、アプリケーションでのプログラミング規約で確立されます。
release(int) | Semaphore (Javadoc)
これはつまり、パーミット数に収まる形で acquire
, release
の呼び出し回数を制御するのはアプリケーション側の役目だよ、ってことを言っているのかなと自分は解釈している。1
acquire と release は異なるスレッドでも構わない
release
は acquire
したスレッドで呼ばなければならないように思いそうになるが、そうではない。
package synchronizer;
import java.util.concurrent.Semaphore;
public class SemaphoreTest {
public static void main(String[] args) throws Exception {
final Semaphore semaphore = new Semaphore(1);
semaphore.acquire();
runThread(() -> {
semaphore.release();
semaphore.acquire();
System.out.println("acquire!");
});
}
}
- パーミット数1で
Semaphore
を生成している - メインスレッドで
acquire
を呼んでいる - 別スレッドを起動し、最初に
release
を呼んでからacquire
をしている
acquire!
-
release
メソッドはメインスレッドとは別のスレッドで呼ばれているが、問題なくパーミットが解放されて続くacquire
でパーミットを獲得できていることが分かる -
Semaphore
はパーミットとスレッドとの紐づきは見ておらず、単にパーミットの数だけで制御をしている
CountDownLatch
CountDownLatch は、特定の条件が満たされるまでスレッドの進行を防いでおきたいような場面で利用できる。
Latch (ラッチ)は、日本語で「外れ止め」や「かんぬき」という意味。
package synchronizer;
import java.util.concurrent.CountDownLatch;
public class CountDownLatchTest {
public static void main(String[] args) throws Exception {
final CountDownLatch latch = new CountDownLatch(3);
for (int i=0; i<3; i++) {
String name = "Thread[" + i + "]";
runThread(() -> {
System.out.println(name + " start");
Thread.sleep(500);
System.out.println(name + " end");
latch.countDown();
});
}
char[] labels = {'a', 'b', 'c'};
for (int i=0; i<3; i++) {
String name = "Thread[" + labels[i] + "]";
runThread(() -> {
System.out.println(name + " await...");
latch.await();
System.out.println(name + " end");
});
}
}
}
- コンストラクタで3を指定して
CountDownLatch
を生成している - スレッド3つ(1, 2, 3)を起動し、500ミリ秒スリープしてから
CountDownLatch
のcountDown
メソッドを呼ぶ - 別のスレッド3つ(a, b, c)を起動し、こちらは
await
で処理を待機させている
Thread[2] start
Thread[1] start
Thread[0] start
Thread[a] await...
Thread[b] await...
Thread[c] await...
Thread[2] end
Thread[1] end
Thread[0] end
Thread[a] end
Thread[b] end
Thread[c] end
- 後に起動した3つのスレッド(a, b, c)は、先に起動した3つのスレッド(1, 2, 3)の処理が終了するまで待機していることが分かる
-
CountDownLatch
は、まずコンストラクタでカウントの数を指定する- 内部のカウンターがこの値で初期化される
-
countDown
メソッドを呼ぶとカウントが1つ減る-
countDown
は待機させられることはなく、すぐ呼び出し元に処理が戻される - カウントの初期値以上の回数呼び出しても、例外になることはない
-
-
await
メソッドを呼ぶと、カウントが0になるまでスレッドがブロックされる - これにより、いくつかのスレッドの処理が全て終わるまで処理を待機させる、みたいな制御が実現できる
-
CountDownLatch
は一度カウントが減ると元に戻すことはできないので、再利用ができない- インスタンスを作り直す必要がある
CyclicBarrier
CyclicBarrier は、複数のスレッドの足並みを揃えさせるような場面で利用できる。
package synchronizer;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) {
final CyclicBarrier barrier = new CyclicBarrier(3);
for (int i=0; i<3; i++) {
String name = "Thread[" + i + "]";
int time = 100 * (i+1);
runThread(() -> {
System.out.println(name + " start");
System.out.println(name + " sleep " + time + "ms");
Thread.sleep(time);
System.out.println(name + " awake");
barrier.await();
System.out.println(name + " end");
});
}
}
}
- コンストラクタで3を指定して
CyclicBarrier
を生成している - 3つのスレッドを起動し、それぞれ少しスリープしてから
CyclicBarrier
のawait
を呼んで待機している
Thread[2] start
Thread[0] start
Thread[1] start
Thread[2] sleep 300ms
Thread[1] sleep 200ms
Thread[0] sleep 100ms
Thread[0] awake
Thread[1] awake
Thread[2] awake
Thread[0] end
Thread[1] end
Thread[2] end
-
Thread[0]
やThread[1]
はThread[2]
より短い時間だけスリープしているので、普通なら先にスレッドが終了するはずが、実際にはThread[2]
の終了を待機しているのが分かる -
CyclicBarrier
を使うと、複数のスレッドの処理があるところまで揃うのを待機させるような制御ができるようになる - コンストラクタで足並みを揃えたいスレッド(パーティ)の数を指定する
-
await
メソッドを呼ぶと、await
を呼んだスレッドの数がパーティ数に達するまで呼び出し元のスレッドがブロックされる- 上記例ではパーティ数3で初期化しているので、3つのスレッドから
await
が呼ばれるまで処理がブロックされる
- 上記例ではパーティ数3で初期化しているので、3つのスレッドから
-
await
の呼び出し回数がパーティ数に達すると、ブロックが解放されてそれまで待機していたスレッドの処理が一斉に再開される
再利用する
package synchronizer;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
program(barrier);
System.out.println("=====");
program(barrier);
}
private static void program(CyclicBarrier barrier) throws Exception {
runThread(() -> {
System.out.println("Thread sleep");
Thread.sleep(500);
System.out.println("Thread await");
barrier.await();
});
System.out.println("Main await");
barrier.await();
System.out.println("Main restart");
}
}
- パーティ数2で
CyclicBarrier
を生成している - 別スレッドを起動して少しスリープしてから
await
を呼び出している - メインスレッドではすぐに
await
を呼んで、別スレッドの終了を待機している - この処理を2回繰り返している
-
CyclicBarrier
は、2回とも同じインスタンスを利用している
Main await
Thread sleep
Thread await
Main restart
=====
Main await
Thread sleep
Thread await
Main restart
-
await
の呼び出し回数がパーティ数に達すると、CyclicBarrier
の状態は初期状態に戻る- このため、
CyclicBarrier
はインスタンスの再利用ができる - 名前に cyclic (循環式) がついている理由
- このため、
await がパーティ数に達したときに処理を挟む
package synchronizer;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(4, () -> {
String name = Thread.currentThread().getName();
System.out.println("[" + name + "] Tripped!");
});
for (int i=0; i<4; i++) {
int time = 100 * (i+1);
runThread(() -> {
String name = Thread.currentThread().getName();
System.out.println("[" + name + "] sleep " + time + "ms");
Thread.sleep(time);
System.out.println("[" + name + "] await");
barrier.await();
});
}
}
}
- パーティ数4で
CyclicBarrier
を生成している - コンストラクタの第二引数に
Runnable
を渡している - スレッドを4つ生成し、それぞれ少しスリープしてから
await
で待機している
[Thread-3] sleep 400ms
[Thread-1] sleep 200ms
[Thread-0] sleep 100ms
[Thread-2] sleep 300ms
[Thread-0] await
[Thread-1] await
[Thread-2] await
[Thread-3] await
[Thread-3] Tripped!
- コンストラクタの第二引数には
Runnable
を渡すことができる - この
Runnable
は、await
の呼び出し回数がパーティ数に達して処理が再開される前にコールバックされる- 処理が再開されることをトリップ(trip) と呼ぶ
- また、このトリップ時に呼ばれる処理のことを バリアー・アクション と呼ぶ
- トリップの処理は、トリップのトリガーとなった
await
の呼び出しを行ったスレッドで行われる
故障状態
package synchronizer;
import java.util.concurrent.CyclicBarrier;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
Thread th = runThread(() -> {
try {
System.out.println("Thread await");
barrier.await();
} catch (InterruptedException e) {
System.out.println("interrupted");
System.out.println("broken = " + barrier.isBroken());
}
});
Thread.sleep(500);
System.out.println("interrupt thread");
th.interrupt();
Thread.sleep(500);
try {
barrier.await();
} catch (BrokenBarrierException e) {
System.out.println("BrokenBarrierException: " + e.getMessage());
}
}
}
- パーティ数2で
CyclicBarrier
を生成している - 別スレッドを起動して
await
で待機する-
InterruptedException
をキャッチして、isBroken
でCyclicBarrier
の状態を確認している
-
- メインスレッドで少し待ってから、別起動したスレッドに割り込みを行う
Thread await
interrupt thread
interrupted
broken = true
BrokenBarrierException: null
- 特定の条件が満たされると、
CyclicBarrier
は故障状態になる - 故障状態になる条件としては、以下がある
-
await
中にスレッドが割り込まれた場合(上記実装例のケース) - await(long, TimeUnit) での待機でタイムアウトになった場合
- バリアー・アクションで例外がスローされた場合
-
-
CyclicBarrier
が故障状態かどうかは、isBroken
メソッドで確認できる - 故障状態の
CyclicBarrier
に対してawait
を呼ぶと、BrokenBarrierException
がスローされる
故障状態をリセットする
package synchronizer;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
public class CyclicBarrierTest {
public static void main(String[] args) throws Exception {
final CyclicBarrier barrier = new CyclicBarrier(2);
try {
barrier.await(1, TimeUnit.MICROSECONDS);
} catch (TimeoutException e) {
System.out.println("timeout");
}
System.out.println("broken = " + barrier.isBroken());
barrier.reset();
System.out.println("broken = " + barrier.isBroken());
}
}
- タイムアウトありで
await
を呼び出す- タイムアウト時間を1ミリ秒にして即座にタイムアウトさせることでバリアを故障状態にする
-
reset
メソッドを呼び出して前後で故障状態を確認している
timeout
broken = true
broken = false
-
reset
メソッドを呼ぶと、故障状態がクリアされてCyclicBarrier
が初期状態に戻る - 初期状態に戻った
CyclicBarrier
は再利用できる
Phaser
Phaser (フェーザー)を使うと、 CountDownLatch
や CyclicBarrier
と同様の制御をより柔軟に行うことができる。
特に、調停するスレッド数が動的に変わるようなときに利用できる。
package synchronizer;
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) throws Exception {
final Phaser phaser = new Phaser();
phaser.register(); // main スレッド分
runThreads(phaser, 3);
System.out.println("main await");
phaser.arriveAndAwaitAdvance();
System.out.println("main restart");
Thread.sleep(1000);
System.out.println("=====");
runThreads(phaser, 2);
System.out.println("main await");
phaser.arriveAndAwaitAdvance();
System.out.println("main restart");
}
private static void runThreads(Phaser phaser, int numberOfThreads) {
for (int i=0; i<numberOfThreads; i++) {
int n = i+1;
String name = "Thread[" + n + "]";
int time = 100 * n;
phaser.register();
runThread(() -> {
System.out.println(name + " sleep " + time + "ms");
Thread.sleep(time);
System.out.println(name + " awake");
phaser.arriveAndDeregister();
System.out.println(name + " end");
});
}
}
}
-
runThreads
メソッドでは、引数で受け取った数だけスレッドを生成して起動している- スレッド生成前に
register
メソッドを呼んでいる - スレッドの中では少しスリープしてから
arriveAndDeregister
メソッドを呼んでいる
- スレッド生成前に
- メインスレッドでは最初に
register
メソッドを1回だけ呼んでいる - その後、
runThreads
メソッドを呼び出してからarriveAndAwaitAdvance
メソッドを呼び出して待機している- このセットを、スレッド数を 3 -> 2 と変更して実行している
main await
Thread[2] sleep 200ms
Thread[3] sleep 300ms
Thread[1] sleep 100ms
Thread[1] awake
Thread[1] end
Thread[2] awake
Thread[2] end
Thread[3] awake
Thread[3] end
main restart
=====
main await
Thread[1] sleep 100ms
Thread[2] sleep 200ms
Thread[1] awake
Thread[1] end
Thread[2] awake
Thread[2] end
main restart
- メインスレッドは複数起動した別スレッドの処理が全て終了するまで待機していることがわかる
- 別スレッドの方は待機せずに終了している
- 1回目は3つのスレッドの終了を待っていたのに対して、2回目は2つのスレッドの終了を待っており、待機するスレッドの数が動的に変わっていることが分かる
Phaser の基本知識
パーティの登録 (register)
-
Phaser
は、CyclicBarrier
と同じで、まずは足並みを揃えるスレッドの数(パーティ数)を設定する必要がある -
CyclicBarrier
の場合はコンストラクタ引数で設定していたが、Phaser
の場合はregister
メソッドを使うことで動的にパーティ数を指定できる- 初期のパーティ数はコンストラクタでも指定できる
final Phaser phaser = new Phaser();
phaser.register(); // main スレッド分
...
for (int i=0; i<numberOfThreads; i++) {
...
phaser.register();
-
register
メソッドを呼ぶと、パーティ数が1つ加算される- 実装例の冒頭で
register
を呼んでいるのは、メインスレッド分を登録している - そして、スレッドを起動するループの中では、ループの都度
register
を呼ぶことで生成されるスレッド数と同じ分だけパーティを登録している - これにより、「メインスレッド+起動するスレッド数」だけのパーティが登録されたことになる
- 実装例の冒頭で
- bulkRegister メソッドを使えば、一度に複数のパーティを登録することもできる
パーティの到着 (arrive)
- 登録したパーティの処理が期待した場所まで到達した場合、それを
Phaser
に伝える必要がある-
CountDownLatch
のcountDown
,await
、CyclicBarrier
のawait
に該当する - これを 到着(arrive) と呼ぶ
-
- 到着を伝えるメソッドはいくつかあるが、実装例では以下2つのメソッドを利用している
-
arriveAndDeregister
- 到着を
Phaser
に伝え、登録しているパーティ数を1つ減らす - 呼び出し元のスレッドはブロックされない
- 到着を
-
arriveAndAwaitAdvance
- 到着を
Phaser
に伝え、他のパーティが全て到着するまで処理をブロックする
- 到着を
-
arriveAndDeregister
...
phaser.arriveAndAwaitAdvance();
System.out.println("main restart");
...
runThread(() -> {
...
phaser.arriveAndDeregister();
System.out.println(name + " end");
});
- 実装例では、動的に起動したスレッドは
arriveAndDeregister
を使うようにしていたので、スレッドの終了とともにパーティの数が1つ減るようになっている- これにより、1回目に3つのスレッドの終了を待機し終えた段階でパーティ数は12に戻るようになっている
- このおかげで、2回目は改めて
register
を使って新しいパーティ数を設定できるようになっている
フェーズ (phase)
package synchronizer;
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) throws Exception {
final Phaser phaser = new Phaser();
phaser.register(); // main スレッド分
System.out.println("phase = " + phaser.getPhase());
phaser.arriveAndAwaitAdvance();
System.out.println("phase = " + phaser.getPhase());
runThreads(phaser, 1);
phaser.arriveAndAwaitAdvance();
System.out.println("phase = " + phaser.getPhase());
}
private static void runThreads(Phaser phaser, int numberOfThreads) {
...
}
}
phase = 0
phase = 1
Thread[1] sleep 100ms
Thread[1] awake
Thread[1] end
phase = 2
-
Phaser
にはフェーズ(phase)と呼ばれる状態が存在する - フェーズは0から始まり、現在のフェーズで登録されているパーティが全て到着すると1つ進む(advance)
- フェーズが進むと到着しているパーティ数は0にリセットされるので、
CyclicBarrier
と同じように再利用ができる - フェーズ番号は
Integer.MAX_VALUE
に達すると0に戻る
到着を制御するメソッド
Phaser
には、到着を制御するメソッドが複数用意されている。
ざっくり機能の差を整理すると以下のようになっている。
メソッド | 到着通知 | ブロック | パーティ解除 | IntrruptedException | フェーズの指定 |
---|---|---|---|---|---|
arrive |
Yes | No | No | No | No |
arriveAndAwaitAdvance |
Yes | Yes | No | No | No |
arriveAndDeregister |
Yes | No | Yes | No | No |
awaitAdvance |
No | Yes | No | No | Yes |
awaitAdvanceInterruptibly |
No | Yes | No | Yes | Yes |
それぞれの使い方の詳細について、以下に記載する。
arrive
- arrive メソッドは、到着を通知してすぐに呼び出し元に処理を戻す
- パーティの解除も行わないので、到着を通知したいだけの場合に利用する
-
CountDownLatch
のcountDown
メソッドみたいな役割になる - 戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される
arriveAndAwaitAdvance
- arriveAndAwaitAdvance メソッドは、到着を通知して他のパーティが全て到着するまで処理をブロックする
- ブロック中にスレッドが割り込まれても、ブロックは継続される
- パーティの解除は行わない
- 戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される
arriveAndDeregister
- arriveAndDeregister メソッドは、到着を通知してパーティを解除したのち、すぐに呼び出し元に処理を戻す
- 戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される
awaitAdvance
- awaitAdvance メソッドは、到着は通知せず他のパーティが全て到着するまで処理をブロックする
- ブロック中にスレッドが割り込まれても、ブロックは継続される
- 引数に現在のフェーズ番号を渡す必要がある
- フェーズ番号が現在のフェーズ番号と一致しない場合は、すぐに処理を戻す
-
phaser.awaitAdvance(phaser.arriveAndDeregister())
のように、到着を通知するメソッドと組み合わせて利用することが想定されている- この場合、パーティを解除したうえでフェーズの終了を待機できるようになる
-
phaser.awaitAdvance(phaser.arrive())
はarriveAndAwaitAdvance()
と等価
- 戻り値として、次のフェーズ番号が返される
awaitAdvanceInterruptibly
- awaitAdvanceInterruptibly メソッドは、到着は通知せず他のパーティが到着するまで処理をブロックする
- ブロック中にスレッドが割り込まれると、
InterruptedException
がスローされる - 引数や戻り値は
awaitAdvance
と同じ - タイムアウト時間を指定できるメソッドも存在する
終了状態
package synchronizer;
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) throws Exception {
final Phaser phaser = new Phaser();
System.out.println("terminated = " + phaser.isTerminated());
phaser.register(); // main スレッド分
System.out.println("terminated = " + phaser.isTerminated());
phaser.arriveAndAwaitAdvance();
System.out.println("terminated = " + phaser.isTerminated());
phaser.arriveAndDeregister();
System.out.println("terminated = " + phaser.isTerminated());
phaser.register();
System.out.println("terminated = " + phaser.isTerminated());
}
}
terminated = false
terminated = false
terminated = false
terminated = true
terminated = true
- フェーズが次に進んだときにパーティが0になっている場合、
Phaser
は終了状態(termination state) になる - 終了状態になると
Phaser
は使えなくなる- すべての同期メソッドは、待機せずにすぐに戻るようになる
-
register
メソッドを使ってもパーティは登録できなくなる
終了状態になる条件を変更する
package synchronizer;
import java.util.concurrent.Phaser;
public class PhaserTest {
public static void main(String[] args) throws Exception {
final Phaser phaser = new Phaser() {
@Override
protected boolean onAdvance(int phase, int registeredParties) {
System.out.println("onAdvance phase=" + phase + ", registeredParties=" + registeredParties);
return false;
}
};
System.out.println("terminated = " + phaser.isTerminated());
phaser.register(); // main スレッド分
System.out.println("terminated = " + phaser.isTerminated());
phaser.arriveAndAwaitAdvance();
System.out.println("terminated = " + phaser.isTerminated());
phaser.arriveAndDeregister();
System.out.println("terminated = " + phaser.isTerminated());
phaser.register();
System.out.println("terminated = " + phaser.isTerminated());
}
}
-
Phaser
のサブクラスでonAdvance
をオーバーライドし、常にfalse
を返すように実装している
terminated = false
terminated = false
onAdvance phase=0, registeredParties=1
terminated = false
onAdvance phase=1, registeredParties=0
terminated = false
terminated = false
- onAdvance メソッドをオーバーライドして戻り値を調整することで、終了状態になる条件を任意のものに変更できる
-
onAdvance
メソッドは、フェーズが次に進むときにコールバックされる- フェーズが変わるときに任意の処理を実行したい場合も利用できる
- このメソッドが
true
を返すと、Phaser
は終了状態になる- デフォルトの実装では、引数で受け取る
registeredParties
(登録されたパーティの数)が0であればtrue
を返すようになっている
- デフォルトの実装では、引数で受け取る
-
false
を返すように実装すれば終了状態にはならない
Exchanger
Exchanger は、2つのスレッド間で処理を同期しつつ値の交換をする場合に利用できる。
package synchronizer;
import java.util.concurrent.Exchanger;
public class ExchangerTest {
public static void main(String[] args) throws Exception {
final Exchanger<String> exchanger = new Exchanger<>();
runThread(() -> {
System.out.println("Thread sleep");
Thread.sleep(500);
String receive = exchanger.exchange("from Thread");
System.out.println("Thread receive=" + receive);
});
System.out.println("Main exchange");
final String receive = exchanger.exchange("from Main");
System.out.println("Main receive=" + receive);
}
}
- メインスレッドと別スレッドの双方で
exchange
メソッドを呼び、戻り値を出力している - 別スレッドの方は、冒頭で少しだけスリープしている
Main exchange
Thread sleep
Thread receive=from Main
Main receive=from Thread
- 別スレッドが
exchange
メソッドを呼ぶまでメインスレッドが待機しているのが分かる - 片方が
exchange
の引数に渡した値が、他方のexchange
の戻り値として返されていることが分かる -
Exchanger
を使うと、exchange
で処理を同期しつつ、任意の値を双方で渡し合うような制御ができる
各シンクロナイザーの使い分け
あくまで個人的な考えをもとに描いたものなので、間違っている可能性あり。
【スレッド数が決まっている?】---{No}--->【スレッド数が決まるまで待てる?】
| |
{Yes} |
| |
+ <-------------------{Yes}-------------------+---{No}---> [Phaser]
|
V
【スレッド数が2つで、値のやり取りが必要?】---{Yes}---> [Exchanger]
|
{No}
|
V
【限られたリソースの取り合いを制御したい?】---{Yes}---> [Semaphore]
|
{No}
|
V
【関連する全てのスレッドが所定の位置に到達するまで全てのスレッドを待機させたい?】
| |
{No} +---{Yes}---> [CyclicBarrier]
|
V
[CountDownLatch]
以下、根拠となる個人的な考え方。
- スレッド数が決まっていれば
Phaser
でできることはCyclicBarrier
やCountDownLatch
でもできると思うので、Phaser
を使うかどうかはスレッド数が決まるまで処理の開始を待てないような場面なのかなと思った -
Exchanger
とSemaphore
は制御が特徴的なので、用途は限定的になるのかなと思った -
CyclicBarrier
とCountDownLatch
の大きな違いは以下かなと思う-
CyclicBarrier
は、パーティに属する全てのスレッドがawait
を呼ぶまで、パーティに属する全てのスレッドが待機する -
CountDownLatch
は、countDown
が所定の回数呼ばれるまでawait
しているスレッドだけが待機する (countDown
を呼んだスレッドは待たなくていい)
-
-
CyclicBarrier
は、パーティに属する全てのスレッドが所定の場所に揃うのを待つことが目的 -
CountDownLatch
は、待機終了のイベントが発火されるまで3スレッドを待たせることが目的
参考
- Java並行処理プログラミング ―その「基盤」と「最新API」を究める― | Brian Goetz, Joshua Bloch, Doug Lea |本 | 通販 | Amazon
- CountDownLatch (Java SE 17 & JDK 17)
- CyclicBarrier (Java SE 17 & JDK 17)
- Semaphore (Java SE 17 & JDK 17)
- Phaser (Java SE 17 & JDK 17)
- Exchanger (Java SE 17 & JDK 17)