マルチスレッドプログラミング(並列処理; multi-thread programming)
参考: 発展学習3日目(ラムダ式)
全ての処理は特定のスレッド上で実行され、main()
メソッドはmainスレッド上で動作する。
ここで、サブスレッド(=スレッドクラス)は「Thread
クラスのサブクラス」または「Runnable
インタフェースの実装クラス」で定義され、
Runnable
インタフェースは関数インタフェースであるため、ラムダ式を用いた記述が可能。
また、サブスレッドはスレッドクラスで再定義されたrun()
メソッドの実行完了時に自動消滅され、JVMは全ユーザスレッドの実行終了を待機する。
スレッドの実行終了を待機しない場合は、Thread#setDaemon()
メソッドを用いてデーモンスレッドに変更する必要がある。
ただし、スレッドの実行は1度のみに限定され、消滅したスレッドを再度Thread#start()
で呼び出すことはできない。
定義
// Runnableインタフェースを利用したスレッド生成
Thread(Runnable target)
// パラメータ
// target: 「Runnableインタフェースの実装クラス」または「関数オブジェクト」
// スレッドの実行
// -> Runnableインタフェースで定義されたrun()メソッドが呼び出される
// <- ThreadクラスはRunnableインタフェースの実装クラスであり、run()メソッドのオーバーライドが必要
void Thread#start()
// デーモンスレッドとしてセット
void Thread#setDaemon(boolean on)
// パラメータ
// on: デーモンスレッドとするかどうかを表すフラグ
// 指定スレッドの終了待機
// -> join()メソッドを実行したスレッドの処理が完了するまで、
// joinメソッドを記述したスレッドのjoin()以降の処理は実行されない
void Thread#join()
サンプルコード
import java.util.Scanner;
import java.util.concurrent.*;
class MainThread {
// mainスレッドで実行する処理
public static void main(String[] args) {
System.out.println("start to count..");
// 新規スレッドの生成
Thread t1 = new SubThreadA();
Thread t2 = new Thread(new SubThreadB());
// ラムダ式を利用する場合
Thread t3 = new Thread(() -> {
System.out.println("This sentence is written by lambda, executed by SubThreadC.");
});
// 新規作成したスレッドの実行
// -> 指定したスレッドクラスのrun()メソッドが呼び出される
t1.start();
t2.start();
t3.start();
try {
// 指定スレッドの終了待機
t1.join();
}
catch (InterruptedException e) {
e.printStackTrace();
}
// サブスレッド(SubThreadA)が終了してから実行される処理
try (
Scanner stdIn = new Scanner(System.in);
) {
// スキャン範囲に既に文字列が存在する場合は入力待機を行わない
// -> 2行以上入力されている場合は1行しか読み取らない
String input = stdIn.nextLine();
System.out.println("your input: " + input);
}
catch (Exception e) {
e.printStackTrace();
}
}
}
// -- Threadクラスの継承を利用する場合 --
class SubThreadA extends Thread{
// サブスレッドで実行する処理
// -> 呼び出し時にオーバーライドしたrun()メソッドが自動的に実行
@Override
public void run() {
System.out.println("This procedure is executed by SubThreadA.");
for (int i = 9; i >= 1; i--) {
System.out.print(i + ".., ");
try {
// スリープ処理
// -> 列挙型TimeUnitの定数を通じたThread.sleep()メソッドの実行
TimeUnit.SECONDS.sleep(1);
}
// スレッド内で送出された例外は他のスレッドに伝播せず強制終了
catch (InterruptedException e) { ; }
}
System.out.println(0);
}
}
// -- Runnableインタフェースを利用する場合 --
class SubThreadB implements Runnable {
// サブスレッドで実行する処理
// -> 呼び出し時にオーバーライドしたrun()メソッドが自動的に実行
@Override
public void run() {
System.out.println("This sentence is printed by SubThreadB.");
}
}
start to count..
This procedure is executed by SubThreadA.
This sentence is printed by SubThreadB.
9.., This sentence is written by lambda, executed by SubThreadC.
8.., 7.., 6.., ABC // ここで改行 <- スキャナが読み取る範囲
5.., 4.., 3.., DEF // ここで改行 <- スキャナが読み取らない範囲
2.., 1.., 0
your input: ABC // サブスレッドの実行終了後にメインスレッドによって実行された処理結果
スレッドセーフ(thread safe)
複数スレッドが同時に共有インスタンスの読み書きを行ってしまう「スレッドの競合(conflict)」を避けるために、
スレッドを**同期(synchronize; 調停, 排他制御)**する設計。
スレッドの同期
スレッドの競合が起こりうる場合、synchronized
キーワードやObject#wait()
/Object#notifyAll()
メソッドを用いてスレッドを同期する。
また、スレッドの実行優先順位はプライオリティ値によって決定され、全てのスレッドの規定値は5(=Thread.NORM_PRIORITY
)である。
ここで、競合スレッドのプライオリティ値が同値である場合は、基本的には先に実行したスレッドから動作するものの、
競合スレッドがメインスレッドとサブスレッドである場合はメインスレッドが優先される傾向があるため、
処理順序が重要である場合は、Object#wait()
メソッドを用いて、後で動作させるスレッドをウェイトセットに退避させる必要がある。
定義
// synchronizedブロックの利用
synchronized (<共有インスタンス>) {
... // 排他的に実行する処理
}
// synchronized修飾子の利用
public synchronized void method() {
... // 排他的に実行する処理
}
// ウェイトセットの利用
// -- Object.wait(), Object.notifyAll()を利用する場合 --
synchronized (<共有インスタンス>) {
<共有インスタンス>.wait();
... // 待機させる処理
}
synchronized (<共有インスタンス>) {
<共有インスタンス>.notifyAll();
... // 優先的に実行する処理
}
// -- Object.wait()のみ利用する場合 --
synchronized (<共有インスタンス>) {
<共有インスタンス>.wait(<待機時間[ミリ秒]>);
}
サンプルコード
class AsyncConflict {
public static void main(String[] args) {
ConflictTest test = new ConflictTest(1);
Thread m = Thread.currentThread();
Thread t1 = new SubThread1(test);
Thread t2 = new Thread(new SubThread2(test));
// スレッド情報の取得
System.out.println("-- Thread Information(threadName: priority) --");
System.out.println(m.getName() + ": " + m.getPriority());
System.out.println(t1.getName() + ": " + t1.getPriority());
System.out.println(t2.getName() + ": " + t2.getPriority());
System.out.println("-- test --");
System.out.println("initialized:" + test.id);
// 共有インスタンスを扱う並列処理
// -> 本来であれば「スレッドの競合」が発生するが、synchronizedブロックによって
// 最初にsynchronizedブロックを呼び出したスレッド -> synchronizedブロックを呼び出す待機状態のスレッド -> ...
// というように、スレッド間の同期が行われる。
// ※この場合、待機順は実行順位とは無関係
t1.start();
t2.start();
synchronized (test) {
test.id *= 5;
System.out.println("main thread: " + test.id + "(multiply 5)");
}
try {
// サブスレッドの終了待機
t1.join();
t2.join();
}
catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("ended: " + test.id);
}
}
// 共有インスタンスを保持するクラス
class ConflictTest {
public int id;
// コンストラクタ
public ConflictTest(int id) {
this.id = id;
}
}
// サブスレッド(SubThread1)
class SubThread1 extends Thread {
private ConflictTest test;
// コンストラクタ
public SubThread1(ConflictTest test) {
this.test = test;
}
// サブスレッド(SubThread1)で動作する処理
@Override
public void run() {
//
synchronized (test) {
try {
// 指定ミリ秒の待機(=ウェイトセットへの退避)
// -> 同期状態が解除され、他スレッドの動作が許可される
test.wait(1000);
} catch (Exception e) { ; }
test.id += 1;
System.out.println("SubThread1: " + test.id + "(plus 1)");
}
}
}
// サブスレッド(SubThread2)
class SubThread2 implements Runnable {
private ConflictTest test;
// コンストラクタ
public SubThread2(ConflictTest test) {
this.test = test;
}
// サブスレッド(SubThread2)で動作する処理
@Override
public void run() {
synchronized (test) {
test.id *= 2;
System.out.println("SubThread2: " + test.id + "(multiply 2)");
}
}
}
-- Thread Information(threadName: priority) --
main: 5
Thread-0: 5
Thread-1: 5
-- test --
initialized:1
main thread: 5(multiply 5) // 本来はSubThread1が実行されるが、wait()メソッドによって他スレッドが動作する
// -> 待機順はSubThread2,メインスレッドの順だが、実行順位とは無関係であり、
// 偶然メインスレッドの動作が優先された
// ※プライオリティ値が同値である場合、「メインスレッドが優先される」傾向がある
SubThread2: 10(multiply 2) // SubThread1は依然として待機状態にあるため、SubThread2が実行される
SubThread1: 11(plus 1) // 最後に待機状態にあったSubThread1が実行される
ended: 11 // SubThread1, SubThread2の実行終了後に呼び出される処理
スレッドセーフなデータ型
参考: java.util.concurrent.atomicパッケージ
java.util.concurrent.atomic
パッケージで用意された、スレッドの競合時に自動的に調停を行う主なデータ型は以下の通り。
クラス | データ型 |
---|---|
AtomicBoolean | boolean |
AtomicInteger | integer |
AtomicLong | long |
AtomicBoolean
を用いて、サブスレッド上でフラグチェック(boolean値の読み取り)を行いながら、
メインスレッド上でフラグ操作(boolean値の書き換え)を行うことで、
サブスレッド上で動作する処理をメインスレッドから中断させることができる。
サンプルコード
import java.util.Scanner;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;
class AsyncStop {
public static void main(String[] args) {
// サブスレッドの生成
SubThreadStoppable t = new SubThreadStoppable();
// サブスレッド上での処理実行
t.start();
// メインスレッド上での処理
String input = "";
try (
Scanner stdIn = new Scanner(System.in);
) {
// "1"が入力されるまでメインスレッドが稼働
while (!input.equals("1")) {
input = stdIn.nextLine();
}
// "1"が入力された場合は停止フラグをtrueに変更
if (input.equals("1")) {
t.stopFlg.set(true);
}
} catch (Exception e) {
e.printStackTrace();
}
try {
// サブスレッドでの処理終了待機
// -> 既にサブスレッドが消滅している場合はスキップされる
t.join();
System.out.println("stop all threads");
}
catch (InterruptedException e) { ; }
}
}
class SubThreadStoppable extends Thread {
// スレッドセーフな停止フラグ
// <- メインスレッドによる「値の書き換え」とサブスレッドによる「値の読み取り」が競合する可能性があるため、
// スレッドセーフなAtomicBooleanを利用
// <- 参照先の書き換えを禁止するためfinalキーワードを付与
final AtomicBoolean stopFlg = new AtomicBoolean(false);
// サブスレッド上で動作する処理
@Override
public void run() {
for (int i = 9; i >= 1; i--) {
// スレッドセーフな停止フラグ
if (this.stopFlg.get()) {
// 停止フラグがtrueである場合はループ処理を抜ける
break;
}
System.out.println(i + ".., ");
try {
// スリープ処理
TimeUnit.SECONDS.sleep(1);
}
catch (Exception e) { ; }
}
if (!this.stopFlg.get()) {
System.out.println(0);
System.out.println("input \"1\" to exit");
}
}
}
9.., 8.., 7.., 1
stop all threads
マルチスレッドプログラミングのデザインパターン
参考1: マルチスレッドプログラミングのデザインパターン
参考2: Java multi-threaded future design pattern
参考3: CompletableFuture(Promiseパターン)
参考4: CyclicBarrier(Barrierパターン)
マルチスレッドプログラミングにおける主なデザインパターンは、以下の通り。
パターン | 内容 | API |
---|---|---|
Two-Phase Termination | サブスレッドのループ処理内での停止フラグチェック | ExecutorService#shutdown() |
Timer / Scheduler |
Timer またはScheduler による実行タイミングの指定 |
ScheduledExecutorService Timer
|
Thread Pool / WorkerThread | スレッドプールまたはワーカースレッドによる タスクの自動割り当て |
ThreadPoolExecutorService ScheduledThreadPoolExecutorService
|
Future | スレッドの実行結果を格納 | Future |
Promise | スレッドの実行結果を利用する処理の実行 | CompletableFuture |
Barrier | 複数スレッドの終了待機後に処理を実行 |
CyclicBarrier Phaser
|
ReadWrite Lock | 読み込みの同時アクセスの許容 書き込み中の読み込みの禁止 |
ReadWriteLock |
Semaphore | 複数スレッドによる処理のロック | Semaphore |
用語集
用語 | 内容 |
---|---|
ユーザスレッド(user thread) | JVMによって終了待機が行われるスレッド |
デーモンスレッド(daemon thread) | JVMによって終了待機が行われないスレッド |
ウェイトセット(wait set) | synchronizedブロックの実行中に、他スレッドの処理を優先的に実行するために 共有インスタンスが用意するスレッドの退避場所 |
デッドロック(dead lock) | 互いに相手の同期処理の完了を待機しているスレッド同士が、 スレッドセーフが施されていないプログラム中で待機し続ける現象 |