LoginSignup
5
9

Javaのシンクロナイザ使い方メモ

Posted at

シンクロナイザとは

自身の状態を使ってスレッドの進行を制御(調停)するオブジェクトのことをシンクロナイザ (synchronizer) と呼ぶ。

例えば、次のような実装があったとする。

シンクロナイザ無し
package synchronizer;

import java.util.concurrent.TimeUnit;

public class SynchronizerTest {

    public static void main(String[] args) {
        runThread(() -> {
            System.out.println("Thread 1 start");
            System.out.println("Thread 1 end");
        });

        runThread(() -> {
            System.out.println("Thread 2 start");

            System.out.println("Thread 2 sleeping...");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("Thread 2 awake");

            System.out.println("Thread 2 end");
        });
    }

    public static Thread runThread(ThrowingRunnable process) {
        final Thread thread = new Thread(() -> {
            try {
                process.run();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        });
        thread.start();
        return thread;
    }

    interface ThrowingRunnable {
        void run() throws Exception;
    }
}

※以後、 runThreadThrowingRunnable の記載は省略する

  • 2つのスレッドを起動している
  • スレッド1は標準出力に開始と終了のメッセージを書き出しているだけ
  • スレッド2は途中で3秒間のスリープを入れている
実行結果
Thread 1 start
Thread 1 end
Thread 2 start
Thread 2 sleeping...
Thread 2 awake
Thread 2 end

スレッド2がスリープしている間もスレッド1は普通に処理が流れるので、スレッド1が先に終わる形でメッセージが出力される
ここに、シンクロナイザの1つである CyclicBarrier を以下のように導入してみる。

シンクロナイザ有り
package synchronizer;

import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;

public class SynchronizerTest {

    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(2);

        runThread(() -> {
            System.out.println("Thread 1 start");
            barrier.await(); // ここで待機
            System.out.println("Thread 1 end");
        });

        runThread(() -> {
            System.out.println("Thread 2 start");

            System.out.println("Thread 2 sleeping...");
            TimeUnit.SECONDS.sleep(3);
            System.out.println("Thread 2 awake");

            barrier.await(); // ここで待機
            System.out.println("Thread 2 end");
        });
    }

    ...
}
  • CyclicBarrier を2で初期化してインスタンスを生成し、各スレッドの途中で await メソッドを呼び出すようにしている
実行結果
Thread 1 start
Thread 2 start
Thread 2 sleeping...
Thread 2 awake
Thread 2 end
Thread 1 end
  • 今度はスレッド1がスレッド2より後に終了している
  • CyclicBarrierawait メソッドは、それを呼び出したスレッドをブロックする(処理が停止する)
  • そして、コンストラクタで指定された数と同じ回数 await メソッドが呼ばれると、ブロックは解放されて処理が再開される
    • つまり、上記実装の場合は await が2回呼ばれるとブロックが解放される
  • したがって、スレッド1はスレッド2のスリープが終わって2回目の await が呼ばれるまで処理がブロックされていたことになる
  • この結果、スレッド1の終了メッセージはスレッド2のスリープ終了より後に書き出されることになった
    • スレッド2の終了メッセージより後になるか前になるかはタイミング次第

CyclicBarrier のように、自身の状態(CyclicBarrier の場合は await が呼ばれた回数)を用いて複数のスレッドの実行を制御する機能を提供するオブジェクトをシンクロナイザと呼ぶ。

シンクロナイザを利用すると、複数のスレッドの実行タイミングを細かく制御できるようになる。

Java の標準APIでは、以下の5つのシンクロナイザが提供されている。

ここでは、これらシンクロナイザの使い方をメモする。

Semaphore

Semaphore (セマフォ)は、数に制限のあるリソースのスレッドへの払い出しを制御するような場面で利用できる。

package synchronizer;

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);

        for (int i=0; i<10; i++) {
            String name = "Thread[" + i + "]";
            runThread(() -> {
                System.out.println(name + " start");

                semaphore.acquire();
                System.out.println(name + " acquire");

                TimeUnit.SECONDS.sleep(1);

                System.out.println(name + " release");
                semaphore.release();
            });
        }
    }
}
  • Semaphore を2で初期化している
  • acquire メソッドを呼び出した後に1秒スリープしてから release メソッドを呼ぶスレッドを同時に10個起動している
  • この結果は以下のようになる
実行結果
Thread[4] start
Thread[0] start
Thread[0] acquire
Thread[2] start
Thread[4] acquire
Thread[7] start
Thread[3] start
Thread[9] start
Thread[6] start
Thread[8] start
Thread[1] start
Thread[5] start
Thread[4] release
Thread[0] release
Thread[1] acquire
Thread[6] acquire
Thread[6] release
Thread[1] release
Thread[2] acquire
Thread[8] acquire
Thread[2] release
Thread[8] release
Thread[3] acquire
Thread[5] acquire
Thread[3] release
Thread[5] release
Thread[7] acquire
Thread[9] acquire
Thread[7] release
Thread[9] release

分かりやすいように図に整理すると以下のようになる。

image.png

よく見ると、黄色の部分(Semaphoreacquire を呼び出してから release を呼び出すまでの区間)は、常に最大でも2つのスレッド分しか存在していないことが分かる

  • Semaphore は、同時に利用可能なリソースの数を制御するときに利用できる
  • コンストラクタの引数で同時利用可能な許可の数(パーミット)を指定する
    • 上記例では、パーミットには2を指定している
  • パーミットの数が余っている状態で acquire メソッドを呼ぶと、パーミットが払い出される
  • パーミットの数に空きがない状態で acquire メソッドを呼ぶと、パーミットが解放されるまで呼び出し元のスレッドはブロックされる
  • release メソッドを呼ぶと、パーミットが解放される
  • つまり、データベースのコネクションプールのようなことを再現できる

InterruptedException をスローしない acquire

package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();

        final Thread thread = runThread(() -> {
            Thread th = Thread.currentThread();
            System.out.println("[Thread] before acquireUninterruptibly (interrupted=" + th.isInterrupted() + ")");
            semaphore.acquireUninterruptibly();
            System.out.println("[Thread] after acquireUninterruptibly (interrupted=" + th.isInterrupted() + ")");
        });

        Thread.sleep(1000);
        System.out.println("[main] interrupt");
        thread.interrupt();
        Thread.sleep(1000);

        System.out.println("[main] release");
        semaphore.release();
    }
}
  • パーミット数1で Semaphore を生成している
    • メインスレッドの方ですぐに acquire を呼んでパーミットの空きをなくしている
  • 別スレッドを起動して、 acquireUninterruptibly でパーミットが空くのを待機する
  • メインスレッドの方で少し待ってから別スレッドに割り込みを行い、 Semaphore を解放する
実行結果
[Thread] before acquireUninterruptibly (interrupted=false)
[main] interrupt
[main] release
[Thread] after acquireUninterruptibly (interrupted=true)
  • acquire メソッドは throws InterruptedException で宣言されているため、呼び出し元で InterruptedException をハンドリングしなければならない
    • 実装例では throws Exception を宣言した ThrowingThrowable を使っているので省略できているが、実際は InterruptedExceptioncatch が必要になる
  • 一方で、 acquireUninterruptibly には throws 句が宣言されておらず、呼び出し元で InterruptedException のハンドリングを書かなくてもいい
  • パーミットに空きがない場合の挙動は acquire と同じで、呼び出し元のスレッドはブロックされる
  • しかし、スレッドが割り込まれてもブロックは継続される
  • パーミットに空きができるとブロックは解放され処理は再開される
  • このとき、 Thread には割り込みされたことが記録されるので、必要であれば isInterrupted などで状態を参照することでハンドリングができる

パーミットに空きがない場合も待機しないようにする

package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();

        runThread(() -> {
            System.out.println("[thread] tryAcquire");
            System.out.println("[thread] value = " + semaphore.tryAcquire());
        });

        System.out.println("[main] sleep");
        Thread.sleep(1000);
        System.out.println("[main] release");
        semaphore.release();
    }
}
  • パーミット数1で Semaphore を生成している
  • メインスレッドで即座に acquire を呼び、パーミットの空きをなくしている
  • 別スレッドを起動して、 tryAcquire でパーミットの取得を試みている
  • メインスレッドでは1秒スリープしてから、最初に取得したパーミットを解放している
実行結果
[main] sleep
[thread] tryAcquire
value = false
[main] release
  • パーミットに空きがない状態で acquire を呼んだ場合、呼び出し元のスレッドはブロックされる
  • しかし tryAcquire の場合、パーミットに空きがない場合はすぐにメソッドが戻される
    • このとき、戻り値として false が返される
    • パーミットに空きがあり取得できた場合は true が返される
  • tryAcquire(long, TimeUnit) を使えば、待機時間に上限を設けることもできる

複数のパーミットを同時に取得する

package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(4);

        for (int i=0; i<4; i++) {
            int n = i+1;
            runThread(() -> {
                System.out.println("[" + n + "] start");
                semaphore.acquire(n);
                System.out.println("[" + n + "] acquire(" + n + ")");
                System.out.println("[" + n + "] sleep");
                Thread.sleep(100);
                System.out.println("[" + n + "] awake");
                System.out.println("[" + n + "] release(" + n + ")");
                semaphore.release(n);
            });
        }
    }
}
  • パーミット数4で Semaphore を生成している
  • 4つのスレッドを起動し、それぞれパーミット数1,2,3,4で acquirerelease を行うようにしている
実行結果
[3] start
[4] start
[1] start
[2] start
[3] acquire(3)
[1] acquire(1)
[3] sleep
[1] sleep
[1] awake
[3] awake
[1] release(1)
[3] release(3)
[4] acquire(4)
[4] sleep
[4] awake
[4] release(4)
[2] acquire(2)
[2] sleep
[2] awake
[2] release(2)

図で表現すると以下のようになる。

image.png

  • acquire(int) および release(int) を使用すると、一度に複数のパーミットの取得や解放ができる

コンストラクタで指定したパーミット数より多い数で acquire するとどうなるか?

実際に試すと、以下のようになる。

パーミット数以上の数でacquireした場合
package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(2);

        System.out.println("before acquire(3)");
        semaphore.acquire(3);
        System.out.println("after acquire(3)");
    }
}
実行結果
before acquire(3)

特にエラーになることはなく、 acquire はパーミットの空きが3になるのを待ち続ける。
ということは、ブロックは永久に解除されない状態になってしまうのかというとそうではなく、 release を3回やれば動き出す。

パーミット数以上にreleaseする
package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(2);

        runThread(() -> {
            semaphore.release(3);
        });

        System.out.println("before acquire(3)");
        semaphore.acquire(3);
        System.out.println("after acquire(3)");
    }
}
実行結果
before acquire(3)
after acquire(3)

コンストラクタでパーミット数を指定しているが、別にその値を上限として acquirerelease の数が厳密にチェックされているわけではなさそう。
パーミット数に関係なく好きなだけ acquirerelease ができるようになっている。

Semaphore の Javadoc には以下のように記載されている。

~セマフォの適切な使用法は、アプリケーションでのプログラミング規約で確立されます。
release(int) | Semaphore (Javadoc)

これはつまり、パーミット数に収まる形で acquire, release の呼び出し回数を制御するのはアプリケーション側の役目だよ、ってことを言っているのかなと自分は解釈している。1

acquire と release は異なるスレッドでも構わない

releaseacquire したスレッドで呼ばなければならないように思いそうになるが、そうではない。

package synchronizer;

import java.util.concurrent.Semaphore;

public class SemaphoreTest {
    public static void main(String[] args) throws Exception {
        final Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();

        runThread(() -> {
            semaphore.release();
            semaphore.acquire();
            System.out.println("acquire!");
        });
    }
}
  • パーミット数1で Semaphore を生成している
  • メインスレッドで acquire を呼んでいる
  • 別スレッドを起動し、最初に release を呼んでから acquire をしている
実行結果
acquire!
  • release メソッドはメインスレッドとは別のスレッドで呼ばれているが、問題なくパーミットが解放されて続く acquire でパーミットを獲得できていることが分かる
  • Semaphore はパーミットとスレッドとの紐づきは見ておらず、単にパーミットの数だけで制御をしている

CountDownLatch

CountDownLatch は、特定の条件が満たされるまでスレッドの進行を防いでおきたいような場面で利用できる。
Latch (ラッチ)は、日本語で「外れ止め」や「かんぬき」という意味。

package synchronizer;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {
    public static void main(String[] args) throws Exception {
        final CountDownLatch latch = new CountDownLatch(3);

        for (int i=0; i<3; i++) {
            String name = "Thread[" + i + "]";
            runThread(() -> {
                System.out.println(name + " start");
                Thread.sleep(500);
                System.out.println(name + " end");
                latch.countDown();
            });
        }

        char[] labels = {'a', 'b', 'c'};
        for (int i=0; i<3; i++) {
            String name = "Thread[" + labels[i] + "]";
            runThread(() -> {
                System.out.println(name + " await...");
                latch.await();
                System.out.println(name + " end");
            });
        }
    }
}
  • コンストラクタで3を指定して CountDownLatch を生成している
  • スレッド3つ(1, 2, 3)を起動し、500ミリ秒スリープしてから CountDownLatchcountDown メソッドを呼ぶ
  • 別のスレッド3つ(a, b, c)を起動し、こちらは await で処理を待機させている
実行結果
Thread[2] start
Thread[1] start
Thread[0] start
Thread[a] await...
Thread[b] await...
Thread[c] await...
Thread[2] end
Thread[1] end
Thread[0] end
Thread[a] end
Thread[b] end
Thread[c] end
  • 後に起動した3つのスレッド(a, b, c)は、先に起動した3つのスレッド(1, 2, 3)の処理が終了するまで待機していることが分かる
  • CountDownLatch は、まずコンストラクタでカウントの数を指定する
    • 内部のカウンターがこの値で初期化される
  • countDown メソッドを呼ぶとカウントが1つ減る
    • countDown は待機させられることはなく、すぐ呼び出し元に処理が戻される
    • カウントの初期値以上の回数呼び出しても、例外になることはない
  • await メソッドを呼ぶと、カウントが0になるまでスレッドがブロックされる
  • これにより、いくつかのスレッドの処理が全て終わるまで処理を待機させる、みたいな制御が実現できる
  • CountDownLatch は一度カウントが減ると元に戻すことはできないので、再利用ができない
    • インスタンスを作り直す必要がある

CyclicBarrier

CyclicBarrier は、複数のスレッドの足並みを揃えさせるような場面で利用できる。

package synchronizer;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static void main(String[] args) {
        final CyclicBarrier barrier = new CyclicBarrier(3);

        for (int i=0; i<3; i++) {
            String name = "Thread[" + i + "]";
            int time = 100 * (i+1);

            runThread(() -> {
                System.out.println(name + " start");

                System.out.println(name + " sleep " + time + "ms");
                Thread.sleep(time);
                System.out.println(name + " awake");
                barrier.await();

                System.out.println(name + " end");
            });
        }
    }
}
  • コンストラクタで3を指定して CyclicBarrier を生成している
  • 3つのスレッドを起動し、それぞれ少しスリープしてから CyclicBarrierawait を呼んで待機している
実行結果
Thread[2] start
Thread[0] start
Thread[1] start
Thread[2] sleep 300ms
Thread[1] sleep 200ms
Thread[0] sleep 100ms
Thread[0] awake
Thread[1] awake
Thread[2] awake
Thread[0] end
Thread[1] end
Thread[2] end
  • Thread[0]Thread[1]Thread[2] より短い時間だけスリープしているので、普通なら先にスレッドが終了するはずが、実際には Thread[2] の終了を待機しているのが分かる
  • CyclicBarrier を使うと、複数のスレッドの処理があるところまで揃うのを待機させるような制御ができるようになる
  • コンストラクタで足並みを揃えたいスレッド(パーティ)の数を指定する
  • await メソッドを呼ぶと、 await を呼んだスレッドの数がパーティ数に達するまで呼び出し元のスレッドがブロックされる
    • 上記例ではパーティ数3で初期化しているので、3つのスレッドから await が呼ばれるまで処理がブロックされる
  • await の呼び出し回数がパーティ数に達すると、ブロックが解放されてそれまで待機していたスレッドの処理が一斉に再開される

再利用する

package synchronizer;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static void main(String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(2);

        program(barrier);
        System.out.println("=====");
        program(barrier);
    }

    private static void program(CyclicBarrier barrier) throws Exception {
        runThread(() -> {
            System.out.println("Thread sleep");
            Thread.sleep(500);
            System.out.println("Thread await");
            barrier.await();
        });

        System.out.println("Main await");
        barrier.await();

        System.out.println("Main restart");
    }
}
  • パーティ数2で CyclicBarrier を生成している
  • 別スレッドを起動して少しスリープしてから await を呼び出している
  • メインスレッドではすぐに await を呼んで、別スレッドの終了を待機している
  • この処理を2回繰り返している
  • CyclicBarrier は、2回とも同じインスタンスを利用している
実行結果
Main await
Thread sleep
Thread await
Main restart
=====
Main await
Thread sleep
Thread await
Main restart
  • await の呼び出し回数がパーティ数に達すると、 CyclicBarrier の状態は初期状態に戻る
    • このため、 CyclicBarrier はインスタンスの再利用ができる
    • 名前に cyclic (循環式) がついている理由

await がパーティ数に達したときに処理を挟む

package synchronizer;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static void main(String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(4, () -> {
            String name = Thread.currentThread().getName();
            System.out.println("[" + name + "] Tripped!");
        });

        for (int i=0; i<4; i++) {
            int time = 100 * (i+1);
            runThread(() -> {
                String name = Thread.currentThread().getName();
                System.out.println("[" + name + "] sleep " + time + "ms");
                Thread.sleep(time);
                System.out.println("[" + name + "] await");
                barrier.await();
            });
        }
    }
}
  • パーティ数4で CyclicBarrier を生成している
  • コンストラクタの第二引数に Runnable を渡している
  • スレッドを4つ生成し、それぞれ少しスリープしてから await で待機している
実行結果
[Thread-3] sleep 400ms
[Thread-1] sleep 200ms
[Thread-0] sleep 100ms
[Thread-2] sleep 300ms
[Thread-0] await
[Thread-1] await
[Thread-2] await
[Thread-3] await
[Thread-3] Tripped!
  • コンストラクタの第二引数には Runnable を渡すことができる
  • この Runnable は、 await の呼び出し回数がパーティ数に達して処理が再開される前にコールバックされる
    • 処理が再開されることをトリップ(trip) と呼ぶ
    • また、このトリップ時に呼ばれる処理のことを バリアー・アクション と呼ぶ
  • トリップの処理は、トリップのトリガーとなった await の呼び出しを行ったスレッドで行われる

故障状態

package synchronizer;

import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierTest {

    public static void main(String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(2);

        Thread th = runThread(() -> {
            try {
                System.out.println("Thread await");
                barrier.await();
            } catch (InterruptedException e) {
                System.out.println("interrupted");
                System.out.println("broken = " + barrier.isBroken());
            }
        });

        Thread.sleep(500);
        System.out.println("interrupt thread");
        th.interrupt();

        Thread.sleep(500);
        try {
            barrier.await();
        } catch (BrokenBarrierException e) {
            System.out.println("BrokenBarrierException: " + e.getMessage());
        }
    }
}
  • パーティ数2で CyclicBarrier を生成している
  • 別スレッドを起動して await で待機する
    • InterruptedException をキャッチして、 isBrokenCyclicBarrier の状態を確認している
  • メインスレッドで少し待ってから、別起動したスレッドに割り込みを行う
実行結果
Thread await
interrupt thread
interrupted
broken = true
BrokenBarrierException: null
  • 特定の条件が満たされると、 CyclicBarrier故障状態になる
  • 故障状態になる条件としては、以下がある
    • await 中にスレッドが割り込まれた場合(上記実装例のケース)
    • await(long, TimeUnit) での待機でタイムアウトになった場合
    • バリアー・アクションで例外がスローされた場合
  • CyclicBarrier が故障状態かどうかは、 isBroken メソッドで確認できる
  • 故障状態の CyclicBarrier に対して await を呼ぶと、 BrokenBarrierException がスローされる

故障状態をリセットする

package synchronizer;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CyclicBarrierTest {

    public static void main(String[] args) throws Exception {
        final CyclicBarrier barrier = new CyclicBarrier(2);

        try {
            barrier.await(1, TimeUnit.MICROSECONDS);
        } catch (TimeoutException e) {
            System.out.println("timeout");
        }

        System.out.println("broken = " + barrier.isBroken());
        barrier.reset();
        System.out.println("broken = " + barrier.isBroken());
    }
}
  • タイムアウトありで await を呼び出す
    • タイムアウト時間を1ミリ秒にして即座にタイムアウトさせることでバリアを故障状態にする
  • reset メソッドを呼び出して前後で故障状態を確認している
実行結果
timeout
broken = true
broken = false
  • reset メソッドを呼ぶと、故障状態がクリアされて CyclicBarrier が初期状態に戻る
  • 初期状態に戻った CyclicBarrier は再利用できる

Phaser

Phaser (フェーザー)を使うと、 CountDownLatchCyclicBarrier と同様の制御をより柔軟に行うことができる。
特に、調停するスレッド数が動的に変わるようなときに利用できる。

package synchronizer;

import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) throws Exception {
        final Phaser phaser = new Phaser();

        phaser.register(); // main スレッド分

        runThreads(phaser, 3);
        System.out.println("main await");
        phaser.arriveAndAwaitAdvance();
        System.out.println("main restart");

        Thread.sleep(1000);
        System.out.println("=====");

        runThreads(phaser, 2);
        System.out.println("main await");
        phaser.arriveAndAwaitAdvance();
        System.out.println("main restart");
    }

    private static void runThreads(Phaser phaser, int numberOfThreads) {
        for (int i=0; i<numberOfThreads; i++) {
            int n = i+1;
            String name = "Thread[" + n + "]";
            int time = 100 * n;

            phaser.register();

            runThread(() -> {
                System.out.println(name + " sleep " + time + "ms");
                Thread.sleep(time);
                System.out.println(name + " awake");
                phaser.arriveAndDeregister();
                System.out.println(name + " end");
            });
        }
    }
}
  • runThreads メソッドでは、引数で受け取った数だけスレッドを生成して起動している
    • スレッド生成前に register メソッドを呼んでいる
    • スレッドの中では少しスリープしてから arriveAndDeregister メソッドを呼んでいる
  • メインスレッドでは最初に register メソッドを1回だけ呼んでいる
  • その後、 runThreads メソッドを呼び出してから arriveAndAwaitAdvance メソッドを呼び出して待機している
    • このセットを、スレッド数を 3 -> 2 と変更して実行している
実行結果
main await
Thread[2] sleep 200ms
Thread[3] sleep 300ms
Thread[1] sleep 100ms
Thread[1] awake
Thread[1] end
Thread[2] awake
Thread[2] end
Thread[3] awake
Thread[3] end
main restart
=====
main await
Thread[1] sleep 100ms
Thread[2] sleep 200ms
Thread[1] awake
Thread[1] end
Thread[2] awake
Thread[2] end
main restart
  • メインスレッドは複数起動した別スレッドの処理が全て終了するまで待機していることがわかる
    • 別スレッドの方は待機せずに終了している
  • 1回目は3つのスレッドの終了を待っていたのに対して、2回目は2つのスレッドの終了を待っており、待機するスレッドの数が動的に変わっていることが分かる

Phaser の基本知識

パーティの登録 (register)

  • Phaser は、 CyclicBarrier と同じで、まずは足並みを揃えるスレッドの数(パーティ数)を設定する必要がある
  • CyclicBarrier の場合はコンストラクタ引数で設定していたが、 Phaser の場合は register メソッドを使うことで動的にパーティ数を指定できる
registerでパーティ数を動的に加算する
        final Phaser phaser = new Phaser();

        phaser.register(); // main スレッド分

        ...

        for (int i=0; i<numberOfThreads; i++) {
            ...
            phaser.register();
  • register メソッドを呼ぶと、パーティ数が1つ加算される
    • 実装例の冒頭で register を呼んでいるのは、メインスレッド分を登録している
    • そして、スレッドを起動するループの中では、ループの都度 register を呼ぶことで生成されるスレッド数と同じ分だけパーティを登録している
    • これにより、「メインスレッド+起動するスレッド数」だけのパーティが登録されたことになる
  • bulkRegister メソッドを使えば、一度に複数のパーティを登録することもできる

パーティの到着 (arrive)

  • 登録したパーティの処理が期待した場所まで到達した場合、それを Phaser に伝える必要がある
    • CountDownLatchcountDown, awaitCyclicBarrierawait に該当する
    • これを 到着(arrive) と呼ぶ
  • 到着を伝えるメソッドはいくつかあるが、実装例では以下2つのメソッドを利用している
    • arriveAndDeregister
      • 到着を Phaser に伝え、登録しているパーティ数を1つ減らす
      • 呼び出し元のスレッドはブロックされない
    • arriveAndAwaitAdvance
      • 到着を Phaser に伝え、他のパーティが全て到着するまで処理をブロックする
到着を通知する
        ...
        phaser.arriveAndAwaitAdvance();
        System.out.println("main restart");

        ...

            runThread(() -> {
                ...
                phaser.arriveAndDeregister();
                System.out.println(name + " end");
            });
  • 実装例では、動的に起動したスレッドは arriveAndDeregister を使うようにしていたので、スレッドの終了とともにパーティの数が1つ減るようになっている
    • これにより、1回目に3つのスレッドの終了を待機し終えた段階でパーティ数は12に戻るようになっている
    • このおかげで、2回目は改めて register を使って新しいパーティ数を設定できるようになっている

フェーズ (phase)

package synchronizer;

import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) throws Exception {
        final Phaser phaser = new Phaser();
        phaser.register(); // main スレッド分

        System.out.println("phase = " + phaser.getPhase());

        phaser.arriveAndAwaitAdvance();
        System.out.println("phase = " + phaser.getPhase());

        runThreads(phaser, 1);
        phaser.arriveAndAwaitAdvance();
        System.out.println("phase = " + phaser.getPhase());
    }

    private static void runThreads(Phaser phaser, int numberOfThreads) {
        ...
    }
}
実行結果
phase = 0
phase = 1
Thread[1] sleep 100ms
Thread[1] awake
Thread[1] end
phase = 2
  • Phaser にはフェーズ(phase)と呼ばれる状態が存在する
  • フェーズは0から始まり、現在のフェーズで登録されているパーティが全て到着すると1つ進む(advance)
  • フェーズが進むと到着しているパーティ数は0にリセットされるので、 CyclicBarrier と同じように再利用ができる
  • フェーズ番号は Integer.MAX_VALUE に達すると0に戻る

到着を制御するメソッド

Phaser には、到着を制御するメソッドが複数用意されている。

ざっくり機能の差を整理すると以下のようになっている。

メソッド 到着通知 ブロック パーティ解除 IntrruptedException フェーズの指定
arrive Yes No No No No
arriveAndAwaitAdvance Yes Yes No No No
arriveAndDeregister Yes No Yes No No
awaitAdvance No Yes No No Yes
awaitAdvanceInterruptibly No Yes No Yes Yes

それぞれの使い方の詳細について、以下に記載する。

arrive

  • arrive メソッドは、到着を通知してすぐに呼び出し元に処理を戻す
  • パーティの解除も行わないので、到着を通知したいだけの場合に利用する
  • CountDownLatchcountDown メソッドみたいな役割になる
  • 戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される

arriveAndAwaitAdvance

  • arriveAndAwaitAdvance メソッドは、到着を通知して他のパーティが全て到着するまで処理をブロックする
  • ブロック中にスレッドが割り込まれても、ブロックは継続される
  • パーティの解除は行わない
  • 戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される

arriveAndDeregister

  • arriveAndDeregister メソッドは、到着を通知してパーティを解除したのち、すぐに呼び出し元に処理を戻す
  • 戻り値として、現在のフェーズ番号(到着したフェーズ番号)が返される

awaitAdvance

  • awaitAdvance メソッドは、到着は通知せず他のパーティが全て到着するまで処理をブロックする
  • ブロック中にスレッドが割り込まれても、ブロックは継続される
  • 引数に現在のフェーズ番号を渡す必要がある
    • フェーズ番号が現在のフェーズ番号と一致しない場合は、すぐに処理を戻す
    • phaser.awaitAdvance(phaser.arriveAndDeregister()) のように、到着を通知するメソッドと組み合わせて利用することが想定されている
      • この場合、パーティを解除したうえでフェーズの終了を待機できるようになる
      • phaser.awaitAdvance(phaser.arrive())arriveAndAwaitAdvance() と等価
  • 戻り値として、次のフェーズ番号が返される

awaitAdvanceInterruptibly

終了状態

package synchronizer;

import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) throws Exception {
        final Phaser phaser = new Phaser();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.register(); // main スレッド分

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.arriveAndAwaitAdvance();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.arriveAndDeregister();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.register();

        System.out.println("terminated = " + phaser.isTerminated());
    }
}
実行結果
terminated = false
terminated = false
terminated = false
terminated = true
terminated = true
  • フェーズが次に進んだときにパーティが0になっている場合、 Phaser終了状態(termination state) になる
  • 終了状態になると Phaser は使えなくなる
    • すべての同期メソッドは、待機せずにすぐに戻るようになる
    • register メソッドを使ってもパーティは登録できなくなる

終了状態になる条件を変更する

package synchronizer;

import java.util.concurrent.Phaser;

public class PhaserTest {
    public static void main(String[] args) throws Exception {
        final Phaser phaser = new Phaser() {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                System.out.println("onAdvance phase=" + phase + ", registeredParties=" + registeredParties);
                return false;
            }
        };

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.register(); // main スレッド分

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.arriveAndAwaitAdvance();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.arriveAndDeregister();

        System.out.println("terminated = " + phaser.isTerminated());

        phaser.register();

        System.out.println("terminated = " + phaser.isTerminated());
    }
}
  • Phaser のサブクラスで onAdvance をオーバーライドし、常に false を返すように実装している
実行結果
terminated = false
terminated = false
onAdvance phase=0, registeredParties=1
terminated = false
onAdvance phase=1, registeredParties=0
terminated = false
terminated = false
  • onAdvance メソッドをオーバーライドして戻り値を調整することで、終了状態になる条件を任意のものに変更できる
  • onAdvance メソッドは、フェーズが次に進むときにコールバックされる
    • フェーズが変わるときに任意の処理を実行したい場合も利用できる
  • このメソッドが true を返すと、 Phaser は終了状態になる
    • デフォルトの実装では、引数で受け取る registeredParties (登録されたパーティの数)が0であれば true を返すようになっている
  • false を返すように実装すれば終了状態にはならない

Exchanger

Exchanger は、2つのスレッド間で処理を同期しつつ値の交換をする場合に利用できる。

package synchronizer;

import java.util.concurrent.Exchanger;

public class ExchangerTest {

    public static void main(String[] args) throws Exception {
        final Exchanger<String> exchanger = new Exchanger<>();

        runThread(() -> {
            System.out.println("Thread sleep");
            Thread.sleep(500);
            String receive = exchanger.exchange("from Thread");
            System.out.println("Thread receive=" + receive);
        });

        System.out.println("Main exchange");
        final String receive = exchanger.exchange("from Main");
        System.out.println("Main receive=" + receive);
    }
}
  • メインスレッドと別スレッドの双方で exchange メソッドを呼び、戻り値を出力している
  • 別スレッドの方は、冒頭で少しだけスリープしている
実行結果
Main exchange
Thread sleep
Thread receive=from Main
Main receive=from Thread
  • 別スレッドが exchange メソッドを呼ぶまでメインスレッドが待機しているのが分かる
  • 片方が exchange の引数に渡した値が、他方の exchange の戻り値として返されていることが分かる
  • Exchanger を使うと、 exchange で処理を同期しつつ、任意の値を双方で渡し合うような制御ができる

各シンクロナイザーの使い分け

あくまで個人的な考えをもとに描いたものなので、間違っている可能性あり。

【スレッド数が決まっている?】---{No}--->【スレッド数が決まるまで待てる?】
   |                                             |
 {Yes}                                           |
   |                                             |
   + <-------------------{Yes}-------------------+---{No}---> [Phaser]
   |
   V
【スレッド数が2つで、値のやり取りが必要?】---{Yes}---> [Exchanger]
   |
  {No}
   |
   V
【限られたリソースの取り合いを制御したい?】---{Yes}---> [Semaphore]
   |
  {No}
   |
   V
【関連する全てのスレッドが所定の位置に到達するまで全てのスレッドを待機させたい?】
   |                            |
  {No}                          +---{Yes}---> [CyclicBarrier]
   |
   V
 [CountDownLatch]

以下、根拠となる個人的な考え方。

  • スレッド数が決まっていれば Phaser でできることは CyclicBarrierCountDownLatch でもできると思うので、 Phaser を使うかどうかはスレッド数が決まるまで処理の開始を待てないような場面なのかなと思った
  • ExchangerSemaphore は制御が特徴的なので、用途は限定的になるのかなと思った
  • CyclicBarrierCountDownLatch の大きな違いは以下かなと思う
    • CyclicBarrier は、パーティに属する全てのスレッドが await を呼ぶまで、パーティに属する全てのスレッドが待機する
    • CountDownLatch は、 countDown が所定の回数呼ばれるまで await しているスレッドだけが待機する (countDown を呼んだスレッドは待たなくていい)
  • CyclicBarrier は、パーティに属する全てのスレッドが所定の場所に揃うのを待つことが目的
  • CountDownLatch は、待機終了のイベントが発火されるまで3スレッドを待たせることが目的

参考

  1. 厳密には、この文章は acquirerelease が同じスレッドから呼ばれる必要はないという説明の中での記載だが、パーミット数のコントロールも結局同じ話なのかと自分は解釈した。

  2. メインスレッドの分が残っている。

  3. countDown が所定回数呼ばれるまで。

5
9
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
9