はじめに
Executorフレームワークのありがたみを知るシリーズを2025アドベントカレンダーの中で書きまして、散々スレッドの話をしたので、スレッドセーフの話も書いてみようかと思った次第です。
Executorフレームワークのありがたみを知るシリーズ
- Executors#newSingleThreadExecutor編
- Executors#newFixedThreadPool編
- Executors#newCachedTheadPool編
- Callable/Future編
- ExecutorCompletionService編
- CompletableFuture編
参考:
3.1 スレッドの仕組みと操作(マルチスレッド、並行処理、並列処理、非同期呼び出し、スレッドセーフ、同期化、デッドロック、ウェイトセット、スレッドローカルなど)~Java Advanced編
Java並行処理の神髄!java.util.concurrent徹底解説
スレッドセーフとは
複数のスレッドで共有しても問題ないインスタンスのことを「スレッドセーフ」と呼びます。
複数のスレッドでインスタンスを共有することで発生する問題
発生する問題はいろいろあるのですが、以下いくつか例を上げていますので、「危なそう」というのを感じていただければ。
単純なインクリメントの例
インクリメント(count++)するタスクを作成し、このタスクを100スレッドで1万回並列処理してみます。
コードはこちら
public class Counter {
public int count = 0;
public void increment() {
count++;
}
}
public class IncrementTask implements Runnable {
private final Counter counter;
public IncrementTask(Counter counter) {
this.counter = counter;
}
@Override
public void run() {
counter.increment();
}
}
public class App {
public static void main(String[] args) throws InterruptedException, ExecutionException {
ExecutorService executor = Executors.newFixedThreadPool(100);
Counter counter = new Counter(); // スレッド間で共有するインスタンス
for(int i = 0; i< 10000; i++) {
executor.submit(new IncrementTask(counter));
}
executor.shutdown();
while (!executor.isTerminated()) {
Thread.sleep(10); // 全件終わるまで待つ
}
System.out.println(counter.count);
}
}
count++を1万回呼んでいるので、出力結果は「10000」になるはずですよね。
実際の結果は以下です。
> 9768
実際の数値は実行ごとに異なりますが、何度やっても「10000」にはなりません。
これは、count++で行われることを分解してみるとわかります。
count++を省略しないで書くと以下の通りです。
count = count + 1;
つまり、
- countの値を取得する
- countに1を足した値を計算する
- countに結果を格納する
という3ステップある事がわかります。
ここで、2つのスレッドが同時に上記を実行することを考えてみましょう。
例えば、以下のように進みます。
- スレッドAがcountの値を取得する。この時のcountの値は10
- スレッドBがcountの値を取得する。この時のcountの値は10
- スレッドAが計算する。結果は11
- スレッドBが計算する。結果は11
- スレッドAが結果を格納する。countの値は11になる
- スレッドBが結果を格納する。countの値は11になる
この場合、「1を足す」が2回行われているはずなのに、カウントは「1」しか進んでいません。
上記のような動きが1万回のうち何度が発生するため、結果が1万よりも少なくなってしまうというわけです。
キャッシュによる影響
フラグがtrueの間はずっと処理を続ける、フラグがfalseになったら処理をやめる、というタスクを考えます。
public class VisibilityTask implements Runnable {
public static boolean running = true;
@Override
public void run() {
while (running) {
// フラグが折れるまで処理を繰り返す
}
System.out.println("処理おわり");
}
}
public class App {
public static void main(String[] args) throws InterruptedException, ExecutionException {
Thread thread = new Thread(new VisibilityTask());
thread.start();
Thread.sleep(1000); // 少し待つ
VisibilityTask.running = false;
System.out.println("running を false にしたよ");
thread.join();
}
特に問題なく実装できていそうですね。
しかし、実際に動かしてみると、「処理おわり」はいつまで待っても出力されません。
> running を false にしたよ
// いつまで待っても「処理おわり」は出力されない
これは、CPUやJVMによるキャッシュが影響しています。
メインスレッドでrunningの値を書き換えていますが、これはメインメモリの値を書き換えにいっています。
対して、タスク側ではwhileの変数としてrunninngを繰り返し見ています。タスク側ではrunningの値は書き換えられませんので、「いちいちメインメモリを見にいく必要がない」と判断され、キャッシュが使われるのです。
そのため、メインメモリの値がとっくに書き換わっていても、いつまでも古い値を参照してしまいます。
再順序化による影響
続いて、以下のタスクを考えます。
public class Variables {
public static int a = 0;
public static int b = 0;
public static int x = 0;
public static int y = 0;
}
public class TaskX implements Runnable{
@Override
public void run() {
Variables.a = 1;
Variables.x = Variables.b;
}
}
public class TaskY implements Runnable{
@Override
public void run() {
Variables.b = 1;
Variables.y = Variables.a;
}
}
TaskXでは「aを1にした後、xにbを代入」します
TaskYでは「bを1にした後、yにaを代入」します
TaskX, TaskYが並列で実行された時、xとyの組み合わせはどんなものがあるでしょうか。
-
(x, y) = (0, 1):TaskX → TaskYの順で実行された場合、こうなります -
(x, y) = (1, 0):TaskY → TaskXの順で実行された場合、こうなります -
(x, y) = (1, 1):TaskXの前半とTaskYの前半がどちらも完了してからTaskXの後半とTaskYの後半が実行された場合、こうなります -
(x, y) = (0, 0):TaskXとTaskYがどのような順番で実行されてもこの組み合わせにはなりません。
(x, y) = (0, 0)があり得ないことを、実際に実行して確かめてみましょう。
以下のコードで確認します。
public static void main(String[] args) throws InterruptedException, ExecutionException {
for (int i = 0; i < 10_000_000; i++) { // たくさん回す
Variables.a = 0;
Variables.b = 0;
Variables.x = 0;
Variables.y = 0;
Thread t1 = new Thread(new TaskX());
Thread t2 = new Thread(new TaskY());
t1.start();
t2.start();
t1.join();
t2.join();
if (Variables.x == 0 && Variables.y == 0) {
// ここには来ないはずだが・・・?
System.out.println("(x, y) = (0, 0)");
break;
}
}
}
結果、以下の出力がありました
> (x, y) = (0, 0)
なぜこれが起きるかというと、「命令の再順序化」が関係しています。
CPUやコンパイラの解釈により、「意味が変わらないように処理の順序を入れ替える」ことがあり、これを再順序化と言います。高速化・効率化のために行われるものです。
詳しくは以下のページが詳しかったのでぜひ
メモリオーダリングの基本概念とプログラム動作への影響
実装上はx,yに値が入るタイミングではすでにa,bいずれかに1が入っているはず、と考えられるのですが、a,bへの代入とx,yへの代入の順序は入れ替えられることがあります。
シングルトンが担保できない
続いては、「アプリケーションのライフサイクルの中で一回だけ実行されるもの」を考えてみます。
代表的なのが「シングルトン」ですね。コンストラクタが呼ばれるのを一回のみにすることで、インスタンス化を一回のみとする設計です。
シングルトン自体は以下のように実装できます。
public class Singleton {
private static Singleton instance;
private Singleton() {
}
public static Singleton getInstance() {
if(instance == null) {
instance = new Singleton();
}
return instance;
}
}
コンストラクタを外から呼べないようにして、static変数に唯一のインスタンスを持つようにします。
インスタンスがなければ作る、あればそれを返す、とすることで、一回のみインスタンス化される、というのを実現します。
さて、複数スレッドでこのクラスを呼んだ場合どうなるでしょうか?
public static void main(String[] args) throws InterruptedException {
Set<Singleton> createdInstance = Collections.synchronizedSet(new HashSet<>());
List<Thread> threads = new ArrayList<>();
for (int i = 0; i < 1000; i++) {
threads.add(new Thread(
() -> createdInstance.add(Singleton.getInstance())));
}
for (Thread t : threads) {
t.start();
}
for (Thread t : threads) {
t.join();
}
System.out.println(createdInstance.size());
}
結果は、
> 2
シングルトンにしたのに、インスタンスが2個できてしまいました。
以下のような流れで複数個できてしまいます。
- スレッドAで、「インスタンスがあるか」をチェックする(ないのでif文の中に進む)
- スレッドBで、「インスタンスがあるか」をチェックする(ないのでif文の中に進む)
- スレッドAでコンストラクタを呼ぶ
- スレッドBでコンストラクタを呼ぶ
- スレッドAでstatic変数にインスタンスを格納する
- (これ以降にやってくるスレッドCでは、「インスタンスがあるか」の判定が「ある」になるのでもうコンストラクタは呼ばれない
スレッドセーフにする方法
マルチスレッドで発生する摩訶不思議な事象を解決するために、「スレッドセーフ」にする方法を知っておきましょう
同期化する
メソッドのシグネチャにsynchronizedをつけることで、「同期メソッド」とすることができます。
同時に複数のスレッドがそのメソッドを呼んだ場合、一つずつ実行され、他は順番が回ってくるのを待ちます。
インクリメントの例では、以下のように書けます
public synchronized void increment() {
count++;
}
この場合、このメソッドで必ずブロックが発生してしまうので、このメソッドの実行に時間がかかる場合、速度が気になってきます。
その場合、同期化するべき部分をより限定することができます。
キーワードは同じくsynchronizedですが、メソッド内部の一部のロジックだけをロック対象とすることができます。
public void increment() {
synchronized(this) {
count++;
}
}
Atomicを使う
java.util.concurrent.atomicパッケージでは、原子的な更新を可能とするオブジェクトが提供されています。
原子的な更新とは「途中で割り込まれない単一の(不可分な)操作」を指します。Atomicをもっと噛み砕くと、それ以上分割できない最小単位での更新が可能、つまり他スレッドに割り込まれない更新が可能、ということです。
synchronizedでは少なからずロックが発生してしまいますが、Atomicの場合はロックが発生しません。
java.util.concurrentパッケージ配下なこともあって、並列処理にはうってつけのクラスです。
インクリメントの例は以下のように書けます。
public class Counter {
public AtomicInteger count = new AtomicInteger();
public void increment() {
count.incrementAndGet();
}
}
volatileを使ってキャッシュの利用を制限する
変数宣言時にvolatileを付与すると、その変数に対してはキャッシュが利用されなくなります。
フラグを折っても気づかず走り続ける問題については、このフラグにvolatileをつけることで解決できます。
public static volatile boolean running = true;
イミュータブルにする
イミュータブルとは、その英語の通り、「不変クラス」を指します。
これがなぜスレッドセーフになるかというと、マルチスレッドでの問題は全て「変更時」に発生するからです。
変更するときに問題が起きるなら、変更できなくしてしまおう、という、最強の戦略です。
おわりに
マルチスレッドによる問題は、コードを読んだだけでは気付けない、というのが嫌ですよね。
しかも事象の発生がランダムなので、発生条件もよくわからない、となりがちです。
「こういう問題が起きることがあるんだ」というのを知っているだけでも、問題発生時の解決スピードが段違いに変わると思います。