はじめに
この記事はExecutorフレームワークのありがたみを知るシリーズ第3弾、Executors#newCachedThreadPoolに関する記事です。
関連記事:
参考:
- 3.1 スレッドの仕組みと操作(マルチスレッド、並行処理、並列処理、非同期呼び出し、スレッドセーフ、同期化、デッドロック、ウェイトセット、スレッドローカルなど)~Java Advanced編
- 3.2 並行処理ユーティリティとExecutorフレームワーク(スレッドプール、ExecutorService、Callable、Futureなど)~Java Advanced編
- ExecutorServiceを使って、Javaでマルチスレッド処理
FixedThreadPoolとCachedThreadPoolの違い
両者とも、「複数のスレッドを起動したい」場合に使うもので、解決できる問題は似ています。
解決できる課題はExecutors#newFixedThreadPool編にまとめました。
今回は構成を変えて、両者の比較に重きを置いた説明をしていきます。
FixedThreadPool
決まったスレッド数のスレッドを起動して、大量のタスクを固定数のスレッドで捌きます。
スレッド数が常に一定なので、リソースを食い潰さないように処理を続けることができます。
新しいタスクが投入された場合、空いているスレッドがあればそのスレッドが処理をし、空いているスレッドがなければ「待ち行列(タスクキュー)」にタスクが積まれます。
CachedThreadPool
スレッド数は動的に変動します。必要に応じてスレッド数を増やすことで大量のタスクを捌きます。
タスクが完了して60秒経過すると、そのスレッドは破棄されます。
新しいタスクが投入された場合、空いているスレッドがあればそのスレッドが処理をし、空いているスレッドがなければ新たにスレッドを作成して処理します。
違いのまとめ
| 項目 | FixedThreadPool | CachedThreadPool |
|---|---|---|
| スレッド数 | 一定 | 動的 |
| スレッドの寿命 | 終わらない | タスク完了後60秒で死ぬ |
| タスクキュー | 無限容量のキュー | キューなし(実際の実装では容量0のキューを置くことがある) |
| 渋滞時の挙動 | キューにたまる | スレッドを増やす |
それぞれの向いているもの
FixedThreadPool
スレッド数が一定のため、メモリやCPUなどのリソース消費を一定に制御することができる。
そのため、重い処理を安定して実行したい場合に適している
- CPUに高負荷な並列処理
- CPUコア数に合わせてスレッド数を決めることで効率的な処理が見込める
- 画像処理、数値シミュレーション、機械学習など
- 一定量のタスクが継続して流れてくるようなバッチ処理
- タスク数が予測できるものはそれに合わせてスレッド数を決めておくことができる
- 定期的なファイル読み込み、定期的にAPIを実行→結果を処理、など
- Webサーバーの内部処理
- 一定のスレッドで処理することでスループットを安定させることができる
- アプリ内のバックグラウンド処理、DBの更新処理(別スレッドに逃して安定化させる)
CachedThreadPool
スレッド数が動的なため、処理要求が少ないときは少ないスレッドで対応、処理要求が多くなったらスレッドを増やして対応できる。
ただし、あまりにも多くの要求が集中すると、メモリやCPUを圧迫するリスクがある。
そのため、軽い処理を大量に捌く場合に適している。
- すぐに終わるタスクを大量に捌く処理
- スレッドがすぐにプールに返却されるので、効率よくスレッドの再利用ができる
- ログ書き込み、軽いメトリクス送信、通知処理など
- タスク数の増減が大きい処理
- 需要に応じてスレッド数を変化できるので、負荷が低いときはスレッドを減らしてコストを下げることができる
- 時間帯によってアクセス数が大きく変わるサービス、レートに波がある外部連携、など
- I/O待ちが多く実行時間が長いがCPUを使わない処理
- I/O待ち中はCPUを使わないため、スレッドを増やせる
- スレッドを増やして捌く量を増やすことで待ち時間を隠蔽することができる
- ファイルダウンロード、外部API呼び出し、など
Executors#newCachedThreadPoolの使い方
Executors#newSingleThreadExecutor編と同様に、以下の「受け取った数値になんらかの計算を施す」タスクについて考えます。
public class MyTask implements Runnable{
private int value;
public MyTask(int value) {
this.value = value;
}
@Override
public void run() {
System.out.println("[start] value:" + value + " thread-id:" + Thread.currentThread().getId());
// 何か複雑な処理
System.out.println("[ end ] value:" + value+ " thread-id:" + Thread.currentThread().getId());
}
}
Executors#newCachedThreadPoolの使い方は例によって簡単で、このメソッドを呼び出すだけです。
public static void main(String[] args) throws InterruptedException {
ExecutorService executor = Executors.newCachedThreadPool();
List<Integer> valueList = Arrays.asList(1,2,3,4,5);
for(int value: valueList) {
executor.submit(new MyTask(value));
}
}
// コンソールの出力:
// [start] value:5 thread-id:24
// [start] value:1 thread-id:20
// [start] value:2 thread-id:21
// [ end ] value:2 thread-id:21
// [start] value:4 thread-id:23
// [ end ] value:4 thread-id:23
// [start] value:3 thread-id:22
// [ end ] value:5 thread-id:24
// [ end ] value:1 thread-id:20
// [ end ] value:3 thread-id:22
// ※5個のスレッドで処理が回っている
上記に書いた特徴の通り、「空いているスレッドがなければ新たにスレッドを作る」「60秒タスクが来なかったらスレッドを破棄する」といった処理、自分でパッと書けますでしょうか?
FixedThreadExecutorと比べても、さらにライフサイクル管理が面倒そうですね。
この面倒な処理を完全に隠蔽してくれるのがExecutors#newCachedThreadPoolです。
おわりに
CachedThreadExecutorを使って効率よく捌くために、タスク側を細かく分解して設計する、といった発想があります。
こういう考え方は割と好きです。
一見すると手段(CachedThreadExecutor)が目的になっていないか、と突っ込まれそうなものですが、もっと踏み込んで「タスク」も手段の一部であると考えていると理解しています。
手段への深い理解、目的への深い理解があっての発想だと思っています。