#マルチスレッド処理ではデータ共有に注意
- マルチスレッド処理ではデータがスレッド間で共有されているかどうかを意識する
- 共有データに複数のスレッドが同時に処理実施するとデータ矛盾が発生してしまう!
###NG例
- メインスレッドで用意したvalueフィールドを10万個のスレッドで並行してインクリメントする例(結果は10万を期待)
//NG例
public class SynchronizedNotUse {
//複数スレッドで共有するデータ
private int value = 0;
//10万個スレッド実行
public static void main(String[] args) {
final int TASK_NUM = 100000;
var th = new Thread[TASK_NUM];
var tb = new SynchronizedNotUse();
//スレッド生成、実行
for (var i = 0; i < TASK_NUM; i++) {
th[i] = new Thread(() -> {
tb.increment();
});
th[i].start();
}
//スレッド終了まで待機
for (var i = 0; i < TASK_NUM; i++) {
try {
th[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(tb.value); //毎回異なる
}
//valueフィールドをインクリメント
void increment() {
this.value++;
}
}
- いつも結果は10万を期待しているのに、毎回異なる値が出てしまう
- 問題なのはSynchronizedNotUseクラスのincrementメソッド
- incrementメソッドではインスタンスフィールドのvalueの値を++演算子で加算
- 一見して他スレッドは割り込めない気がするが、++演算子は内部的に
- 変数の現在地を取得、値を加算、演算結果の再代入
- 処理途中で他スレッドの割り込み発生すると結果が正しく反映されない!
- →synchronized命令を使う!
#synchronizedブロックで排他制御
- synchronizedブロック内で囲まれた処理は複数のスレッドで同時に呼ばれることがなくなる
- ほぼ同時でも後に呼ばれた方が先行処理(ロックを獲得)が終わるまで待ち状態になる
- ロック:特定の処理を占有すること
-
排他制御(同期処理):ロックで同時実行によるデータ不整合を防ぐこと
- 同期処理が正しく動作するのはロック対象のオブジェクトが同一のインスタンスの場合のみ
- 異なるロックオブジェクト間では同期できない
- ブロックを抜けたら自動でロック解放
public class SynchronizedUse {
private int value = 0;
public static void main(String[] args) {
final int TASK_NUM = 100000;
var th = new Thread[TASK_NUM];
var tb = new SynchronizedUse();
for (var i = 0; i < TASK_NUM; i++) {
th[i] = new Thread(() -> {
tb.increment();
});
th[i].start();
}
for (var i = 0; i < TASK_NUM; i++) {
try {
th[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(tb.value);
}
//ロック対象のオブジェクトを現在のインスタンスに指定
void increment() {
synchronized(this) {
this.value++;
}
}
}
##synchronized修飾子
- メソッド全体を同期化の対象にしたいときに使う
- あるスレッドがsynchronizedメソッド1を実行している間は別スレッドがsynchronizedメソッド2を実行できない
- メソッドを抜けたら自動でロック解放
- コンパイラがキャッシュしたコードを生成する(最適化)場合に、synchronizedでは元のメモリから読まれるので整合性が保証される
- cf: 変数の代入/取得のみの場合には
volatile
修飾子も利用できる
- cf: 変数の代入/取得のみの場合には
synchronized void increment() {
this.value++;
}
#明示的なロック
- 明示的なロック:明示的にロックの獲得/解除が必要
- 暗黙的なロック:ロックの獲得/解除を意識しなくていい(synchronized)
-
ReentrantLockクラス
- 明示的なロックを獲得した場合、その直後からtryブロックでくくる
- finallyブロックでロック解除を保証
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockBasic {
private int value = 0;
private final Lock lock = new ReentrantLock();
public static void main(String[] args) {
final int TASK_NUM = 100000;
var th = new Thread[TASK_NUM];
var tb = new LockBasic();
for (var i = 0; i < TASK_NUM; i++) {
th[i] = new Thread(() -> {
tb.increment();
});
th[i].start();
}
for (var i = 0; i < TASK_NUM; i++) {
try {
th[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(tb.value);
}
void increment() {
//ロック獲得
lock.lock();
try {
this.value++;
} finally {
//ロック解除
lock.unlock();
}
}
}
tryLockメソッド
-
ロック獲得可能か事前にチェックできる
* ロック待ち不要ならばスキップ
if (lock.tryLock(10,TimeUnit.SECONDS)) {
try {
//排他制御すべき処理
} finally {
lock.unlock();
}
}else{
//ロック獲得できない場合の処理
}
#ロックフリーな排他制御
-
単一の変数に対する代入、加減算などをハードウェアレベルでアトミックに実現
- cf:
synchronized
、java.util.concurrent.locks
はロックを前提とした排他制御
- cf:
-
アトミック:途中割り込みがないことを保証されている状態
- Javaではlong/double型以外の変数への読み書きはアトミックが保証
- long/double型は必ずしもアトミックではないのでマルチスレッド環境で割り込みの可能性がある(データ破損)
- AtomicIntegerクラスのgetAndIncrementメソッドを使うことで++演算子をアトミックに実施できる!
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicBasic {
private AtomicInteger value = new AtomicInteger();
public static void main(String[] args) {
final int TASK_NUM = 100000;
var th = new Thread[TASK_NUM];
var tb = new AtomicBasic();
for (var i = 0; i < TASK_NUM; i++) {
th[i] = new Thread(() -> {
tb.increment();
});
th[i].start();
}
for (var i = 0; i < TASK_NUM; i++) {
try {
th[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(tb.value);
}
void increment() {
value.getAndIncrement();
}
}
#スレッドプール
- アプリが利用するスレッドをあらかじめ用意しプールしておくこと
- プールしたスレッドは必要に応じて取り出し、使い終わったらプールに戻す
- スレッドを再利用することで生成/破棄のオーバーヘッド節約!
- Executorsクラス:スレッドプール生成
- executeメソッド:指定の処理をスレッド経由で実行
- shutdownメソッド:
- スレッドプールによりスレッド生成を意識する必要がなくなる!
public class ThreadPool implements Runnable {
@Override
public void run() {
for (var i = 0; i < 30; i++){
System.out.println(Thread.currentThread().getName() + ":" + i);
}
}
}
import java.util.concurrent.Executors;
public class ThreadPoolBasic {
public static void main(String[] args) {
//スレッドを10個準備したスレッドプール生成
var es = Executors.newFixedThreadPool(10);
//executeメソッドで呼び出して実行
es.execute(new ThreadPool());
es.execute(new ThreadPool());
es.execute(new ThreadPool());
es.shutdown();
}
}
#スケジュール実行
- ScheduledThreadServiceオブジェクト
- ExecutorsクラスのnewScheduledThreadPool/newSingleScheduledExecutorメソッドで生成
- scheduleメソッド:指定時間経過後、一度だけ処理実行
- scheduleAtFixedRate/scheduleWithFixedDelayメソッド:指定時間間隔でなんども処理実行
import java.time.LocalDateTime;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ThreadSchedule {
public static void main(String[] args) {
//スレッドプールの準備
var sche = Executors.newScheduledThreadPool(2);
//スケジュール実行登録
sche.scheduleAtFixedRate(() -> {
System.out.println(LocalDateTime.now());
}, 0, 5, TimeUnit.SECONDS);
//スケジュール実行を待ってメインスレッドを休止
try {
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//スレッドプールをシャットダウン
sche.shutdown();
}
}
#スレッドの処理結果を受け取る
- ExecutorService / ScheduledExecutorService + Callableインターフェース
- **Callable **:戻り値型にT型を割り当てる
- callメソッド:Callableインターフェースで実行すべきコード
-
submitメソッド:定義したCallableオブジェクトの処理を実行
- 戻り値はFuture型:非同期処理の結果
- getメソッド:実際の戻り値を取り出す
//別スレッドで乱数を求めミリ秒数分スレッド休止
//その値をメインスレッドで表示
import java.util.Random;
import java.util.concurrent.Callable;
//Callable<Integer> でInteger型を割り当てる
public class ThreadCallable implements Callable<Integer> {
@Override
//Callableインターフェースで実行すべきコード
public Integer call() throws Exception {
var rnd = new Random();
var num = rnd.nextInt(1000);
Thread.sleep(num);
return num;
}
}
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
public class ThreadCallableBasic {
public static void main(String[] args) {
//スレッド実行
var exe = Executors.newSingleThreadExecutor();
var r1 = exe.submit(new ThreadCallable());
var r2 = exe.submit(new ThreadCallable());
var r3 = exe.submit(new ThreadCallable());
//スレッド結果表示
//submitメソッドの戻り値はFuture<Integer>
try {
System.out.println("r1: " + r1.get());
System.out.println("r2: " + r2.get());
System.out.println("r3: " + r3.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
exe.shutdown();
}
}
#スレッド処理の後処理
- CompletableFutureクラス
- 非同期処理結果を受けた後処理を簡潔に書ける
- 非同期処理を直列に実行
- 非同期処理を並列に実行
- 実行すべき非同期処理をsupplyAsyncメソッドに渡す
//別スレッドで乱数を求めミリ秒数分スレッド休止
//その値を表示
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class FutureBasic {
public static void main(String[] args) {
//非同期処理実行
CompletableFuture.supplyAsync(() -> {
var r = new Random();
var num = r.nextInt(1000);
heavy(num);
return num;
})
//完了後の処理
.thenAcceptAsync((result) -> {
System.out.println(result);
});
System.out.println("...任意の後処理...");
heavy(7000);
}
//ダミー処理(重い)で指定時間のみ処理休止
public static void heavy(int num) {
try {
Thread.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
##成功/エラーの処理を振り分ける
- whenCompleteAsyncメソッド
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class FutureComplete {
public static void main(String[] args) {
CompletableFuture.supplyAsync(() -> {
var r = new Random();
var num = r.nextInt(1000);
heavy(num);
return num;
})
.whenCompleteAsync((result, ex) -> {
//成功
if (ex == null) {
System.out.println(result);
} else {
//失敗
System.out.println(ex.getMessage());
}
});
heavy(7000);
}
public static void heavy(int num) {
try {
Thread.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
##複数の非同期処理を直列に実行
-
CompleteFutureの真価発揮
-
supplyAsync:Supplier(引数なし戻値あり)
- 結果の値生成
-
thenApplyAsync:Function(引数あり戻値あり)
- 値の加工、変換
-
thenAcceptAsync:Consumer(引数あり戻値なし)
- 結果受け取り処理
-
supplyAsync:Supplier(引数なし戻値あり)
import java.util.Random;
import java.util.concurrent.CompletableFuture;
public class FutureSeq {
public static void main(String[] args) {
//処理1(乱数生成)
CompletableFuture.supplyAsync(() -> {
var r = new Random();
var num = r.nextInt(5000);
heavy(2000);
System.out.printf("処理1: %d\n", num);
return num;
})
//処理2(乱数倍)
.thenApplyAsync((data) -> {
var result = data * 2;
heavy(2000);
System.out.printf("処理2: %d\n", result);
return result;
})
//処理3(乱数さらに倍)
.thenAcceptAsync((data) -> {
var num = data * 2;
heavy(2000);
System.out.printf("処理3: %d\n", num);
});
heavy(7000);
}
public static void heavy(int num) {
try {
Thread.sleep(num);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}