1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Java 並列処理

1
Last updated at Posted at 2025-05-21

Runnnableインターフェース

Javaの並列処理についてです。Runnableインターフェースを使ったコンソールアプリを作って。
Main.java
import java.util.*;

public class Main {
    public static void main(String[] args) throws Exception {
        Thread thread1 = new Thread(new CounterTask("Task1"));
        Thread thread2 = new Thread(new CounterTask("Task2"));
        Thread thread3 = new Thread(new CounterTask("Task3"));
        
        thread1.start();
        thread2.start();
        thread3.start();
    }
}

class CounterTask implements Runnable{
    private final String taskName;
    
    public CounterTask(String taskName){
        this.taskName = taskName;
    }
    
    @Override
    public void run(){
        for(int i=0;i<5;i++){
            System.out.println(taskName + " - Count: " + i);
            try{
                Thread.sleep(500);
            }catch(InterruptedException ex){
                System.out.println(taskName + " was interrupted.");
            }
        }
        System.out.println(taskName + " - Finished!");
    }
}

実行結果

Task2 - Count: 0
Task1 - Count: 0
Task3 - Count: 0
Task2 - Count: 1
Task3 - Count: 1
Task1 - Count: 1
Task2 - Count: 2
Task1 - Count: 2
Task3 - Count: 2
Task2 - Count: 3
Task3 - Count: 3
Task1 - Count: 3

ExecutorServiceフレームワーク

Javaの並列処理についてです。ExecutorServiceフレームワークを使ったコンソールアプリを作って。
Main.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ExecutorServiceExample {

    // Runnableを実装したタスク
    static class CounterTask implements Runnable {
        private final String taskName;

        public CounterTask(String taskName) {
            this.taskName = taskName;
        }

        @Override
        public void run() {
            for (int i = 0; i < 5; i++) {
                System.out.println(taskName + " - Count: " + i + " (Thread: " + Thread.currentThread().getName() + ")");
                try {
                    Thread.sleep(500); // 0.5秒スリープ
                } catch (InterruptedException e) {
                    System.out.println(taskName + " was interrupted.");
                }
            }
            System.out.println(taskName + " - Finished!");
        }
    }

    public static void main(String[] args) {
        // 固定スレッドプール(3スレッド)を作成
        ExecutorService executor = Executors.newFixedThreadPool(3);

        // タスクを5つ送信
        for (int i = 1; i <= 5; i++) {
            executor.submit(new CounterTask("Task " + i));
        }

        // もう新しいタスクを受け付けないようにする
        executor.shutdown();

        try {
            // すべてのタスクが完了するまで最大10秒待つ
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                executor.shutdownNow(); // 終わらなければ強制終了
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
        }

        System.out.println("All tasks completed.");
    }
}

Fork/Joinフレームワーク

Fork/Join Frameworkは、大きなタスクを小さなサブタスクに分割して並列処理を行うのに適した仕組みです。Java 7以降で利用でき、再帰的な処理や分割統治(Divide and Conquer)アルゴリズムに向いています。

🧠 サンプル概要:Fork/Join で配列の合計を並列計算
このサンプルでは、整数の配列を複数のサブタスクに分割し、Fork/Joinフレームワークを使って並列に合計を計算します。

Main.java
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;

public class ForkJoinSumExample {

    // 再帰タスク:配列の部分合計を計算
    static class SumTask extends RecursiveTask<Long> {
        private static final int THRESHOLD = 10; // 分割のしきい値
        private final int[] numbers;
        private final int start;
        private final int end;

        public SumTask(int[] numbers, int start, int end) {
            this.numbers = numbers;
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            int length = end - start;
            if (length <= THRESHOLD) {
                // 小さい範囲は逐次処理
                long sum = 0;
                for (int i = start; i < end; i++) {
                    sum += numbers[i];
                }
                System.out.println("Computed sum of [" + start + ", " + end + ") = " + sum + " in " + Thread.currentThread().getName());
                return sum;
            } else {
                // 大きければ2つに分割して再帰
                int mid = start + length / 2;
                SumTask leftTask = new SumTask(numbers, start, mid);
                SumTask rightTask = new SumTask(numbers, mid, end);

                // タスクをフォークし並列実行
                leftTask.fork(); // 非同期実行
                long rightResult = rightTask.compute(); // こちらはメインスレッドで実行
                long leftResult = leftTask.join(); // forkしたタスクの完了を待つ

                return leftResult + rightResult;
            }
        }
    }

    public static void main(String[] args) {
        // データの準備(1〜100までの整数)
        int[] numbers = new int[100];
        for (int i = 0; i < numbers.length; i++) {
            numbers[i] = i + 1;
        }

        // ForkJoinPoolを作成してタスクを実行
        ForkJoinPool pool = new ForkJoinPool();
        SumTask task = new SumTask(numbers, 0, numbers.length);

        long result = pool.invoke(task);
        System.out.println("Total Sum = " + result);
    }
}

CompletableFutureを使った非同期処理

Main.java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class CompletableFutureExample {

    // 疑似的な外部サービス呼び出し
    public static CompletableFuture<String> fetchData(String serviceName, int delaySeconds) {
        return CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println(serviceName + " - Started (Thread: " + Thread.currentThread().getName() + ")");
                TimeUnit.SECONDS.sleep(delaySeconds); // 疑似的な遅延
            } catch (InterruptedException e) {
                return serviceName + " - Failed";
            }
            return serviceName + " - Result after " + delaySeconds + "s";
        });
    }

    public static void main(String[] args) {
        // 3つの非同期タスクを実行
        CompletableFuture<String> future1 = fetchData("Service A", 2);
        CompletableFuture<String> future2 = fetchData("Service B", 3);
        CompletableFuture<String> future3 = fetchData("Service C", 1);

        // すべての処理が完了した後にまとめて表示
        CompletableFuture<Void> allDone = CompletableFuture.allOf(future1, future2, future3);

        allDone.thenRun(() -> {
            try {
                System.out.println("=== Results ===");
                System.out.println(future1.get());
                System.out.println(future2.get());
                System.out.println(future3.get());
            } catch (InterruptedException | ExecutionException e) {
                System.out.println("エラーが発生しました: " + e.getMessage());
            }
        }).join(); // メインスレッドが終了しないよう待機

        System.out.println("All tasks completed.");
    }
}

🚀 応用アイディア(希望があれば拡張します)
.thenApply() を使った結果の加工

.thenCombine() で複数の結果を統合

.exceptionally() でエラー処理

.timeout() を使ったタイムアウト処理(Java 9以降)

ParallelStream

Main.java
import java.util.Arrays;
import java.util.List;

public class ParallelStreamExample {

    // 疑似的な重い処理
    public static String process(String taskName, int delaySeconds) {
        try {
            System.out.println("Started " + taskName + " (Thread: " + Thread.currentThread().getName() + ")");
            Thread.sleep(delaySeconds * 1000); // 疑似的な処理時間
        } catch (InterruptedException e) {
            return taskName + " - Failed";
        }
        return taskName + " - Completed";
    }

    public static void main(String[] args) {
        // 処理対象のタスクリスト
        List<String> tasks = Arrays.asList("Task A", "Task B", "Task C", "Task D", "Task E");

        // 並列に処理(すべての結果を出力)
        tasks.parallelStream()
                .map(task -> process(task, 2))
                .forEach(result -> System.out.println("Result: " + result));

        System.out.println("All tasks completed.");
    }
}

🚀 応用アイディア(必要なら追加できます)
.collect(Collectors.toList()) で結果を集約

.filter() や .sorted() を組み合わせた並列フィルタ処理

並列処理の実行時間の比較(stream() vs parallelStream())

Virtual Threads

Javaの Virtual Threads(仮想スレッド) は、Java 21(正式リリース)以降で標準化された軽量スレッド機能で、高スケーラビリティな非同期処理を非常に簡潔に実装できます。

🧵 Virtual Threadsとは?
OSスレッドではなく、JVMが管理する軽量なスレッド

数万〜数百万単位のスレッドを効率的に作成・実行可能

Thread.ofVirtual().start() Executors.newVirtualThreadPerTaskExecutor() で使用

Main.java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadExample {

    public static void main(String[] args) {
        List<String> tasks = List.of("Task A", "Task B", "Task C", "Task D", "Task E");

        // 仮想スレッドごとにタスクを実行するExecutor
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (String task : tasks) {
                executor.submit(() -> {
                    System.out.println(task + " started (Thread: " + Thread.currentThread() + ")");
                    try {
                        Thread.sleep(Duration.ofSeconds(2)); // 疑似的な重い処理
                    } catch (InterruptedException e) {
                        System.out.println(task + " was interrupted");
                    }
                    System.out.println(task + " completed");
                });
            }
        } // 自動的に全タスク完了を待ってクローズ

        System.out.println("All tasks submitted.");
    }
}

メモリリークの防止

① メモリリークを引き起こすコード(スレッドを適切に停止しない場合)

Main.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MemoryLeakExample {

    public static void main(String[] args) {
        // スレッドプールを作成
        ExecutorService executorService = Executors.newCachedThreadPool();

        // 非同期タスクを5000回送信
        for (int i = 0; i < 5000; i++) {
            executorService.submit(() -> {
                try {
                    // 疑似的な重い処理
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // メモリリークを発生させるために、executorServiceを適切にシャットダウンしない
        // executorService.shutdown(); // これをコメントアウトすることで、スレッドプールが終了せずメモリリークが発生
    }
}

❌ 解説:
上記のコードでは、ExecutorService を使って多くのタスクを非同期に実行していますが、shutdown() メソッドを呼び出さないため、スレッドプールが終了せず、実行が完了したスレッドが解放されません。

実行後、スレッドが不要なままメモリに残り続け、最終的に メモリリーク が発生します。

② メモリリークを防止する非同期処理のコード(適切にリソースを管理)

Main.java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MemoryLeakPreventionExample {

    public static void main(String[] args) {
        // スレッドプールを作成(終了時に自動的にシャットダウンされる)
        try (ExecutorService executorService = Executors.newCachedThreadPool()) {
            // 非同期タスクを5000回送信
            for (int i = 0; i < 5000; i++) {
                executorService.submit(() -> {
                    try {
                        // 疑似的な重い処理
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                });
            }

            // executorServiceはtry-with-resourcesで自動的にシャットダウンされる
            // シャットダウンを忘れずに行うことでメモリリークを防止する
        } catch (Exception e) {
            e.printStackTrace();
        }

        System.out.println("All tasks submitted and executor properly shutdown.");
    }
}

メモリ使用量のモニタリング

メモリ使用量をモニタリングするためには、jvisualvm や jconsole を使用できます。以下に、メモリ使用量を監視する簡単なコードを示します。

Main.java
import java.util.ArrayList;
import java.util.List;

public class MemoryMonitoringExample {

    public static void main(String[] args) {
        List<byte[]> memoryLeakList = new ArrayList<>();
        
        // メモリ使用量を監視するための定期的な処理
        Runnable memoryMonitor = () -> {
            while (true) {
                // 1MBのメモリを継続的に確保(メモリリークの模倣)
                memoryLeakList.add(new byte[1024 * 1024]); // 1MBの配列を追加
                System.gc(); // ガーベジコレクションを手動で呼び出す
                try {
                    Thread.sleep(1000); // 1秒ごとにメモリ使用量をチェック
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                // メモリ使用量のモニタリング
                long totalMemory = Runtime.getRuntime().totalMemory() / 1024 / 1024;
                long freeMemory = Runtime.getRuntime().freeMemory() / 1024 / 1024;
                System.out.println("Total Memory: " + totalMemory + " MB, Free Memory: " + freeMemory + " MB");
            }
        };

        // メモリモニタリングスレッドを開始
        Thread monitorThread = new Thread(memoryMonitor);
        monitorThread.start();

        // メインスレッドは他の処理を続行
        try {
            while (true) {
                Thread.sleep(5000); // メモリ使用量のモニタリングを続ける
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


✅ ヒープダンプを生成するコード

🎯 実装内容
HeapDumpGenerator クラス: これを使ってヒープダンプを生成します。

非同期タスク: ダンプを生成するトリガーを一定時間おきに非同期で実行します。

コンソールアプリケーション: メイン処理に HeapDumpGenerator を統合します。

Main.java
import com.sun.management.HotSpotDiagnosticMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MBeanServer;

public class HeapDumpGenerator {
    // ヒープダンプを生成するメソッド
    public static void generateHeapDump() {
        try {
            // MBeanServerを取得
            MBeanServer server = ManagementFactory.getPlatformMBeanServer();
            
            // HotSpotDiagnosticMXBeanを取得
            HotSpotDiagnosticMXBean mxBean = ManagementFactory.newPlatformMXBeanProxy(
                server,
                "com.sun.management:type=HotSpotDiagnostic",
                HotSpotDiagnosticMXBean.class
            );
            
            // ヒープダンプのファイル名を設定
            String fileName = "heap_dump_" + System.currentTimeMillis() + ".hprof";
            
            // ヒープダンプをファイルとして生成
            mxBean.dumpHeap(fileName, true);
            
            System.out.println("Heap dump generated: " + fileName);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
    
    public static void main(String[] args) {
        // ダンプを生成する間隔を設定(例:5秒ごとにダンプ生成)
        Runnable task = () -> {
            while (true) {
                try {
                    Thread.sleep(5000); // 5秒ごとにヒープダンプを生成
                    generateHeapDump();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        
        // 非同期にタスクを実行
        Thread taskThread = new Thread(task);
        taskThread.start();
        
        // メインスレッドが何かの処理を行っていると仮定
        try {
            while (true) {
                // ここで他の処理を行っても良い(例:メモリリークを引き起こす)
                Thread.sleep(10000); // 10秒ごとに何かを処理
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

📌 コード解説
generateHeapDump() メソッド:

ManagementFactory.getPlatformMBeanServer() で MBeanServer を取得し、HotSpotDiagnosticMXBean を使ってヒープダンプを生成します。

ダンプのファイル名には、System.currentTimeMillis() を使ってユニークな名前を付けています。

mxBean.dumpHeap(fileName, true) でヒープダンプを指定されたファイル名で保存します。

main() メソッド:

非同期タスクとして、Runnable を使ってヒープダンプを生成する処理を定期的に実行します(5秒ごとにダンプ生成)。

メインスレッドはその間に他の処理を実行することができます。ここでは、仮に他の処理を行うために10秒ごとに何かを処理しています。

ヒープダンプの生成:

非同期タスクが実行される間に、指定した間隔(5秒ごと)でヒープダンプが生成されます。

ヒープダンプは、heap_dump_.hprof という形式でファイルに保存されます。


スレッドプールサイズ計算

calculateOptimalThreads() メソッドでは、ターゲットのCPU使用率(targetCpuUtilization)とCPU比率(cpuRatio)を元に最適なスレッド数を算出します。

以下に、このコードを使って スレッドプールサイズを計算するコンソールアプリ を作成します。このアプリでは、ユーザーからターゲットCPU使用率とCPU比率を入力してもらい、それに基づいて最適なスレッド数を計算して表示します。

Main.java
import java.util.Scanner;

public class ThreadPoolCalculator {

    // 最適なスレッド数を計算するメソッド
    public static int calculateOptimalThreads(float targetCpuUtilization, float cpuRatio) {
        int numberOfCores = Runtime.getRuntime().availableProcessors();

        // 最適なスレッド数の計算式
        int optimal = Math.round(numberOfCores * targetCpuUtilization * (1 + cpuRatio));

        return Math.max(optimal, 1); // 最低1スレッドを確保
    }

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);

        // ユーザーからの入力を受け取る
        System.out.println("ターゲットCPU使用率を入力してください(0.0 - 1.0の範囲):");
        float targetCpuUtilization = scanner.nextFloat();

        System.out.println("CPU比率を入力してください(例: 0.5, 1.0 など):");
        float cpuRatio = scanner.nextFloat();

        // 最適なスレッド数を計算
        int optimalThreads = calculateOptimalThreads(targetCpuUtilization, cpuRatio);

        // 結果を表示
        System.out.println("推奨されるスレッドプールサイズ: " + optimalThreads);

        scanner.close();
    }
}

コード解説
calculateOptimalThreads() メソッド:

まず、Runtime.getRuntime().availableProcessors() を使用して、システム上のCPUコア数を取得します。

targetCpuUtilization(ターゲットのCPU使用率)と cpuRatio(CPU比率)を使用して、最適なスレッド数を計算します。

Math.round() で計算結果を四捨五入し、最小でも1スレッドは確保するために Math.max(optimal, 1) を使用しています。

main() メソッド:

ユーザーからターゲットCPU使用率(0.0~1.0の範囲)とCPU比率を入力してもらい、その入力に基づいて最適なスレッド数を計算します。

計算結果をコンソールに出力します。


カスタムスレッドプールの実装例

OptimizedThreadPool クラスを使ってスレッドプールを最適化し、スレッドプールのサイズ(corePoolSize と maxPoolSize)を計算しています。特に、createOptimizedPool() メソッドで、スレッドプールを作成するための処理が行われています。以下では、このコードをもとに スレッドプールサイズを計算し、コンソールアプリ として作成する方法を示します。

✅ 実装内容
スレッドプールサイズの計算:
corePoolSize と maxPoolSize を計算するロジックを統合。

スレッドプールの作成:
ThreadPoolExecutor を使ってスレッドプールを作成。

コンソールアプリケーション:
ユーザーから設定値(ターゲットCPU使用率やCPU比率)を受け取り、スレッドプールを動的に作成して管理。

スレッドプールサイズ計算のコンソールアプリ

Main.java
import java.util.Scanner;
import java.util.concurrent.*;

public class OptimizedThreadPool {
    // 最適なスレッド数を計算するメソッド
    public static int calculateOptimalThreads(float targetCpuUtilization, float cpuRatio) {
        int numberOfCores = Runtime.getRuntime().availableProcessors();

        // 最適なスレッド数を計算
        int optimal = Math.round(numberOfCores * targetCpuUtilization * (1 + cpuRatio));

        return Math.max(optimal, 1); // 最低1スレッドを確保
    }

    // スレッドプールを作成するメソッド
    public static ExecutorService createOptimizedPool(String poolName) {
        int corePoolSize = calculateOptimalThreads(0.8f, 0.5f);
        int maxPoolSize = corePoolSize * 2;
        long keepAliveTime = 60L;

        return new ThreadPoolExecutor(
            corePoolSize,
            maxPoolSize,
            keepAliveTime,
            TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactory() {
                private final AtomicInteger threadNumber = new AtomicInteger(1);

                @Override
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r);
                    t.setName(poolName + "-" + threadNumber.getAndIncrement());
                    return t;
                }
            },
            new ThreadPoolExecutor.CallerRunsPolicy()
        );
    }

    public static void main(String[] args) {
        Scanner scanner = new Scanner(System.in);

        // ユーザーからの入力を受け取る
        System.out.println("ターゲットCPU使用率を入力してください(0.0 - 1.0の範囲):");
        float targetCpuUtilization = scanner.nextFloat();

        System.out.println("CPU比率を入力してください(例: 0.5, 1.0 など):");
        float cpuRatio = scanner.nextFloat();

        // 最適なスレッド数を計算
        int corePoolSize = calculateOptimalThreads(targetCpuUtilization, cpuRatio);
        int maxPoolSize = corePoolSize * 2;

        // 計算結果を表示
        System.out.println("最適なスレッドプールサイズ:");
        System.out.println("コアプールサイズ (corePoolSize): " + corePoolSize);
        System.out.println("最大プールサイズ (maxPoolSize): " + maxPoolSize);

        // スレッドプールを作成
        ExecutorService executorService = createOptimizedPool("OptimizedPool");

        // 簡単なタスクを実行してみる
        for (int i = 0; i < 5; i++) {
            executorService.submit(() -> {
                try {
                    Thread.sleep(1000);
                    System.out.println(Thread.currentThread().getName() + " is executing task.");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }

        // シャットダウンして終了
        executorService.shutdown();
        scanner.close();
    }
}

📌 コード解説
calculateOptimalThreads() メソッド:

既存のコードと同様に、ターゲットCPU使用率とCPU比率を使って最適なスレッド数を計算します。

createOptimizedPool() メソッド:

ThreadPoolExecutor を使用してスレッドプールを作成します。

corePoolSize と maxPoolSize は、前述の calculateOptimalThreads() メソッドを基に計算されます。

スレッドプール内で使用するスレッドの名前は、ThreadFactory を使って動的に設定しています(OptimizedPool-1, OptimizedPool-2, ...)。

タスクが実行されると、スレッド名が表示されます。

main() メソッド:

ユーザーからターゲットCPU使用率(0.0~1.0の範囲)とCPU比率を入力してもらい、その入力を基に最適なスレッド数を計算します。

計算されたスレッド数を使ってスレッドプールを作成し、いくつかの簡単なタスクを非同期に実行します。


パフォーマンス計測ユーティリティ

Main.java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.DoubleSummaryStatistics;
import java.util.concurrent.ConcurrentHashMap;

public class PerformanceMonitor {
    private static final Map<String, List<Long>> executionTimes = 
        new ConcurrentHashMap<>();

    // 実行時間を記録するメソッド
    public static void recordExecutionTime(String taskName, long startTime) {
        long executionTime = System.nanoTime() - startTime;
        executionTimes.computeIfAbsent(taskName, k -> new ArrayList<>())
                     .add(executionTime);
    }

    // 統計情報を出力するメソッド
    public static void printStatistics(String taskName) {
        List<Long> times = executionTimes.get(taskName);
        if (times == null || times.isEmpty()) {
            System.out.println("No data for task: " + taskName);
            return;
        }

        DoubleSummaryStatistics stats = times.stream()
            .mapToDouble(t -> t / 1_000_000.0) // ナノ秒からミリ秒に変換
            .summaryStatistics();

        System.out.printf("""
            Task: %s
            Count: %d
            Average: %.2f ms
            Min: %.2f ms
            Max: %.2f ms
            StdDev: %.2f ms%n""",
            taskName,
            stats.getCount(),
            stats.getAverage(),
            stats.getMin(),
            stats.getMax(),
            calculateStdDev(times)
        );
    }

    // 標準偏差を計算するメソッド
    private static double calculateStdDev(List<Long> times) {
        double mean = times.stream()
            .mapToDouble(t -> t / 1_000_000.0)
            .average()
            .orElse(0.0);

        double variance = times.stream()
            .mapToDouble(t -> t / 1_000_000.0)
            .map(t -> Math.pow(t - mean, 2))
            .average()
            .orElse(0.0);

        return Math.sqrt(variance);
    }

    // 実行をシミュレートするタスク
    public static void performTask(String taskName) {
        long startTime = System.nanoTime();

        // タスクをシミュレート(例えば 1 秒の待機)
        try {
            Thread.sleep((long)(Math.random() * 2000)); // 0 - 2 秒のランダム待機
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }

        // 実行時間を記録
        recordExecutionTime(taskName, startTime);
    }

    public static void main(String[] args) {
        // コンソールで実行するタスクの数
        int numTasks = 5;
        String taskName = "Task";

        // 5回タスクを実行し、その実行時間を計測
        for (int i = 0; i < numTasks; i++) {
            performTask(taskName + "-" + (i + 1));
        }

        // 統計情報を表示
        for (int i = 0; i < numTasks; i++) {
            printStatistics(taskName + "-" + (i + 1));
        }
    }
}

📌 コードの解説
recordExecutionTime() メソッド:

このメソッドは、タスク名とタスクの開始時刻を引数に取り、実行時間をナノ秒で記録します。その後、ConcurrentHashMap に格納して、スレッドセーフに複数のタスクの実行時間を管理します。

printStatistics() メソッド:

実行したタスクに関連する統計情報(回数、平均、最小、最大、標準偏差)を計算して出力します。

統計は、List に保存された実行時間をミリ秒単位で表示します。

calculateStdDev() メソッド:

実行時間のリストを基に標準偏差を計算し、パフォーマンスのばらつきを確認します。

performTask() メソッド:

タスクをシミュレートするために、ランダムな待機時間(0〜2秒)を用いてタスクの実行時間を模倣します。このタスクを5回繰り返して、実行時間を記録します。

main() メソッド:

performTask() を 5 回実行し、タスクごとに実行時間を記録します。次に、printStatistics() メソッドを使用して、各タスクの統計情報を表示します。


スレッドダンプの取得と分析

ThreadDumpAnalyzer クラスを使って、Java のスレッドダンプを取得し、その内容を解析するコンソールアプリケーションを作成します。このアプリケーションは、現在のスレッドの状態を出力し、ボトルネックを特定するための手段として使用できます。

Main.java
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.util.Arrays;
import java.util.stream.Collectors;

public class ThreadDumpAnalyzer {

    // スレッドダンプを生成するメソッド
    public static void generateThreadDump() {
        // ThreadMXBeanを取得
        ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean();

        // 全スレッド情報を取得(スタックトレース、ロック情報も取得)
        ThreadInfo[] threadInfos = threadMXBean.dumpAllThreads(true, true);

        // スレッド情報を出力
        for (ThreadInfo info : threadInfos) {
            System.out.println("""
                Thread: %s
                State: %s
                Blocked Count: %d
                Waited Count: %d
                Lock Name: %s
                Stack Trace:
                %s
                """.formatted(
                    info.getThreadName(),
                    info.getThreadState(),
                    info.getBlockedCount(),
                    info.getWaitedCount(),
                    info.getLockName(),
                    formatStackTrace(info.getStackTrace())
                )
            );
        }
    }

    // スタックトレースを整形するメソッド
    private static String formatStackTrace(StackTraceElement[] elements) {
        return Arrays.stream(elements)
                    .map(element -> "\t" + element.toString())
                    .collect(Collectors.joining("\n"));
    }

    public static void main(String[] args) {
        // スレッドダンプの生成
        System.out.println("Generating thread dump...\n");
        generateThreadDump();

        // シミュレーションタスクの実行
        simulateBlockingTask();
    }

    // スレッドのブロッキングタスクをシミュレートするメソッド
    private static void simulateBlockingTask() {
        Thread thread1 = new Thread(() -> {
            synchronized (ThreadDumpAnalyzer.class) {
                try {
                    Thread.sleep(5000);  // 5秒間ブロック状態にする
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        Thread thread2 = new Thread(() -> {
            synchronized (ThreadDumpAnalyzer.class) {
                try {
                    Thread.sleep(5000);  // 5秒間ブロック状態にする
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        thread1.start();
        thread2.start();

        try {
            thread1.join();
            thread2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

📌 コードの解説
generateThreadDump() メソッド:

ThreadMXBean を利用して現在の全スレッドの情報を取得します。

dumpAllThreads(true, true) によって、スレッドの状態(ブロック、待機)やロック名、スタックトレースを取得します。

各スレッドの状態とスタックトレースを標準出力に出力します。

formatStackTrace() メソッド:

スタックトレースをきれいに整形して、見やすく出力します。各スタックトレース要素はインデントして表示されます。

simulateBlockingTask() メソッド:

2つのスレッドが ThreadDumpAnalyzer.class のロックを取得しようとして、それぞれ 5 秒間ブロックされる状況をシミュレートします。これにより、スレッドダンプを取得した際に、スレッドがブロックされている状態(BLOCKED)を見ることができます。

main() メソッド:

generateThreadDump() メソッドを呼び出して、プログラム開始時のスレッドダンプを取得し、その後にシミュレーションを実行してスレッドのブロック状態を確認します。


メトリクス収集の実装

ConcurrencyMetrics クラスは、並列処理におけるメトリクス収集を行うためのクラスです。このクラスを使って、メトリクス(処理回数や平均処理時間など)を収集し、それをコンソールに出力するアプリケーションを作成できます。

以下に、ConcurrencyMetrics を使って並列処理のパフォーマンスメトリクスを収集し、結果をコンソールに表示するサンプルアプリケーションを作成します。

Main.java
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencyMetricsApp {

    public static void main(String[] args) {
        // ExecutorServiceのセットアップ(スレッドプール)
        ExecutorService executorService = Executors.newFixedThreadPool(4);
        
        // ジョブ数の設定
        int jobCount = 10;

        // ジョブを非同期で実行し、メトリクスを収集
        for (int i = 0; i < jobCount; i++) {
            executorService.submit(() -> {
                try (ConcurrencyMetrics.Timer timer = ConcurrencyMetrics.startTimer("job")) {
                    simulateJob(); // 処理のシミュレーション
                }
            });
        }

        // 全てのジョブの終了を待機
        executorService.shutdown();
        try {
            if (!executorService.awaitTermination(1, TimeUnit.MINUTES)) {
                executorService.shutdownNow();
            }
        } catch (InterruptedException e) {
            executorService.shutdownNow();
        }

        // メトリクスを表示
        ConcurrencyMetrics.printMetrics();
    }

    // ジョブのシミュレーション(計算処理やAPI呼び出しなど)
    private static void simulateJob() {
        try {
            // 1秒間処理を行うシミュレーション
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

🎯 ConcurrencyMetrics クラスの解説
このクラスは、以下のメトリクスを収集するための仕組みを提供します。

Meter:

メトリクスを収集するクラスで、処理の回数(count)と総時間(totalTime)を追跡します。

mark(long time) メソッドで、処理時間を加算し、getMeanTime() で平均時間を計算できます。

Timer:

特定の処理の実行時間を測定するためのクラスです。

startTimer(String name) メソッドでタイマーを開始し、close() メソッドを呼び出すと処理時間が記録されます(try-with-resources で使用します)。

処理が終了すると、タイマーが自動的に停止し、メトリクスが記録されます。

printMetrics():

収集したメトリクスをコンソールに出力するメソッドです。メトリクスの名前、処理回数、平均処理時間を表示します。

📌 アプリケーションの実行
メインスレッド:

ExecutorService を使って、複数のスレッドでジョブを並列に実行します(ここでは 10 個のジョブをシミュレート)。

各ジョブは simulateJob() メソッドで 1 秒の処理を行い、その間の処理時間を ConcurrencyMetrics.Timer で計測します。

メトリクスの収集:

各スレッドが処理を完了した後、その実行時間が ConcurrencyMetrics に記録されます。

printMetrics() メソッドが、メトリクス(実行回数と平均実行時間)をコンソールに表示します。

サイト

1
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?