はじめに
この記事はExecutorフレームワークのありがたみを知るシリーズ第2弾、Executors#newFixedThreadPoolに関する記事です。
関連記事:
Executors#newSingleThreadExecutor編
Executors#newCachedThreadPool編
Callable/Future編
参考:
- 3.1 スレッドの仕組みと操作(マルチスレッド、並行処理、並列処理、非同期呼び出し、スレッドセーフ、同期化、デッドロック、ウェイトセット、スレッドローカルなど)~Java Advanced編
- 3.2 並行処理ユーティリティとExecutorフレームワーク(スレッドプール、ExecutorService、Callable、Futureなど)~Java Advanced編
- ExecutorServiceを使って、Javaでマルチスレッド処理
自作スレッドの問題点
Executors#newSingleThreadExecutor編と同様に、以下の「受け取った数値になんらかの計算を施す」タスクについて考えます。
public class MyTask implements Runnable{
private int value;
public MyTask(int value) {
this.value = value;
}
@Override
public void run() {
System.out.println("[start] value:" + value + " thread-id:" + Thread.currentThread().getId());
// 何か複雑な処理
System.out.println("[ end ] value:" + value+ " thread-id:" + Thread.currentThread().getId());
}
}
今回は、Executors#newFixedThreadPoolを使ってみます。
以下は自作スレッドの場合の問題点ですが、Executors#newFixedThreadPoolを使って解決できるものをまとめています。
スレッド数の管理ができない
メインスレッドから複数のスレッドを立ち上げて並列処理することを考えます。
何も制限がない時は以下のように書けます。
public static void main(String[] args) {
List<Integer> valueList = Arrays.asList(1,2,3,4,5);
for(int value: valueList) {
new Thread(new MyTask(value)).start();
}
}
この例では5個ですが、リストの要素が5万個あったらどうでしょう。
上記のコードはひたすらにスレッドをnewしまくります。OSが管理できるスレッド数の上限に達したり、メモリやスタック領域を圧迫して、新たなスレッドを作成することができなくなり、処理が止まります。
また、CPUコア数を無視した膨大なスレッド数だと、スレッドの切り替えにコストがかかり、並列処理をしているはずなのに直列よりも遅い、といった結果を招く恐れもあります。
スレッドのライフサイクル管理が面倒
上記の通り無制限にスレッドを作り続けるのは問題なので、スレッド数を3個に制限することを考えてみましょう。
リストの3つ目の要素まではスレッドをnewして続けます。さて、4つ目まできたら、先に開始した3つのうちどれかが終了するまで待ちます。どれかが終了したら、4つ目を処理させます。
「どれかが終了した」というのは、Object#wait, Object#notifyを使えば実現できるのですが、こういった低レベルなメソッドは普段使う機会もなく、いきなり使うのはかなり難易度が高いと思います。
(例えばnotifyを忘れる(単に書き忘れる/例外処理の中で実装が漏れる)と、デッドロックが発生します)
また、「待つ」という部分について。
もちろん、メインスレッドを待たせては本末転倒ですから、別の「監視スレッド」を立てておく必要がありますね。
この監視スレッドが、タスクの完了を検知し、次のタスクを実行させるイメージです。
Executors#newSingleThreadExecutor編でも言及したように、このようなスレッドを作らなければいけないとなると、複雑度が上がります。
また、全てのタスクが完了した後は、監視スレッドを含め、全てのスレッドを終了させる必要があります。
生き残っているスレッドがあると、アプリケーションを安全に終了できません。
スレッドの終了まで神経を使う必要があります。
自作スレッドプールのサンプル
複数のスレッドを使い回しながら並列処理をする、つまりこれはスレッドプールの仕組みです。
チャッピーに自作スレッドプールのサンプルを書いてもらいました。
public class SimpleThreadPool {
private final int maxThreads = 5;
private int runningThreads = 0;
private final Queue<Runnable> taskQueue = new LinkedList<>();
public synchronized void submit(Runnable task) {
taskQueue.add(task);
notifyAll(); // タスク追加を知らせる
}
public void start() {
// デーモンスレッド(監視係)を1つ作る
Thread dispatcher = new Thread(this::dispatchLoop);
dispatcher.setDaemon(true);
dispatcher.start();
}
private void dispatchLoop() {
while (true) {
Runnable task;
synchronized (this) {
while (taskQueue.isEmpty() || runningThreads >= maxThreads) {
try {
wait(); // タスク追加 or スレッド空きを待つ
} catch (InterruptedException e) {
return;
}
}
// ここに来たらタスクがあるしスレッド枠もある
task = taskQueue.poll();
runningThreads++;
}
// ワーカーを立ち上げる
Thread worker = new Thread(() -> {
try {
task.run();
} finally {
synchronized (SimpleThreadPool.this) {
runningThreads--;
SimpleThreadPool.this.notifyAll(); // 空きができた
}
}
});
worker.start();
}
}
}
まずは長くて読む気が失せますね。
また、読んでみるとわかりますが、「ここで処理が止まって、こっちが動いて、もしここが終わったら・・・」とかなりいろんな場所を行ったり来たりする必要があります。
かなりシンプルなコードにしてくれていますが、それでも理解に体力を使いますよね。
読むだけでこんなに辛いのに、まして書くことになったら・・・ゾッとしますね。
ExecutorServiceを使って解決する
それではExecutors#newFixedThreadExecutorを見てみます
スレッド数を管理する
Executors#newFixedThreadExecutorは固定数のスレッドを作成するものです。
以下の通り書けます。
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Integer> valueList = Arrays.asList(1,2,3,4,5);
for(int value: valueList) {
executor.submit(new MyTask(value));
}
}
// コンソールの出力:
// [start] value:1 thread-id:20
// [start] value:3 thread-id:22
// [start] value:2 thread-id:21
// [ end ] value:1 thread-id:20
// [ end ] value:2 thread-id:21
// [ end ] value:3 thread-id:22
// [start] value:4 thread-id:21
// [ end ] value:4 thread-id:21
// [start] value:5 thread-id:22
// [ end ] value:5 thread-id:22
// ※20,21,22の3つのスレッドで処理されている
newFixedThreadExecutorの引数に「3」を渡していますので、ここでは3つのスレッドを作成しています。
コンソールの出力から、確かに3つのスレッドで処理が進んでいることがわかります。
スレッド数の管理のために我々がやることは、引数にスレッド数を渡すことのみです。簡単ですね。
スレッドのライフサイクル管理をする
スレッドのライフサイクルの管理をする・・・必要はありません。
我々が考えるべきことは何もなく、とにかく何このスレッドを作るのかを決めて、引数として渡すだけです。
あとはすべてExecutorServiceがやってくれます。
そのおかげで、我々はMyTaskに集中できるというわけです。
おわりに
ぜひ一度、スレッドプールを作ろうとしてみて欲しいと思います。
作れなくて良いです。作ろうとしただけで、ExecutorServiceのありがたみが身に染みると思います。
(この記事執筆のために途中まで書きまして、やめました。すぐやめたくなりました。実際、ExecutorServiceだと何にも書く必要がないので、超楽です。神です)
解説が薄いのも、ExecutorServiceがなんでもやってくれるからです。