■はじめに
Javaでマルチスレッドの実装について考える機会があったので、復習のためにまとめます。
今回は、決まったスレッド数を使いまわして処理を実行させるようにしたかったので、Executors.newFixedThreadPool(int nThreads)
を使ってスレッドを用意しました。
■今回実装した内容(一部加工、簡略化)
データInsertするメソッドをマルチスレッドで実行する
■実装
基本形
- 10つのスレッドを用意
- 1.のスレッドを使って、データInsert処理(executeInsert)を実行
- 新規タスクの受付を停止
- 実行中の処理が全て終わるまで待機
ExecutorService executor = Executors.newFixedThreadPool(10); // 1
for (Data data : dataList) {
executor.submit(() -> executeInsert(data)); // 2
}
executor.shutdown(); // 3
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS); // 4
・Executors.newFixedThreadPool
について
一番最後に記載した参考文献のページを参考にしました。
newFixedThreadPool
以外にもいろいろメソッドがあるようなので、今度色々試してみたい。
マルチスレッドで実行したメソッドの戻り値を受け取る
- 処理の戻り値を受け取れる
CompletableFuture<戻り値>
のListを用意 - 用意したマルチスレッドで処理を実行し、結果をfutureListに追加
- futureListに入ったすべての処理が終わるまで待機
- 処理の戻り値をチェックしていく(falseだったらException発生させている)
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture<Boolean>> futureList = new ArrayList<>(); // 1
for (Data data : dataList) {
futureList.add(CompletableFuture.supplyAsync(() -> executeInsert(data), executor)); // 2
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join(); // 3
for (CompletableFuture<Boolean> future : futureList) {
if (!future.get()) { // 4
throw new RuntimeException("データ投入に失敗しました");
}
}
・CompletableFuture
について
gitHub copilotに質問して見つけた実装方法。
CompletableFutureはFutureインターフェースの実装クラスです。
Futureは、
非同期計算の結果を表します。計算が完了したかどうかのチェック、完了までの待機、計算結果の取得などを行うためのメソッドが用意されています。(参照:https://docs.oracle.com/javase/jp/8/docs/api/java/util/concurrent/Future.html )
というもの。非同期処理が完了したかをチェックしたり完了まで待機させたり、非同期処理の結果(戻り値)を受け取ることもできてとても便利。
CompletableFuture.supplyAsync
メソッドで、引数2つ目でexecutorを指定していることで、用意したスレッドプールを使ってデータInsertする処理(executeInsert)を非同期で実行できています。
今回executeInsertの戻り値をbooleanにしているのでCompletableFuture<Boolean>
で受け取っていますが、他の戻り値でも同様に使えます。(例:戻り値がStringであればCompletableFuture<String>
で受け取る)
■実装で困ったこと
1. dataListが多いときにOutOfMemoryが発生
マルチスレッドで実行する処理数が多くなると、メモリが不足するようになった。
とりあえずヒープサイズを大きくしてみたが、dataListが大きくなればまたOOMが発生したので根本解決にならなかった
原因:dataList分だけfutureListが生成されていて、これがメモリをひっ迫していた
- Eclipse Memory Analyzerで分析してみて発覚
解決:futureListを、一定件数ごとに確認してリセットするように変更
- 処理実行のループ中に100件ごとに結果確認して空にすることで、futureListには100件までしか溜まらなくなった
- 一定件数ごとに待機が入ってしまうので、修正前と比べて少しぐらい遅くなるかもしれない
ExecutorService executor = Executors.newFixedThreadPool(10);
List<CompletableFuture<Boolean>> futureList = new ArrayList<>();
for (Data data : dataList) {
futureList.add(CompletableFuture.supplyAsync(() -> process.execute(data), executor));
if (futureList.size() >= 100) { // 追加部分
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<Boolean> future : futureList) {
if (!future.get()) {
throw new RuntimeException("データ投入に失敗しました");
}
}
futureList.clear();
}
}
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).join();
for (CompletableFuture<Boolean> future : futureList) {
if (!future.get()) {
throw new RuntimeException("データ投入に失敗しました");
}
}
2.データInsertで一意制約が起きる(次回)
上記では簡略化していて見えないですが、
今回の実装ではPreparedStatementを利用してデータ投入をつくっていました。
全ての処理で同じインスタンスを使ってデータ更新SQLを追加(addBatchメソッド)し、一定件数がたまったら実行(executeBatchメソッド)するようにしていました。
このとき工夫をなにもしないと、executeBatchメソッドが実行された時に一意制約が起きました。
こちらの原因と解決方法は長くなりそうなので、次回書こうと思います。
■参考文献