はじめに
お仕事で、下記のような相談を受けました。
「なんかCompletableFuture使って並列処理してると、数十回~数百回に1回くらい、正しく値が返ってこない場合がある」
その時の対応をメモ書きしておきます。
問題
実行環境は、
- JDK1.8 ( 8u181 )
- dockerコンテナ上で実行 (alpine linux)
です。
下記のような処理があります。
CompletableFutureを利用し、並列的にsummaryItemに情報を格納しています。
xxxAPI.search(number);
は、外部のAPIを呼び出してItemの情報を取得しています。
どこにバグが潜んでいるでしょうか?
public List<Item> search(int length) {
//検索結果を保持する変数
List<Item> summaryItem = new ArrayList<>(length);
List<CompletableFuture<Void>> futures = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
final int number = i;
CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
//外部APIを呼び出して、データの取得
Item item = xxxAPI.search(number);
summaryItem.add(item);
}, pool);
futures.add(f);
}
//全ての並列処理が終わるまで待機
CompletableFuture<Void> all =
CompletableFuture.allOf(futures.toArray(new CompletableFuture[length]));
all.join();
return summaryItem;
}
回答
下記処理の部分ですね。
※実行環境は関係ありません
summaryItem.add(item);
summaryItemは、java.util.ArrayListです。ArrayListは、 スレッドセーフではありません。
JavaDocを見てみましょう。
**この実装はsynchronizedされません。**複数のスレッドが並行してArrayListインスタンスにアクセスし、それらのスレッドの少なくとも1つがリストを構造的に変更する場合は、外部でその同期をとる必要があります。構造的な変更とは、1つ以上の要素を追加または削除したり、基になる配列のサイズを明示的に変更したりする処理のことです。要素の値だけを設定する処理は、構造的な変更ではありません。これは通常、リストを自然にカプセル化する一部のオブジェクトでsynchronizedすることによって達成されます。そのようなオブジェクトが存在しない場合は、Collections.synchronizedListメソッドを使用してリストを「ラップ」するようにしてください。リストが誤ってsynchronizedなしでアクセスされるのを防ぐために、作成時に行うことをお薦めします。
しっかりと書いていますね。
対応
スレッドセーフなListを使う。もしくは、そもそもスレッドセーフな処理に変更する。
という形のアプローチで対応しました。
スレッドセーフなListを使う
java.util.concurrent
パッケージにちゃんと同期化されたリストがあります。こちらを使いましょう。
List<Item> summaryItem = new ArrayList<>(length);
上記コードを、下記のように変えるだけで正常に動作するようになります。
List<Item> summaryItem = new CopyOnWriteArrayList<>(length);
場合によっては、 Collections.synchronizedList
を使う事で解決はしますが、iteratorを使う場合は、やはり自分で同期化が必要になります。
スレッドセーフな処理に変更する
そもそも、java.util.function.Supplier形式で値を返すようにすれば良いです。
※私はこの方が好きです。summaryItemを長いスコープで利用しくなるからです。
public List<Item> search(int length) {
// 検索の並列実行処理
List<CompletableFuture<Item>> futures = new ArrayList<>(length);
for (int i = 0; i < length; i++) {
final int number = i;
CompletableFuture<Item> f = CompletableFuture.supplyAsync(() -> {
//外部APIを呼び出して、データの取得
return xxxAPI.search(number);
}, pool);
futures.add(f);
}
// 検索結果の収集
try {
List<Item> summaryItem = new ArrayList<>();
for (CompletableFuture<Item> f : futures) {
summaryItem.add(f.get());
}
return summaryItem;
} catch (ExecutionException | InterruptedException ex) {
throw new RuntimeException(ex);//適当に書いてます。
}
}
ものすごい偶発的に発生しますし、Java APIのある程度の知識が無いと、問題が発見しにくいのがポイントですね。
テストコードも (大体) スルーしてしまいます。
こういった部分はデバッグ等よりもコードレビューを行う事が役立つと思います。
もっとやりたいこと
CheckStyleじゃ多分発見できないけど、静的コード解析系のツールで問題を発見できそうな気がする。
今後探してみようと思います。