はじめに
自己紹介
皆さん、こんにちは、斉藤賢哉と申します。私はこれまで、25年以上に渡って企業システムの開発に携わってきました。特にアーキテクトとして、ミッションクリティカルなシステムの技術設計や、Javaフレームワーク開発などの豊富な経験を有しています。
様々なセミナーでの登壇や雑誌への技術記事寄稿の実績があり、また以下のような書籍も執筆しています。
いずれもJava EE(Jakarta EE)を中心にした企業システム開発のための書籍です。中でも 「アプリケーションアーキテクチャ設計パターン」は、(Javaに限定されない)比較的普遍的なテーマを扱っており、内容的にはまだまだ陳腐化していないため、興味のある方は是非手に取っていただけると幸いです(中級者向け)。
Udemy講座のご紹介
この記事の内容は、私が講師を務めるUdemy講座『Java Advanced編』の一部の範囲をカバーしたものです。『Java Advanced編』はこちらのリンクから購入できます(セールス対象外のためいつも同じ価格)。また定価の約30%OFFで購入可能なクーポンをQiita内で定期的に発行していますので、興味のある方は、ぜひ私の他の記事をチェックしてみてください。
この講座は、以下のような皆様にお薦めします。
- Javaの基本的なスキルを習得済みで、さらなるレベルアップを目指している方
- 将来的なキャリアとして、希少性の高い上級エンジニアやアーキテクトを志向している方
- フリーランスエンジニアとして付加価値の更なる向上を図っている方
- 「Oracle認定Javaプログラマ」の資格取得を目指している方
この記事を含むシリーズ全体像
この記事はJava SEの一部の機能・仕様を取り上げたものですが、一連のシリーズになっており、シリーズ全体でJava SEを網羅しています。また認定資格である「Oracle認定Javaプログラマ」(Silver、Gold)の範囲もカバーしています。シリーズの全体像および「Oracle認定Javaプログラマ」の範囲との対応関係については、以下を参照ください。
3.1 スレッドの仕組みと操作
チャプターの概要
このチャプターでは、非同期・並列処理の考え方や、Javaに言語仕様として備わっているスレッドの仕組みと操作方法について学びます。
3.1.1 スレッドの基本
プロセスとスレッド
プロセスとは、コンピューターにおけるプログラムの実行単位のことです。またスレッドとは、プロセス内で命令を逐次的に実行する単位です。
Javaではメインクラスを実行するとJVMという1つのプロセスが立ち上がりますが、内部的には1つのスレッドが起動され処理が行われます。
メインメソッドによって起動されるデフォルトのスレッドを、メインスレッドと呼びます。
マルチスレッドによる並列処理
コンピュータにおけるタスクの制御方式には、逐次と並列があります。
逐次処理とはシーケンシャル処理とも呼ばれ、タスクを「順番に処理」することです。
また、並列処理とはパラレル処理とも呼ばれ、タスクを「同時に処理」することです。並列処理によってコアの利用率を上げ、プログラム実行時のスループットを高めることが可能になります。
並列処理を実現するためには、1つのプログラムの中で複数のスレッドを起動し、タスクを同時に処理します。これをマルチスレッドと呼びます。
Javaではマルチスレッドを容易に実現するための仕組みが、言語仕様として備わっています。
マルチスレッドは主にサーバーサイドにおいて、複数のリクエストを同時に処理したり、ネットワークIOを効率化したりするために利用されます。このような機能はミドルウェアやフレームワークが内部的に有しているケースが一般的ですが、それらを利用する上でも、マルチスレッドのメカニズムを理解しておくことは非常に重要です。
並行処理と並列処理
コンピューターでタスクを実行するとき、ネットワーク通信などにおいて「待ち状態」が発生すると、CPUをフルに活用することができません。そのような「待ち状態」が発生したとき、タスクを他のタスクに切り替えると、CPUの活用が促進され、システム全体の処理性能を向上させることができます。このように複数の異なるタスクを瞬時に切り替えることで、同時に実行しているかのように見せる技術を並行処理と呼びます。
それに対して並列処理は、複数のタスクを「実際に」同時に処理することを意味します。
並行処理と並列処理は、どのような関係性になるのでしょうか。広義の並行処理は、外形的に同時に実行しているように見えさえすれば方式は問うものではないため、並行処理は並列処理の上位概念に位置付けられる、と考えるのが妥当でしょう。
スレッドと非同期呼び出し
プログラムの呼び出し方式には、同期と非同期があります。
通常のメソッド呼び出しのように、呼び出し先の処理が完了し、その結果を受け取るまで待機する方式を「同期呼び出し」と言います。メソッドの呼び出し元がその応答を待機することを「ブロッキング」と呼ぶことがあります。呼び出し先の処理が終了すると、その実行結果は戻り値として返され、呼び出し元の待機は解放されます。
一方、呼び出し先からの処理の終了を待機しない方式を「非同期呼び出し」と言います。スレッドの起動は、一種の非同期呼び出しです。
あるスレッドFooから別のスレッドBarを起動すると、Fooは、Barの処理を待たずして処理を続けることができます。このときFooとBarはマルチスレッドの状態となり、並列処理が行われます。ただしFooが後からBarの実行結果を受け取るためには、同期呼び出しよりも複雑な仕組みが必要です。
なおチャプター3.2で取り上げるExecutorフレームワークを利用すると、起動したスレッドの実行結果を容易に受け取ることが可能です。
スレッドの実装方法(1)
それでは、Javaによるスレッドプログラミングを具体的に見ていきましょう。
スレッドはJava SEのクラスライブラリを利用して作成します。スレッドにはそのスレッド上で実行されるタスクを割り当てる必要がありますが、スレッドとタスクの作成方法には以下の2つがあります。
- スレッドを表すクラスを作成し、その中にタスクを直接実装する方法
- スレッドとタスクを別々のクラスとして実装する方法
ここではまず1.の方法を取り上げます。
以下に、非同期タスクを実装するためのスレッド(MyThreadクラス)のコードを示します。このスレッドは数値を属性として持ち、それを1000億回足し込んで計算結果をコンソールに表示する、というタスクが実装されています。
public class MyThread extends Thread { //【1】
private int data; //【2】
public MyThread(int data) { //【3】
this.data = data;
}
@Override
public void run() { //【4】
System.out.println("[ MyThread ] start");
long total = 0;
for (long i = 0; i < 100_000_000_000L; i++) { // 1000億回
total += data;
}
System.out.println("[ MyThread ] finish, total => " + total);
}
}
スレッドは、java.lang.Threadを継承して作成します【1】。
そしてThreadクラスから継承されたrun()メソッドをオーバーライドして、タスクの処理内容を実装します【4】。run()メソッドの宣言は引数、戻り値を持つことはできず、チェック例外をthrows句に指定することもできません。
run()メソッドは引数を取らないため、生成されるスレッドごとに動作を切り替えるためにはフィールドを用います。このスレッドでは、dataフィールド【2】と、それを初期化するためのコンストラクタ【3】を定義しています。
このようにして作成したスレッドを、メインスレッド(Mainクラス)からは以下のように起動します。
public class Main {
public static void main(String[] args) { // メインスレッド
MyThread t = new MyThread(3); //【1】スレッドを生成する
System.out.println("[ Main ] start");
t.start(); //【2】スレッドを起動する
System.out.println("[ Main ] finish");
}
}
まずnew演算子によって、既出のMyThreadクラスのインスタンスを生成します【1】。
次にスレッドを起動するために、生成したインスタンスのstart()メソッドを呼び出します【2】。もちろんスレッドの生成と起動を1ラインにまとめてnew MyThread(3).start()
としても構いません。
start()メソッドはThreadクラスに定義されたメソッドで、このメソッドを呼び出すと当該のスレッドが非同期に起動されます。ここでスレッドのrun()メソッドを呼び出すと、当該のスレッドが同期で呼び出されてしまい、マルチスレッドにならないため注意してください。
MainクラスとMyThreadクラスの呼び出し関係は、以下の図のような処理シーケンスになります。このようにMainクラスはMyThreadクラスを呼び出した後、応答を待機しないため、両者は並列で処理が進みます。
Mainクラスを実行するとMyThreadクラスが非同期に起動され、コンソールには以下のように表示されます。
[ Main ] Start
[ Main ] Finish
[ MyThread ] Start
[ MyThread ] Finish, total => 300000000000
スレッドの実装方法(2)
ここではもう一つの方法である「スレッドとタスクを別々のクラスとして実装する方法」を取り上げます。
まずタスクはjava.lang.Runnableインタフェースをimplementsして作成します。Runnableインタフェースには唯一run()メソッドが定義されており、その宣言は以下のとおりです(Threadクラスのrun()と同様)。
- void run()
既出のMyThreadに実装されたものと同じタスクは、以下のように作成します。
public class MyRunnable implements Runnable {
private int data;
public MyRunnable(int data) {
this.data = data;
}
@Override
public void run() {
// 既出のMyThreadクラスのrun()メソッドと同じ内容
........
}
}
run()メソッドの実装は、前項で取り上げたThreadクラスを使用する方法と同じです。
この方法では、作成したタスクをスレッドに割り当てて実行します。具体的には以下のようにします。
MyRunnable r = new MyRunnable(3); //【1】
Thread t = new Thread(r); //【2】スレッドを生成する
t.start();
まずタスクのインスタンスを生成します【1】。
そしてタスクのインスタンスをThreadクラスのコンストラクタに指定して、スレッドを生成します【2】。
なおタスクはRunnableをimplementsしたクラスとして個別に作成するのではなく、以下のように匿名クラスとして実装することもできます。
int data = 3;
Thread t = new Thread(new Runnable() {
@Override
public void run() {
// 既出のMyThreadクラスのrun()メソッドと同じ内容
........
}
});
このコードは、ラムダ式(チャプター4.1参照)を使うとさらに簡潔に記述することができます。
int data = 3;
Thread t = new Thread(() -> {
// 既出のMyThreadクラスのrun()メソッドと同じ内容
........
});
このようにスレッドには2つの作成方法がありますが、その違いは「スレッドとタスクを一体で管理するか否か」であり、どちらが優位ということはありません。
本チャプターでは基本的に「スレッドを表すクラスを作成し、その中にタスクを直接実装する方法」を採用しますが、「スレッドとタスクを別々のクラスとして実装する方法」については以降のチャプターでも登場します。
ThreadクラスのコンストラクタとAPI
java.lang.Threadは、マルチスレッドを実現するための最も基本的なクラスです。
Threadクラスには名前、タスクといった属性があり、それらを初期化するために以下のようなコンストラクタが用意されています。
- Thread(String) … スレッド名を初期化する
- Thread(Runnable) … タスクを初期化する
- Thread(String, Runnable) … スレッド名とタスクを初期化する
またThreadクラスの主要なAPIには、以下のようなものがあります。
API(メソッド) | 説明 |
---|---|
long getId() | スレッドIDを取得する |
String getName() | スレッド名を取得する |
void setName(String) | スレッド名を設定する |
void join() | このスレッドが終了するのを待機する |
void start() | このスレッドを非同期に起動する |
void interrupt() | このスレッドに割り込む |
static void sleep(long) | 自身のスレッドが、指定された時間(ミリ秒)一時停止する |
static Thread currentThread() | 自身のスレッド(現在実行中のスレッド)のインスタンスを返す |
ClassLoader getContextClassLoader() | このスレッドのコンテキストクラスローダー(チャプター9.1参照)を返す。 |
なおThreadクラスにはrun()メソッドもありますが、このメソッドを直接呼び出すことはないため、この表には記載していません。
スレッドの一時停止
Threadクラスのsleep()メソッド(スタティックメソッド)により、実行中のスレッドを指定されたミリ秒分、一時停止させることができます。
try {
Thread.sleep(5000); // 5000ミリ秒間、一時停止
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
なお一時停止中に外部からinterrupt()メソッドを呼び出すと割り込みが発生し、InterruptedExceptionという例外が発生します。これはチェック例外なので例外ハンドリングが必要です。
複数スレッドによる並列処理
ここではメインスレッドから複数のスレッドを起動し、それらを並列に実行するときの挙動を見ていきましょう。
まず非同期タスクを表すスレッドとして、以下のMyParaThreadクラスを作成します。
public class MyParaThread extends Thread {
private long timer;
public MyParaThread(String name, long timer) {
super(name);
this.timer = timer;
}
@Override
public void run() {
System.out.println("[ MyParaThread ] start, name => " + getName());
sleepAWhile(timer); //【1】指定されたミリ秒間、一時停止するユーティリティ
System.out.println("[ MyParaThread ] finish, name => " + getName());
}
}
このスレッドのタスクは、timerフィールドに設定されたミリ秒間、処理を一時停止するだけの簡易な処理を行うものです。一時停止するためには既出のsleep()メソッドを使いますが、例外ハンドリングが必要のためここではユーティリティ化しています【1】。
次にこのスレッドの起動元になるメインスレッドです。メインスレッドでは、以下のようにMyParaThreadのインスタンスを2つ生成し、並列で起動します。
// 1つ目のスレッドを生成し、起動する
MyParaThread t1 = new MyParaThread("foo", 8000L);
t1.start();
// 2つ目のスレッドを生成し、起動する
MyParaThread t2 = new MyParaThread("bar", 5000L);
t2.start();
このメインスレッドを実行すると、以下のような処理シーケンスで並列処理が行われます。
スレッドの待ち合わせ
前項の例では、メインスレッドから2つのスレッドを起動すると、メインスレッドは直ちに終了します。
スレッドの起動元において、スレッドを起動した後その終了を待機し、終わり次第、後続の処理を行いたい場合があります。このような場合は、Threadクラスのjoin()メソッドを使います。
前項のメインスレッドのコード(pro.kensait.java.advanced.lsn_3_1_1.parallel.Main)で、2つ目のスレッドを起動した後、以下のような命令を追加します。
t1.join(); // t1の処理終了を待機する
t2.join(); // t2の処理終了を待機する
// 待機後の後続処理
........
このようにスレッドの起動元で、起動したスレッドのjoin()メソッドを呼び出すと、その終了を待機することができます。
このメインスレッドを実行すると、以下のような処理シーケンスで並列処理が行われます。
スレッドの状態遷移
ここでは、スレッドが生成されてから廃棄されるまでのライフサイクルについて説明します。以下の状態遷移図を見てください。
まずstart()メソッドを呼び出すと、「初期状態」から「実行可能状態」に遷移しますが、必ずしも直ちに実行されるわけではありません。「実行可能状態」とはCPUの割り当てを待機する状態で、割り当てが行われるとrun()メソッドが呼び出され「実行状態」になります。「実行状態」になるとタスクが実行されますが、sleep()メソッドによる一時停止、join()メソッドによる待ち合わせ、wait()メソッドによる待機、同期化によるロック解放待ちがあると、スレッドは「待機状態」1になりタスクは中断します。「待機状態」からは、sleep()メソッドの終了、join()メソッドの待ち合わせ終了、notify()メソッドによる再開、ロック取得によって「実行可能状態」に戻り、再びCPUが割り当てられるとタスクが再開されます。最終的にはrun()メソッドが終了すると「終了状態」になります。
3.1.2 スレッドセーフと同期化
スレッドセーフとは
マルチスレッド環境では、複数のスレッド間で1つのインスタンスを共有している場合、複数スレッドから同時に共有インスタンスへのアクセスがあると、問題が発生する可能性があります。
ここではまず、この問題が発生するメカニズムを具体例に基づいて説明します。
以下に3つのクラスのコードを示します。
Sharedクラスは、複数スレッド間で共有されるクラスを表します。
次にMyThreadクラスはスレッドを表すクラスで、フィールドとしてShared型を持ちます。
そしてMainクラスはメインスレッドで、このスレッド(MyThreadクラス)を複数生成して起動します。
public class Shared {
private int data; //【1】
public Shared(int data) {
this.data = data;
}
//【2】dataフィールドへの更新処理
public void addData(int num) {
int tmp = this.data + num;
sleepAWhile(10); //【3】10ミリ秒間、一時停止
this.data = tmp;
}
public int getData() {
return data;
}
}
public class MyThread extends Thread {
private Shared shared; //【4】共有インスタンス
private int num; // 加算値
public MyThread(Shared shared, int num) {
this.shared = shared;
this.num = num;
}
@Override
public void run() {
for (int i = 0; i < 10; i++) {
sleepRandomTime(400, 500); // 400~500ミリ秒間、一時停止する
shared.addData(num); //【5】
System.out.println(num + "加算する => " + shared.getData());
}
}
}
public class Main {
public static void main(String[] args) throws Exception {
//【6】共有インスタンスを生成する
Shared shared = new Shared(0);
//【7】1つ目のスレッドを生成し、起動する
MyThread t1 = new MyThread(shared, 1);
t1.start();
//【8】2つ目のスレッドを生成し、起動する
MyThread t2 = new MyThread(shared, 2);
t2.start();
// 待ち合わせする
t1.join();
t2.join();
System.out.println("最終的な値 => " +
shared.getData()); //【9】 本来は30になるはずだが…
}
}
これらのクラスを順番に見ていくと、まずSharedクラスはdataフィールドを保持し【1】、addData()メソッドで指定された値を加算します【2】。
MyThreadクラスは、このSharedクラスをフィールドとして保持しています【4】。そしてrun()メソッドの中で10回分、SharedクラスのaddData()メソッドを呼び出す【5】ことで、加算値(num)を足し込んでいます。
Mainクラスは、Sharedインスタンスを生成したら【6】、2つのMyThreadを生成し、起動しています【7、8】。このときそれぞれのスレッドに、生成済みのSharedインスタンスを渡しています。このようにすると同一のSharedインスタンスが、複数のスレッド間で共有されることになります。
また2つのMyThreadを生成するときの第二引数には、それぞれ加算値として1と2を渡しています【7、8】。
従ってMainクラスを実行すると、1の加算が10回、2の加算が10回、それぞれ行われるため、Sharedインスタンスのdataフィールドの値は最終的には30になるはずです【9】。
ところがその結果は毎回異なったものになり、30になる保証はありません。その理由は、SharedクラスのaddData()メソッドが「アトミック」ではないからです。アトミックとは、割り込みが発生しえない状態を意味します。
つまりあるスレッドがこのメソッドを呼び出し、dataフィールドの書き換えを実施しようとしている最中に、後から別のスレッドがこのメソッドを呼び出すと、処理の割り込みが発生し、先発のスレッドによる更新が失われてしまう可能性があるのです。なおaddData()メソッドの中で10ミリ秒間、一時停止している【3】のは、意図的にスレッド間の割り込みを発生しやすくするための処理です。
このように複数のスレッド間で1つのインスタンスを共有している場合、複数スレッドから同時に更新が行われると、状態が不正になる可能性があるため注意が必要です。繰り返しになりますが、更新されること自体が問題なのではなく、更新の過程において、スレッド間の割り込みが発生する点に問題があるのです。
従ってマルチスレッド環境では、共有インスタンスに対して以下のいずれかの対策を施すことで、不正な更新を回避する必要があります。
- 状態(フィールド)への更新処理を「同期化」する
- イミュータブルオブジェクトにする
このような対策が施されたインスタンスを「スレッドセーフである」と言います。
同期化とは
前項で説明した「マルチスレッド環境における不正な更新」を回避するための1つの方法が、共有インスタンスへの更新処理を「同期化」することです。
同期化とは、スレッドが何らかのメンバーにアクセスしたときにロックを取得し、処理を終えてロックが解放されるまで、別のスレッドからのアクセスを待機させることを意味します。同期化によるロックの仕組みは「排他制御」とも呼ばれます。
Javaで同期化を実現するためには、synchronizedキーワードを使用します。
synchronizedキーワードは、修飾子としてメソッドに付与するケースと、メソッド内でフィールドを指定してブロックとして使うケースがあります。
まずsynchronizedを修飾子として使い、フィールドを更新するためのメソッドを同期化するには、以下のようにします。
synchronized 戻り値型 メソッド名(....) {
....同期化された処理....
}
またsynchronizedをブロックとして使い、フィールドへの更新を同期化するには、以下のようにします。
synchronized (フィールド名) {
....同期化された処理....
}
それでは前項のSharedクラスを再び取り上げ、addData()メソッドを同期化してみましょう。
synchronized public void addData(int num) {
int tmp = this.data + num;
sleepAWhile(10); // 10ミリ秒間、一時停止
this.data = tmp;
}
メソッドにsynchronized修飾子を付与すると、メソッド全体が同期化されます。このようにすれば、Sharedインスタンスが複数スレッド間で共有されても、addData()メソッド内で割り込みは発生しないため、状態が不正になることはありません。
デッドロックとその発生メカニズム
スレッドが複数の共有インスタンを保持しており、それぞれの更新を同期化する場合は、その順番に留意する必要があります。
例えばスレッドt1とt2があり、それぞれ共有インスタンスAとBを保持しているものとします。そしてt1がA→Bという順番でロックを取得し、逆にt2はB→Aという順番でロックを取得するものとします。このときt1とt2が完全に同じタイミングで起動されると、t1はAのロックを、t2はBのロックをそれぞれ取得します。次にt1はBへのロック取得を試みますが、t2によって待機させられます。またt2はAへのロック取得を試みますが、t1によって待機させられます。
このように2つのスレッドが複数のインスタンスを共有するとき、ロックの順番が異なることによりお互いがお互いのロック解放待ちになり処理が停止してしまうことを、デッドロックと言います。
デッドロックの発生をコードで確認してみましょう。
まずはスレッドのコードを示します。
public class DeadlockThread extends Thread {
private String name;
private Shared shared1; // 共有インスタンス1
private Shared shared2; // 共有インスタンス2
private int num; // 加算値
public DeadlockThread(String name, Shared share1, Shared share2,
int num) {
this.name = name;
this.shared1 = share1;
this.shared2 = share2;
this.num = num;
}
@Override
public void run() {
while (true) {
//【1】まず共有インスタンス1をロックする
synchronized (shared1) {
// 共有インスタンス1の値にスレッドが保持している加算値を足し込む
System.out.println("[ " + name + ", shared1 ] " + shared1.getData());
shared1.addData(num);
System.out.println("[ " + name + ", shared1 ] " + shared1.getData());
//【2】共通インスタンス1にロックをかけたまま、共有インスタンス2をロックする
synchronized (shared2) {
// 共有インスタンス2の値にスレッドが保持している加算値を足し込む
System.out.println("[ " + name + ", shared2 ] " + shared2.getData());
shared2.addData(num);
System.out.println("[ " + name + ", shared2 ] " + shared2.getData());
}
}
}
}
}
run()メソッドでは、まずsynchronizedブロックによって共有インスタンス1(shared1)のロックを取り、加算値を足し込みます【1】。
そしてそのロックを保持したまま共有インスタンス2(shared2)のロックを取り、同じように加算値を足し込みます【2】。
次にこのスレッドを起動するメインスレッドのコードです。
//【1】初期値が0の共有インスタンスを生成する
Shared instanceA = new Shared(0);
//【2】初期値が10000の共有インスタンスを生成する
Shared instanceB = new Shared(10000);
//【3】加算値が5のスレッド"t1"を生成し、起動する
DeadlockThread t1 = new DeadlockThread("t1", instanceA, instanceB, 5);
t1.start();
//【4】加算値が7のスレッド"t2"を生成し、"t1"から1秒遅れて起動する
sleepAWhile(1000);
DeadlockThread t2 = new DeadlockThread("t2", instanceB, instanceA, 7);
t2.start();
このコードを実行すると、スレッド"t1と"t2"の間で、以下の図のようなメカニズムでデッドロックが発生します。
メインスレッドでは、まず2つの共有インスタンス、instanceAとinstanceBを生成しています【1、2】。
次に1つ目のスレッドt1を生成しますが、コンストラクタにはinstanceA、instanceBの順に共有インスタンスを指定しています【3】。
さらに2つ目のスレッドt2を生成しますが、コンストラクタには、t1とは逆に、instanceB、instanceAの順に指定しています【4】。
このように2つの共有インスタンスを持つ2つのスレッドを生成し、それぞれを起動します。するとしばらくは順調に処理が進んでいきますが、ロックの順番が異なることに起因していずれはデッドロックが発生し、処理が中断してしまいます。
このように複数のインスタンスをスレッド間で共有する場合は、ロックの取得順を統一するようにしないと、深刻な不具合を引き起こす可能性があるため、十分な注意が必要です。
ReentrantReadWriteLockによる明示的なロック
ReentrantReadWriteLockを利用すると、明示的にロックを取得したり解放したりすることができます。また書き込みロックと読み込みロック、2つのロックを切り替えて使用することができます。
ReentrantReadWriteLockクラスは、Java SEのクラスライブラリ(java.util.concurrent.locks.ReentrantReadWriteLock)として提供されます。
ここでは既出のSharedクラスを、ReentrantReadWriteLockクラスによって排他制御を行うように修正します。
public class Shared {
private int data;
private ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); //【1】
public Shared(int data) {
this.data = data;
}
// dataフィールドの書き込み
public void addData(int num) {
WriteLock writelock = lock.writeLock();
writelock.lock(); //【2】
try {
int tmp = this.data + num;
sleepAWhile(10); // 10ミリ秒間、一時停止
this.data = tmp;
} finally {
writelock.unlock(); //【3】
}
}
// dataフィールドの読み込み
public int getData() {
ReadLock readLock = lock.readLock();
readLock.lock(); //【4】
try {
return data;
} finally {
readLock.unlock(); //【5】
}
}
}
まずReentrantReadWriteLockのオブジェクトを生成し、フィールドに設定しておきます【1】。
dataフィールドに書き込みを行うためのaddData()メソッドでは、処理の先頭でwriteLock()メソッドを呼び出し、書き込みロックを取得します【2】。書き込みロックを取得すると、他のスレッドからは、書き込みロック、読み込みロック、いずれも取得できずに待機が発生します。
書き込み処理が終わったら、確実にロックを解放するために、finallyブロックでunlock()メソッドを呼び出し【3】、ロックを解放します。
同じようにdataフィールドの読み込みを行うgetData()メソッドでは、処理の先頭でreadLock()メソッドを呼び出し、読み込みロックを取得します【4】。読み込みロックを取得すると、他のスレッドからは、書き込みロックは取得できませんが、読み込みロック同士の競合は許容されます。
読み込み処理が終わったら、finallyブロックでロックを解放する【5】点は、書き込みロックと同様です。
イミュータブルによるスレッドセーフの実現
スレッドセーフにするためのもう1つの方法は、共有インスタンスをイミュータブルにする、というものです。イミュータブルとは『Java Basic編』で既出のとおり、一度インスタンスを生成したら、その後は値の変更ができないことを表します。
クラスをイミュータブルにするためには、フィールドはすべてfinal、アクセサメソッドはゲッターのみにし、一度のコンストラクタ呼び出しですべてのフィールドを初期化できるようにします。
イミュータブルであれば、そもそもフィールドを書き換えることはできないため、マルチスレッド環境であっても不正な更新は発生しません。ただし実はこのようにするだけでは、完全にスレッドセーフになるとは限りません。というのもゲッターによって参照型変数が返される場合は、その値を書き換える処理がアトミックではないと、不正な更新が発生する可能性が否定できないためです。
【図3-1-9】イミュータブルオブジェクトであっても発生する不正な更新
そこで共有インスタンスをイミュータブルにするだけではなく、内部で保持するフィールドも、すべてイミュータブルにします。具体的にはすべてのフィールドを、Java SEのクラスライブラリが提供するStringクラス、ラッパークラス、BigDecimalクラスなどのイミュータブルな型にするのです。こうすればゲッターでフィールドを取得した後、値の書き換えはできないため、完全なスレッドセーフを実現できます。
3.1.3 スレッド間通信
スレッド間通信とは
スレッド間通信とは、複数のスレッドが協調して動作するための仕組みです。
既出のjoinによるスレッドの待ち合わせや、チャプター3.2で取り上げるExecutorフレームワークも、スレッド間が協調するための仕組みを提供しますが、これらはいずれも「起動元のスレッド」と「起動されたスレッド」の間の協調を実現するための仕組みです。
ここで取り上げるスレッド間通信は、起動されたスレッド同士が共有するインスタンスを介して協調するため、その点が異なります。
ウェイトセットによるスレッド間通信
スレッドの実行中に任意のクラスでwait()メソッドを呼び出すと、実行中スレッドを「スレッドが実行されているインスタンスが保持するウェイトセット」に追加し、処理を待機させることができます。ウェイトセットとは待機スレッドを格納するための機構で、すべてのインスタンスが持っています。
wait()メソッドは、スレッドがロックを持った状態(synchronized修飾子の付与されたメソッド内など)で呼び出す必要があるため、注意が必要です。なおwait()メソッドは、java.lang.Objectで定義されたメソッドなので、あらゆるクラスに定義されています。
このようにスレッドを待機させた状態で、別のスレッドから当該インスタンスのnofity()メソッド、またはnotifyAll()メソッドを呼び出すと、ウェイトセットに通知が送られます。nofity()メソッドの場合は、ウェイトセット内の任意のスレッドが1つだけ再開され、notifyAll()メソッドの場合は、ウェイトセット内のすべてのスレッドが再開されます。なおnofity()メソッドとnotifyAll()メソッドも、java.lang.Objectで定義されたメソッドのため、あらゆるクラスが暗黙的に保持しています。
ウェイトセットの具体例
それではウェイトセットによるスレッド間通信を、具体例で説明します。
ここでは役割の異なる2つのスレッド、FooThreadとBarThreadがあり、それぞれがSharedクラスを共有します。この2つのスレッドはメインスレッドから同時に起動されますが、FooThreadで処理が行われていく途中で、BarThreadによって行われる計算処理の結果が必要になるものとします。ただしBarThreadの計算処理には時間がかかるため、FooThreadはその計算が終わるまで待機し、終わり次第その結果を受け取ります。
このような処理を行うためのコードを順に示します。
まずは複数スレッド間で共有されるクラス(Sharedクラス)です。
public class Shared {
private Integer data;
public Shared(Integer data) {
this.data = data;
}
//【1】計算結果が格納されるまで待機し、その後値を返す
public synchronized Integer getAndWaitData() {
if (data == null) {
try {
wait(); //【2】
} catch (InterruptedException ie) {
throw new RuntimeException(ie);
}
}
return data;
}
//【3】計算結果を格納し、通知する
public synchronized void setAndNotifyData(Integer data) {
this.data = data;
notify(); //【4】
}
}
次にFooThreadのrun()メソッドを示します。
@Override
public void run() {
System.out.println("[ FooThread ] start");
Integer result = shared.getAndWaitData(); //【5】
System.out.println("[ FooThread ] finish, result => " + result); //【6】
}
次にBarThreadのrun()メソッドです。
@Override
public void run() {
System.out.println("[ BarThread ] start");
// 5000ミリ秒かかる「比較的複雑な計算処理」を行い、結果が12345とする
sleepAWhile(5000);
int result = 12345;
// 共有インスタンスに結果を設定する
shared.setAndNotifyData(result); //【7】
System.out.println("[ BarThread ] finish");
}
最後にこれらのスレッドを起動するメインスレッドのコードを示します。
// 共有インスタンスを生成する
Shared shared = new Shared(0);
// FooThreadを生成し、起動する
new FooThread(shared).start();
// BarThreadを生成し、起動する
new BarThread(shared).start();
このプログラムのポイントは、2つのスレッド、FooThreadとBarThread間の協調を、共有するSharedインスタンスを介して行っている点にあります。
SharedクラスのgetAndWaitData()メソッド【1】はdataフィールドの値(計算結果)を返すためのものですが、null値の場合は計算が終わっていないと見なし、wait()メソッドを呼び出します【2】。
次にsetAndNotifyData()メソッド【3】は外部から受け取った計算結果を格納し、計算が終わったことをnotify()メソッドで通知します。
メインスレッドを実行すると、Sharedクラスを共有するFooThreadとBarThreadが生成され、それぞれが起動されます。まずFooThreadでは、SharedクラスのgetAndWaitData()メソッドを呼び出します【5】が、これによってFooThreadがウェイトセットに入り、待機状態に入ります。またBarThreadでは「比較的複雑な計算処理」が行われ、それが終わり次第、SharedクラスのsetAndNotifyData()メソッドを呼び出します【7】。これによってウェイトセットに入っていたFooThreadの処理が再開され、取得した計算結果をコンソールに表示します【6】。
【図3-1-11】ウェイトセットによるFooThreadとBarThread間の協調
3.1.4 スレッドローカル
スレッドローカルとは
スレッドローカルとは、各スレッドに固有のデータ格納領域を提供するための機能です。スレッドローカル上の変数は、同一のスレッド内からはどこからでもアクセス可能ですが、他のスレッドからは完全に分離されます。
Javaではスレッドローカルは、java.lang.ThreadLocalクラスによって実現されます。
スレッドローカルの典型的な使用方法
ここでは、スレッドローカルの典型的な使用方法を見ていきましょう。
まずThreadLocalクラスを保持するためのクラス(ThreadLocalHolderクラス)を作成します。
public class ThreadLocalHolder {
private static ThreadLocal<Map<String, String>> context =
new ThreadLocal<>() { //【1】
@Override
public Map<String, String> initialValue() { //【2】
return new HashMap<String, String>();
}
};
public static Map<String, String> get() {
return context.get();
}
}
このクラスでは、ThreadLocal>型のスタティックフィールド(context)を定義しています【1】。
ThreadLocalのインスタンスは、initialValue()メソッドをオーバーライドして生成しますが、ここでは匿名クラスを利用しています【2】。このようにすると、各スレッドに固有のMap型のデータ格納領域が作られます。
それでは作成したThreadLocalHolderクラスを利用して、スレッド内でデータを格納したり取得したりしてみましょう。
public class MyThread extends Thread {
@Override
public void run() {
ThreadLocalHolder.get().put("name", "Alice"); //【1】
Foo foo = new Foo();
foo.process(); //【2】
}
}
public class Foo {
public void process() {
String value = ThreadLocalHolder.get().get("name"); //【3】
System.out.println("[ Foo#process ] name => " + value);
}
}
まずMyThreadクラスのrun()メソッドでは、ThreadLocalHolderのget()メソッドにより、スレッドローカル上のマップ(Map型)を取得しています【1】。そして取得したマップに対して、特定のキー("name")で値を格納しています。
次にFooインスタンスを生成し、process()メソッドを呼び出します【2】。
Fooクラスのprocess()メソッドでは、スレッドローカル上のマップから同じキー("name")で値を取得しています【3】。するとrun()メソッドで格納した値("Alice")を取り出すことができます。
このようにスレッドローカルを使うと、同一のスレッド内であれば、どこからでもアクセス可能なデータ格納領域を作ることができます。
ただしスレッドローカルは、同一スレッド内限定のグローバル変数のような位置付けのため、多用すると処理の流れや構造が把握しにくいコードになります。また本コースでは取り上げませんが、Jakarta EE(旧Java EE)などの企業システム向けのJavaアプリケーション実行環境では、スレッドはプーリングされて使い回されるケースが多く、スレッドローカルを適切にクリアしないと、思わぬ不具合を引き起こす可能性があります。特にJakarta EEの場合は、スコープと呼ばれる独自の記憶領域が提供されるため、その機能を使う方が適切です。
このチャプターで学んだこと
このチャプターでは、以下のことを学びました。
- マルチスレッドによる並列処理の実現方法について。
- 並行処理と並列処理の概念について。
- スレッドによる非同期呼び出しについて。
- スレッドの実装方法について。
- スレッドの一時停止や待ち合わせについて。
- スレッドの状態遷移について。
- スレッドセーフの概念や同期化による排他制御について。
- デッドロックとその発生メカニズムについて。
- ReentrantReadWriteLockによる明示的なロックについて。
- イミュータブルによるスレッドセーフの実現について。
- ウェイトセットによるスレッド間通信の仕組みについて。
- スレッドローカルによるデータの受け渡し方法について。
-
厳密には"WAITING"、"TIMED_WAITING"、"BLOCKED"といった状態があるが、ここでは分かりやすさを重視して「待機状態」という名称で一まとめにしている。なおjoin()やwait()の場合は"WAITING"に、sleep()の場合は"TIMED_WAITING"に、同期化によるロック解放待ちの場合は"BLOCKED"に、それぞれ遷移する。 ↩