1
1

More than 3 years have passed since last update.

【Java発展学習16日目】マルチスレッドプログラミング

Last updated at Posted at 2021-07-21

マルチスレッドプログラミング(並列処理; multi-thread programming)

参考: 発展学習3日目(ラムダ式)
全ての処理は特定のスレッド上で実行され、main()メソッドはmainスレッド上で動作する。

ここで、サブスレッド(=スレッドクラス)は「Threadクラスのサブクラス」または「Runnableインタフェースの実装クラス」で定義され、
Runnableインタフェースは関数インタフェースであるため、ラムダ式を用いた記述が可能。

また、サブスレッドはスレッドクラスで再定義されたrun()メソッドの実行完了時に自動消滅され、JVMは全ユーザスレッドの実行終了を待機する。
スレッドの実行終了を待機しない場合は、Thread#setDaemon()メソッドを用いてデーモンスレッドに変更する必要がある。

ただし、スレッドの実行は1度のみに限定され、消滅したスレッドを再度Thread#start()で呼び出すことはできない。

定義

Threadクラスを用いた並列処理
// Runnableインタフェースを利用したスレッド生成
Thread(Runnable target)
// パラメータ
// target: 「Runnableインタフェースの実装クラス」または「関数オブジェクト」

// スレッドの実行
// -> Runnableインタフェースで定義されたrun()メソッドが呼び出される
// <- ThreadクラスはRunnableインタフェースの実装クラスであり、run()メソッドのオーバーライドが必要
void Thread#start()

// デーモンスレッドとしてセット
void Thread#setDaemon(boolean on)
// パラメータ
// on: デーモンスレッドとするかどうかを表すフラグ

// 指定スレッドの終了待機
// -> join()メソッドを実行したスレッドの処理が完了するまで、
//    joinメソッドを記述したスレッドのjoin()以降の処理は実行されない
void Thread#join()

サンプルコード

Async.java
import java.util.Scanner;
import java.util.concurrent.*;

class MainThread {
    // mainスレッドで実行する処理
    public static void main(String[] args) {
        System.out.println("start to count..");

        // 新規スレッドの生成
        Thread t1 = new SubThreadA();
        Thread t2 = new Thread(new SubThreadB());
        // ラムダ式を利用する場合
        Thread t3 = new Thread(() -> {
            System.out.println("This sentence is written by lambda, executed by SubThreadC.");
        });

        // 新規作成したスレッドの実行
        // -> 指定したスレッドクラスのrun()メソッドが呼び出される
        t1.start();
        t2.start();
        t3.start();

        try {
            // 指定スレッドの終了待機
            t1.join();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }

        // サブスレッド(SubThreadA)が終了してから実行される処理
        try (
            Scanner stdIn = new Scanner(System.in);
        ) {
            // スキャン範囲に既に文字列が存在する場合は入力待機を行わない
            // -> 2行以上入力されている場合は1行しか読み取らない
            String input = stdIn.nextLine();

            System.out.println("your input: " + input);
        }
        catch (Exception e) {
            e.printStackTrace();
        }
    }
}

// -- Threadクラスの継承を利用する場合 --
class SubThreadA extends Thread{
    // サブスレッドで実行する処理
    // -> 呼び出し時にオーバーライドしたrun()メソッドが自動的に実行
    @Override
    public void run() {
        System.out.println("This procedure is executed by SubThreadA.");
        for (int i = 9; i >= 1; i--) {
            System.out.print(i + ".., ");

            try {
                // スリープ処理
                // -> 列挙型TimeUnitの定数を通じたThread.sleep()メソッドの実行
                TimeUnit.SECONDS.sleep(1);
            }
            // スレッド内で送出された例外は他のスレッドに伝播せず強制終了
            catch (InterruptedException e) { ; }
        }

        System.out.println(0);
    }
}

// -- Runnableインタフェースを利用する場合 --
class SubThreadB implements Runnable {
    // サブスレッドで実行する処理
    // -> 呼び出し時にオーバーライドしたrun()メソッドが自動的に実行
    @Override
    public void run() {
        System.out.println("This sentence is printed by SubThreadB.");
    }
}
実行結果
start to count..
This procedure is executed by SubThreadA.
This sentence is printed by SubThreadB.
9.., This sentence is written by lambda, executed by SubThreadC.
8.., 7.., 6.., ABC   // ここで改行  <- スキャナが読み取る範囲
5.., 4.., 3.., DEF   // ここで改行  <- スキャナが読み取らない範囲
2.., 1.., 0
your input: ABC      // サブスレッドの実行終了後にメインスレッドによって実行された処理結果

スレッドセーフ(thread safe)

複数スレッドが同時に共有インスタンスの読み書きを行ってしまう「スレッドの競合(conflict)」を避けるために、
スレッドを同期(synchronize; 調停, 排他制御)する設計。

スレッドの同期

スレッドの競合が起こりうる場合、synchronizedキーワードやObject#wait()/Object#notifyAll()メソッドを用いてスレッドを同期する。

また、スレッドの実行優先順位はプライオリティ値によって決定され、全てのスレッドの規定値は5(=Thread.NORM_PRIORITY)である。

ここで、競合スレッドのプライオリティ値が同値である場合は、基本的には先に実行したスレッドから動作するものの、
競合スレッドがメインスレッドとサブスレッドである場合はメインスレッドが優先される傾向があるため、
処理順序が重要である場合は、Object#wait()メソッドを用いて、後で動作させるスレッドをウェイトセットに退避させる必要がある。

定義

スレッドの同期
// synchronizedブロックの利用
synchronized (<共有インスタンス>) {
   ...  // 排他的に実行する処理
}

// synchronized修飾子の利用
public synchronized void method() {
   ...  // 排他的に実行する処理
}

// ウェイトセットの利用
// -- Object.wait(), Object.notifyAll()を利用する場合 --
synchronized (<共有インスタンス>) {
    <共有インスタンス>.wait();
    ...   // 待機させる処理
}
synchronized (<共有インスタンス>) {
    <共有インスタンス>.notifyAll();
    ...   // 優先的に実行する処理
}

// -- Object.wait()のみ利用する場合 --
synchronized (<共有インスタンス>) {
    <共有インスタンス>.wait(<待機時間[ミリ秒]>);
}

サンプルコード

AsyncConflict.java
class AsyncConflict {
    public static void main(String[] args) {
        ConflictTest test = new ConflictTest(1);

        Thread m = Thread.currentThread();
        Thread t1 = new SubThread1(test);
        Thread t2 = new Thread(new SubThread2(test));

        // スレッド情報の取得
        System.out.println("-- Thread Information(threadName: priority) --");
        System.out.println(m.getName() + ": " + m.getPriority());
        System.out.println(t1.getName() + ": " + t1.getPriority());
        System.out.println(t2.getName() + ": " + t2.getPriority());

        System.out.println("-- test --");

        System.out.println("initialized:" + test.id);

        // 共有インスタンスを扱う並列処理
        // -> 本来であれば「スレッドの競合」が発生するが、synchronizedブロックによって
        //    最初にsynchronizedブロックを呼び出したスレッド -> synchronizedブロックを呼び出す待機状態のスレッド -> ...
        //    というように、スレッド間の同期が行われる。
        //    ※この場合、待機順は実行順位とは無関係
        t1.start();
        t2.start();
        synchronized (test) {
            test.id *= 5;
            System.out.println("main thread: " + test.id + "(multiply 5)");
        }

        try {
            // サブスレッドの終了待機
            t1.join();
            t2.join();
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println("ended: " + test.id);
    }
}

// 共有インスタンスを保持するクラス
class ConflictTest {
    public int id;

    // コンストラクタ
    public ConflictTest(int id) {
        this.id = id;
    }
}

// サブスレッド(SubThread1)
class SubThread1 extends Thread {
    private ConflictTest test;

    // コンストラクタ
    public SubThread1(ConflictTest test) {
        this.test = test;
    }

    // サブスレッド(SubThread1)で動作する処理
    @Override
    public void run() {
        // 
        synchronized (test) {
            try {
                // 指定ミリ秒の待機(=ウェイトセットへの退避)
                // -> 同期状態が解除され、他スレッドの動作が許可される
                test.wait(1000);
            } catch (Exception e) { ; }

            test.id += 1;
            System.out.println("SubThread1: " + test.id + "(plus 1)");
        }
    }
}

// サブスレッド(SubThread2)
class SubThread2 implements Runnable {
    private ConflictTest test;

    // コンストラクタ
    public SubThread2(ConflictTest test) {
        this.test = test;
    }

    // サブスレッド(SubThread2)で動作する処理
    @Override
    public void run() {
        synchronized (test) {
            test.id *= 2;
            System.out.println("SubThread2: " + test.id + "(multiply 2)");
        }
    }
}
実行結果
-- Thread Information(threadName: priority) --
main: 5
Thread-0: 5
Thread-1: 5
-- test --
initialized:1
main thread: 5(multiply 5)   // 本来はSubThread1が実行されるが、wait()メソッドによって他スレッドが動作する
                             // -> 待機順はSubThread2,メインスレッドの順だが、実行順位とは無関係であり、
                             //    偶然メインスレッドの動作が優先された
                             // ※プライオリティ値が同値である場合、「メインスレッドが優先される」傾向がある
SubThread2: 10(multiply 2)   // SubThread1は依然として待機状態にあるため、SubThread2が実行される
SubThread1: 11(plus 1)       // 最後に待機状態にあったSubThread1が実行される
ended: 11                    // SubThread1, SubThread2の実行終了後に呼び出される処理

スレッドセーフなデータ型

参考: java.util.concurrent.atomicパッケージ
java.util.concurrent.atomicパッケージで用意された、スレッドの競合時に自動的に調停を行う主なデータ型は以下の通り。

クラス データ型
AtomicBoolean boolean
AtomicInteger integer
AtomicLong long

AtomicBooleanを用いて、サブスレッド上でフラグチェック(boolean値の読み取り)を行いながら、
メインスレッド上でフラグ操作(boolean値の書き換え)を行うことで、
サブスレッド上で動作する処理をメインスレッドから中断させることができる。

サンプルコード

AsyncStop.java
import java.util.Scanner;
import java.util.concurrent.*;
import java.util.concurrent.atomic.*;

class AsyncStop {
    public static void main(String[] args) {
        // サブスレッドの生成
        SubThreadStoppable t = new SubThreadStoppable();

        // サブスレッド上での処理実行
        t.start();

        // メインスレッド上での処理
        String input = "";
        try (
            Scanner stdIn = new Scanner(System.in);
        ) {
            // "1"が入力されるまでメインスレッドが稼働
            while (!input.equals("1")) {
                input = stdIn.nextLine();
            }

            // "1"が入力された場合は停止フラグをtrueに変更
            if (input.equals("1")) {
                t.stopFlg.set(true);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        try {
            // サブスレッドでの処理終了待機
            // -> 既にサブスレッドが消滅している場合はスキップされる
            t.join();

            System.out.println("stop all threads");
        }
        catch (InterruptedException e) { ; }
    }
}

class SubThreadStoppable extends Thread {
    // スレッドセーフな停止フラグ
    // <- メインスレッドによる「値の書き換え」とサブスレッドによる「値の読み取り」が競合する可能性があるため、
    //    スレッドセーフなAtomicBooleanを利用
    // <- 参照先の書き換えを禁止するためfinalキーワードを付与
    final AtomicBoolean stopFlg = new AtomicBoolean(false);

    // サブスレッド上で動作する処理
    @Override
    public void run() {
        for (int i = 9; i >= 1; i--) {
            // スレッドセーフな停止フラグ
            if (this.stopFlg.get()) {
                // 停止フラグがtrueである場合はループ処理を抜ける
                break;
            }

            System.out.println(i + ".., ");

            try {
                // スリープ処理
                TimeUnit.SECONDS.sleep(1);
            }
            catch (Exception e) { ; }
        }

        if (!this.stopFlg.get()) {
            System.out.println(0);

            System.out.println("input \"1\" to exit");
        }
    }
}
実行結果
9.., 8.., 7.., 1
stop all threads

マルチスレッドプログラミングのデザインパターン

参考1: マルチスレッドプログラミングのデザインパターン
参考2: Java multi-threaded future design pattern
参考3: CompletableFuture(Promiseパターン)
参考4: CyclicBarrier(Barrierパターン)
マルチスレッドプログラミングにおける主なデザインパターンは、以下の通り。

パターン 内容 API
Two-Phase Termination サブスレッドのループ処理内での停止フラグチェック ExecutorService#shutdown()
Timer / Scheduler TimerまたはSchedulerによる
実行タイミングの指定
ScheduledExecutorService
Timer
Thread Pool / WorkerThread スレッドプールまたはワーカースレッドによる
タスクの自動割り当て
ThreadPoolExecutorService
ScheduledThreadPoolExecutorService
Future スレッドの実行結果を格納 Future
Promise スレッドの実行結果を利用する処理の実行 CompletableFuture
Barrier 複数スレッドの終了待機後に処理を実行 CyclicBarrier
Phaser
ReadWrite Lock 読み込みの同時アクセスの許容
書き込み中の読み込みの禁止
ReadWriteLock
Semaphore 複数スレッドによる処理のロック Semaphore

用語集

用語 内容
ユーザスレッド(user thread) JVMによって終了待機が行われるスレッド
デーモンスレッド(daemon thread) JVMによって終了待機が行われないスレッド
ウェイトセット(wait set) synchronizedブロックの実行中に、他スレッドの処理を優先的に実行するために
共有インスタンスが用意するスレッドの退避場所
デッドロック(dead lock) 互いに相手の同期処理の完了を待機しているスレッド同士が、
スレッドセーフが施されていないプログラム中で待機し続ける現象
1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1