Javaで通常の(シングルスレッドで)プログラムを書くと、CPUコアは1つしか使いません。複数コアを使うには、明示的にスレッドや並列処理を使う必要があります。
ここでは、1~1,000,000の合計値を求めるロジックを例に、Javaで複数コアを使うためのサンプルを紹介します。
Javaで複数コアを使うための方法として以下を紹介します。
-
Thread/Runnable を使う方法
- 特徴:
new Thread(runnable).start()で直接スレッドを起動 - メリット: シンプルで仕組み理解に最適
- デメリット: 戻り値なし、スレッド管理が煩雑
- 特徴:
-
Thread/Callable/FutureTaskを使う方法
- 特徴: Callableは戻り値を返せる
- メリット: 戻り値を自然に扱える
- デメリット: ExecutorServiceより冗長
-
ExecutorService を使う方法
- 特徴: スレッドプールを管理。
submit()でタスク投入、Future.get()で結果取得 - メリット: スレッド管理が自動化され、大規模処理でも安定
- デメリット:
shutdownを忘れるとスレッドが残る
- 特徴: スレッドプールを管理。
-
ForkJoinPool を使う方法
- 特徴: 再帰的にタスクを分割し、work-stealingで効率的に並列化
- メリット: CPU バウンド処理で最速クラス
- デメリット: コードが複雑、分割戦略を設計する必要あり
-
parallelStream を使う方法
- 特徴: Stream APIに
.parallel()を付けるだけで並列化、内部で ForkJoinPoolを利用 - メリット: 圧倒的に短いコード。データ並列処理に強い
- デメリット: 副作用のある処理に弱い、順序保証がない
- 特徴: Stream APIに
0. シングルスレッド(通常のJavaコード)
通常の(シングルスレッド)で1~1,000,000の合計値を求めるコードは以下となります。
package parallel;
public class SingleSum {
private static final int N_MAX = 1_000_000;
public static void main(String[] args) {
long sum = 0;
for (int i = 0; i < N_MAX; i++) {
sum += (i + 1);
}
System.out.println("1~" + N_MAX + " の合計 = " + sum);
}
}
上記コードの実行結果は以下となります。
1~1000000 の合計 = 500000500000
1. Thread/Runnableを使う方法
Thread/Runnableを使って、1~1,000,000の合計値を求めるコードは以下となります。
package parallel;
public class ParallelSum01ThreadRunnable {
private static final int N_MAX = 1_000_000;
public static void main(String[] args) throws Exception {
int numThreads = Runtime.getRuntime().availableProcessors();
System.out.println("論理プロセッサ数=" + numThreads);
long[] partial = new long[numThreads];
Thread[] threads = new Thread[numThreads];
int chunk = N_MAX / numThreads;
for (int i = 0; i < numThreads; i++) {
final int idx = i;
final int start = i * chunk + 1;
final int end = (i == numThreads - 1) ? N_MAX : (i + 1) * chunk;
System.out.println((idx + 1) + "個目の論理プロセッサ:" + start + "~" + end + "の合計を計算");
threads[i] = new Thread(() -> {
long sum = 0;
for (int x = start; x <= end; x++) {
sum += x;
}
partial[idx] = sum;
});
threads[i].start();
}
// 全スレッド終了待ち
for (Thread t : threads) {
t.join();
}
// 合計
long total = 0;
for (long p : partial) {
total += p;
}
System.out.println("1~" + N_MAX + " の合計 = " + total);
}
}
上記コードの実行結果は以下となります。
論理プロセッサ数=4
1個目の論理プロセッサ:1~250000の合計を計算
2個目の論理プロセッサ:250001~500000の合計を計算
3個目の論理プロセッサ:500001~750000の合計を計算
4個目の論理プロセッサ:750001~1000000の合計を計算
1~1000000 の合計 = 500000500000
2. Thread/Callable/FutureTaskを使う方法
Thread/Callable/FutureTaskを使って、1~1,000,000の合計値を求めるコードは以下となります。
package parallel;
import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;
public class ParallelSum02ThreadCallableFutureTask {
private static final int N_MAX = 1_000_000;
public static void main(String[] args) throws Exception {
int numThreads = Runtime.getRuntime().availableProcessors();
System.out.println("論理プロセッサ数=" + numThreads);
FutureTask<Long>[] tasks = new FutureTask[numThreads];
Thread[] threads = new Thread[numThreads];
int chunk = N_MAX / numThreads;
for (int i = 0; i < numThreads; i++) {
final int start = i * chunk + 1;
final int end = (i == numThreads - 1) ? N_MAX : (i + 1) * chunk;
System.out.println((i + 1) + "個目の論理プロセッサ:" + start + "~" + end + "の合計を計算");
Callable<Long> callable = () -> {
long sum = 0;
for (int x = start; x <= end; x++) {
sum += x;
}
return sum;
};
tasks[i] = new FutureTask<>(callable);
threads[i] = new Thread(tasks[i]);
threads[i].start();
}
long total = 0;
for (FutureTask<Long> task : tasks) {
total += task.get(); // 部分和を取得
}
System.out.println("合計 = " + total);
}
}
上記コードの実行結果は以下となります。
論理プロセッサ数=4
1個目の論理プロセッサ:1~250000の合計を計算
2個目の論理プロセッサ:250001~500000の合計を計算
3個目の論理プロセッサ:500001~750000の合計を計算
4個目の論理プロセッサ:750001~1000000の合計を計算
合計 = 500000500000
3. ExecutorServiceを使う方法
ExecutorServiceを使って、1~1,000,000の合計値を求めるコードは以下となります。
package parallel;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class ParallelSum03ExecutorService {
private static final int N_MAX = 1_000_000;
public static void main(String[] args) throws Exception {
int numThreads = Runtime.getRuntime().availableProcessors();
System.out.println("論理プロセッサ数=" + numThreads);
ExecutorService exec = Executors.newFixedThreadPool(numThreads);
List<Callable<Long>> tasks = new ArrayList<>();
int chunkSize = N_MAX / numThreads;
for (int i = 0; i < numThreads; i++) {
int start = i * chunkSize + 1;
int end = (i == numThreads - 1) ? N_MAX : (i + 1) * chunkSize;
System.out.println((i + 1) + "個目の論理プロセッサ:" + start + "~" + end + "の合計を計算");
tasks.add(() -> {
long sum = 0;
for (int x = start; x <= end; x++) {
sum += x;
}
return sum;
});
}
// 並列実行
List<Future<Long>> results = exec.invokeAll(tasks);
// 結果を集約
long total = 0;
int n = 0;
for (Future<Long> f : results) {
System.out.println((n + 1) + "個目の論理プロセッサで計算した合計:" + f.get());
total += f.get();
n++;
}
exec.shutdown();
System.out.println("1~" + N_MAX + " の合計 = " + total);
}
}
上記コードの実行結果は以下となります。
論理プロセッサ数=4
1個目の論理プロセッサ:1~250000の合計を計算
2個目の論理プロセッサ:250001~500000の合計を計算
3個目の論理プロセッサ:500001~750000の合計を計算
4個目の論理プロセッサ:750001~1000000の合計を計算
1個目の論理プロセッサで計算した合計:31250125000
2個目の論理プロセッサで計算した合計:93750125000
3個目の論理プロセッサで計算した合計:156250125000
4個目の論理プロセッサで計算した合計:218750125000
1~1000000 の合計 = 500000500000
4. ForkJoinPoolを使う方法
ForkJoinPoolを使って、1~1,000,000の合計値を求めるコードは以下となります。
package parallel;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
public class ParallelSum04ForkJoinPool {
private static final int N_MAX = 1_000_000;
private static int cnt = 0;
// タスク:start 〜 end の合計を計算
static class SumTask extends RecursiveTask<Long> {
private static final int THRESHOLD = 100_000; // 分割しない最小サイズ
private final int start;
private final int end;
SumTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
int length = end - start + 1;
// 小さい範囲なら直接計算
if (length <= THRESHOLD) {
long sum = 0;
for (int i = start; i <= end; i++) {
sum += i;
}
return sum;
}
// 大きい範囲なら分割
int mid = (start + end) / 2;
SumTask left = new SumTask(start, mid);
SumTask right = new SumTask(mid + 1, end);
System.out.println("左" + cnt + ":" + start + "~" + mid + "を計算");
System.out.println("右" + cnt + ":" + (mid + 1) + "~" + end + "を計算");
cnt++;
// 左を非同期実行
left.fork();
// 右を現在スレッドで実行
long rightResult = right.compute();
// 左の結果を待つ
long leftResult = left.join();
return leftResult + rightResult;
}
}
public static void main(String[] args) {
ForkJoinPool pool = new ForkJoinPool();
SumTask task = new SumTask(1, N_MAX);
long result = pool.invoke(task);
System.out.println("1~" + N_MAX + " の合計 = " + result);
}
}
上記コードの実行結果は以下となります。
左0:1~500000を計算
右0:500001~1000000を計算
左1:500001~750000を計算
右1:750001~1000000を計算
左2:750001~875000を計算
右2:875001~1000000を計算
左3:875001~937500を計算
右3:937501~1000000を計算
左4:1~250000を計算
右4:250001~500000を計算
左5:250001~375000を計算
右5:375001~500000を計算
左6:375001~437500を計算
右6:437501~500000を計算
左7:500001~625000を計算
右7:625001~750000を計算
左8:625001~687500を計算
右8:687501~750000を計算
左9:250001~312500を計算
右9:312501~375000を計算
左10:500001~562500を計算
右10:562501~625000を計算
左11:1~125000を計算
右11:125001~250000を計算
左12:125001~187500を計算
右12:187501~250000を計算
左13:750001~812500を計算
右13:812501~875000を計算
左14:1~62500を計算
右14:62501~125000を計算
1~1000000 の合計 = 500000500000
5. parallelStreamを使う方法
parallelStreamを使って、1~1,000,000の合計値を求めるコードは以下となります。
package parallel;
import java.util.stream.LongStream;
public class ParallelSum05ParallelStream {
private static final int N_MAX = 1_000_000;
public static void main(String[] args) {
long result = LongStream.rangeClosed(1, N_MAX)
.parallel()
.reduce(0L, (a, b) -> a + b);
System.out.println("1~" + N_MAX + " の合計 = " + result);
}
}
上記コードの実行結果は以下となります。
1~1000000 の合計 = 500000500000
6. 上記方法の比較
| 方法 | 抽象度 | 戻り値 | スレッド管理 | 特徴 | 適した用途 |
|---|---|---|---|---|---|
| Thread | 最低 | × | 手動 (start/join) | 直接スレッドを生成・管理。Runnableを渡すだけ。 | 学習用、小規模処理 |
| Thread + Callable (FutureTask) | 低 | ○ | 手動 | Callableで戻り値を返せる。Threadに渡すにはFutureTaskが必要。 | 戻り値が欲しいが低レベルで制御したい場合 |
| ExecutorService | 中 | ○ | 自動(スレッドプール) | submitでタスク投入、Futureで結果取得。スレッドプール管理が自動。 | 実務標準。Webサーバー、バッチ処理 |
| ForkJoinPool | 中~高 | ○ | 自動(work-stealing) | divide & conquer に最適化。RecursiveTaskで分割統治。 | ソート・探索・数値計算などCPUバウンド処理 |
| parallelStream | 最高 | ○ | 完全自動(内部でForkJoinPool) | .parallel() を付けるだけ。map/filter/reduceを並列化。 | データ変換・大量リスト処理 |
以上