0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

JavaプログラムでCPUのcore数を有効活用する方法(サンプルプログラム)

Posted at

Javaで通常の(シングルスレッドで)プログラムを書くと、CPUコアは1つしか使いません。複数コアを使うには、明示的にスレッドや並列処理を使う必要があります。
ここでは、1~1,000,000の合計値を求めるロジックを例に、Javaで複数コアを使うためのサンプルを紹介します。
Javaで複数コアを使うための方法として以下を紹介します。

  • Thread/Runnable を使う方法
    • 特徴: new Thread(runnable).start()で直接スレッドを起動
    • メリット: シンプルで仕組み理解に最適
    • デメリット: 戻り値なし、スレッド管理が煩雑
  • Thread/Callable/FutureTaskを使う方法
    • 特徴: Callableは戻り値を返せる
    • メリット: 戻り値を自然に扱える
    • デメリット: ExecutorServiceより冗長
  • ExecutorService を使う方法
    • 特徴: スレッドプールを管理。submit()でタスク投入、Future.get()で結果取得
    • メリット: スレッド管理が自動化され、大規模処理でも安定
    • デメリット: shutdownを忘れるとスレッドが残る
  • ForkJoinPool を使う方法
    • 特徴: 再帰的にタスクを分割し、work-stealingで効率的に並列化
    • メリット: CPU バウンド処理で最速クラス
    • デメリット: コードが複雑、分割戦略を設計する必要あり
  • parallelStream を使う方法
    • 特徴: Stream API.parallel()を付けるだけで並列化、内部で ForkJoinPoolを利用
    • メリット: 圧倒的に短いコード。データ並列処理に強い
    • デメリット: 副作用のある処理に弱い、順序保証がない

0. シングルスレッド(通常のJavaコード)

通常の(シングルスレッド)で1~1,000,000の合計値を求めるコードは以下となります。

SingleSum.java
package parallel;

public class SingleSum {

	private static final int N_MAX = 1_000_000;

	public static void main(String[] args) {
		long sum = 0;
		for (int i = 0; i < N_MAX; i++) {
			sum += (i + 1);
		}
		System.out.println("1~" + N_MAX + " の合計 = " + sum);
	}
}

上記コードの実行結果は以下となります。

実行結果
1~1000000 の合計 = 500000500000

1. Thread/Runnableを使う方法

Thread/Runnableを使って、1~1,000,000の合計値を求めるコードは以下となります。

ParallelSum01ThreadRunnable.java
package parallel;

public class ParallelSum01ThreadRunnable {

	private static final int N_MAX = 1_000_000;

    public static void main(String[] args) throws Exception {
        int numThreads = Runtime.getRuntime().availableProcessors();
		System.out.println("論理プロセッサ数=" + numThreads);

        long[] partial = new long[numThreads];
        Thread[] threads = new Thread[numThreads];

        int chunk = N_MAX / numThreads;

        for (int i = 0; i < numThreads; i++) {
            final int idx = i;
            final int start = i * chunk + 1;
            final int end = (i == numThreads - 1) ? N_MAX : (i + 1) * chunk;
			System.out.println((idx + 1) + "個目の論理プロセッサ:" + start + "~" + end + "の合計を計算");

            threads[i] = new Thread(() -> {
                long sum = 0;
                for (int x = start; x <= end; x++) {
                    sum += x;
                }
                partial[idx] = sum;
            });

            threads[i].start();
        }

        // 全スレッド終了待ち
        for (Thread t : threads) {
            t.join();
        }

        // 合計
        long total = 0;
        for (long p : partial) {
            total += p;
        }

		System.out.println("1~" + N_MAX + " の合計 = " + total);

    }
}

上記コードの実行結果は以下となります。

実行結果
論理プロセッサ数=4
1個目の論理プロセッサ:1~250000の合計を計算
2個目の論理プロセッサ:250001~500000の合計を計算
3個目の論理プロセッサ:500001~750000の合計を計算
4個目の論理プロセッサ:750001~1000000の合計を計算
1~1000000 の合計 = 500000500000

2. Thread/Callable/FutureTaskを使う方法

Thread/Callable/FutureTaskを使って、1~1,000,000の合計値を求めるコードは以下となります。

ParallelSum02ThreadCallableFutureTask.java
package parallel;

import java.util.concurrent.Callable;
import java.util.concurrent.FutureTask;

public class ParallelSum02ThreadCallableFutureTask {
    private static final int N_MAX = 1_000_000;

    public static void main(String[] args) throws Exception {
        int numThreads = Runtime.getRuntime().availableProcessors();
		System.out.println("論理プロセッサ数=" + numThreads);

        FutureTask<Long>[] tasks = new FutureTask[numThreads];
        Thread[] threads = new Thread[numThreads];

        int chunk = N_MAX / numThreads;

        for (int i = 0; i < numThreads; i++) {
            final int start = i * chunk + 1;
            final int end = (i == numThreads - 1) ? N_MAX : (i + 1) * chunk;
			System.out.println((i + 1) + "個目の論理プロセッサ:" + start + "~" + end + "の合計を計算");

            Callable<Long> callable = () -> {
                long sum = 0;
                for (int x = start; x <= end; x++) {
                    sum += x;
                }
                return sum;
            };

            tasks[i] = new FutureTask<>(callable);
            threads[i] = new Thread(tasks[i]);
            threads[i].start();
        }

        long total = 0;
        for (FutureTask<Long> task : tasks) {
            total += task.get(); // 部分和を取得
        }

        System.out.println("合計 = " + total);
    }
}

上記コードの実行結果は以下となります。

実行結果
論理プロセッサ数=4
1個目の論理プロセッサ:1~250000の合計を計算
2個目の論理プロセッサ:250001~500000の合計を計算
3個目の論理プロセッサ:500001~750000の合計を計算
4個目の論理プロセッサ:750001~1000000の合計を計算
合計 = 500000500000

3. ExecutorServiceを使う方法

ExecutorServiceを使って、1~1,000,000の合計値を求めるコードは以下となります。

ParallelSum03ExecutorService.java
package parallel;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelSum03ExecutorService {

	private static final int N_MAX = 1_000_000;

	public static void main(String[] args) throws Exception {

		int numThreads = Runtime.getRuntime().availableProcessors();
		System.out.println("論理プロセッサ数=" + numThreads);

		ExecutorService exec = Executors.newFixedThreadPool(numThreads);
		List<Callable<Long>> tasks = new ArrayList<>();
		int chunkSize = N_MAX / numThreads;

		for (int i = 0; i < numThreads; i++) {
			int start = i * chunkSize + 1;
			int end = (i == numThreads - 1) ? N_MAX : (i + 1) * chunkSize;
			System.out.println((i + 1) + "個目の論理プロセッサ:" + start + "~" + end + "の合計を計算");
			tasks.add(() -> {
				long sum = 0;
				for (int x = start; x <= end; x++) {
					sum += x;
				}
				return sum;
			});
		}

		// 並列実行
		List<Future<Long>> results = exec.invokeAll(tasks);

		// 結果を集約
		long total = 0;
		int n = 0;
		for (Future<Long> f : results) {
			System.out.println((n + 1) + "個目の論理プロセッサで計算した合計:" + f.get());
			total += f.get();
			n++;
		}

		exec.shutdown();

		System.out.println("1~" + N_MAX + " の合計 = " + total);
	}
}

上記コードの実行結果は以下となります。

実行結果
論理プロセッサ数=4
1個目の論理プロセッサ:1~250000の合計を計算
2個目の論理プロセッサ:250001~500000の合計を計算
3個目の論理プロセッサ:500001~750000の合計を計算
4個目の論理プロセッサ:750001~1000000の合計を計算
1個目の論理プロセッサで計算した合計:31250125000
2個目の論理プロセッサで計算した合計:93750125000
3個目の論理プロセッサで計算した合計:156250125000
4個目の論理プロセッサで計算した合計:218750125000
1~1000000 の合計 = 500000500000

4. ForkJoinPoolを使う方法

ForkJoinPoolを使って、1~1,000,000の合計値を求めるコードは以下となります。

ParallelSum04ForkJoinPool.java
package parallel;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;

public class ParallelSum04ForkJoinPool {

	private static final int N_MAX = 1_000_000;
	private static int cnt = 0;

	// タスク:start 〜 end の合計を計算
	static class SumTask extends RecursiveTask<Long> {
		private static final int THRESHOLD = 100_000; // 分割しない最小サイズ
		private final int start;
		private final int end;

		SumTask(int start, int end) {
			this.start = start;
			this.end = end;
		}

		@Override
		protected Long compute() {
			int length = end - start + 1;

			// 小さい範囲なら直接計算
			if (length <= THRESHOLD) {
				long sum = 0;
				for (int i = start; i <= end; i++) {
					sum += i;
				}
				return sum;
			}

			// 大きい範囲なら分割
			int mid = (start + end) / 2;

			SumTask left = new SumTask(start, mid);
			SumTask right = new SumTask(mid + 1, end);
			System.out.println("左" + cnt + ":" + start + "~" + mid + "を計算");
			System.out.println("右" + cnt + ":" + (mid + 1) + "~" + end + "を計算");
			cnt++;
			
			// 左を非同期実行
			left.fork();

			// 右を現在スレッドで実行
			long rightResult = right.compute();

			// 左の結果を待つ
			long leftResult = left.join();

			return leftResult + rightResult;
		}
	}

	public static void main(String[] args) {

		ForkJoinPool pool = new ForkJoinPool();

		SumTask task = new SumTask(1, N_MAX);

		long result = pool.invoke(task);

		System.out.println("1~" + N_MAX + " の合計 = " + result);
	}
}

上記コードの実行結果は以下となります。

実行結果
左0:1~500000を計算
右0:500001~1000000を計算
左1:500001~750000を計算
右1:750001~1000000を計算
左2:750001~875000を計算
右2:875001~1000000を計算
左3:875001~937500を計算
右3:937501~1000000を計算
左4:1~250000を計算
右4:250001~500000を計算
左5:250001~375000を計算
右5:375001~500000を計算
左6:375001~437500を計算
右6:437501~500000を計算
左7:500001~625000を計算
右7:625001~750000を計算
左8:625001~687500を計算
右8:687501~750000を計算
左9:250001~312500を計算
右9:312501~375000を計算
左10:500001~562500を計算
右10:562501~625000を計算
左11:1~125000を計算
右11:125001~250000を計算
左12:125001~187500を計算
右12:187501~250000を計算
左13:750001~812500を計算
右13:812501~875000を計算
左14:1~62500を計算
右14:62501~125000を計算
1~1000000 の合計 = 500000500000

5. parallelStreamを使う方法

parallelStreamを使って、1~1,000,000の合計値を求めるコードは以下となります。

ParallelSum05ParallelStream.java
package parallel;

import java.util.stream.LongStream;

public class ParallelSum05ParallelStream {

	private static final int N_MAX = 1_000_000;

	public static void main(String[] args) {

        long result = LongStream.rangeClosed(1, N_MAX)
                                .parallel()
                                .reduce(0L, (a, b) -> a + b);

        System.out.println("1~" + N_MAX + " の合計 = " + result);
    }
}

上記コードの実行結果は以下となります。

実行結果
1~1000000 の合計 = 500000500000


6. 上記方法の比較

方法 抽象度 戻り値 スレッド管理 特徴 適した用途
Thread 最低 × 手動 (start/join) 直接スレッドを生成・管理。Runnableを渡すだけ。 学習用、小規模処理
Thread + Callable (FutureTask) 手動 Callableで戻り値を返せる。Threadに渡すにはFutureTaskが必要。 戻り値が欲しいが低レベルで制御したい場合
ExecutorService 自動(スレッドプール) submitでタスク投入、Futureで結果取得。スレッドプール管理が自動。 実務標準。Webサーバー、バッチ処理
ForkJoinPool 中~高 自動(work-stealing) divide & conquer に最適化。RecursiveTaskで分割統治。 ソート・探索・数値計算などCPUバウンド処理
parallelStream 最高 完全自動(内部でForkJoinPool) .parallel() を付けるだけ。map/filter/reduceを並列化。 データ変換・大量リスト処理

以上

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?