15
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Javaで並列実行する際は、スレッドセーフに気を使わないとハマるお話

Last updated at Posted at 2018-09-12

はじめに

お仕事で、下記のような相談を受けました。
「なんかCompletableFuture使って並列処理してると、数十回~数百回に1回くらい、正しく値が返ってこない場合がある」

その時の対応をメモ書きしておきます。

問題

実行環境は、

  • JDK1.8 ( 8u181 )
  • dockerコンテナ上で実行 (alpine linux)

です。

下記のような処理があります。
CompletableFutureを利用し、並列的にsummaryItemに情報を格納しています。
xxxAPI.search(number); は、外部のAPIを呼び出してItemの情報を取得しています。
どこにバグが潜んでいるでしょうか?

  public List<Item> search(int length) {
    //検索結果を保持する変数
    List<Item> summaryItem = new ArrayList<>(length);

    List<CompletableFuture<Void>> futures = new ArrayList<>(length);
    for (int i = 0; i < length; i++) {
      final int number = i;
      CompletableFuture<Void> f = CompletableFuture.runAsync(() -> {
        //外部APIを呼び出して、データの取得
        Item item = xxxAPI.search(number);
        summaryItem.add(item);
      }, pool);
      futures.add(f);
    }

    //全ての並列処理が終わるまで待機
    CompletableFuture<Void> all =
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[length]));
    all.join();

    return summaryItem;
  }

回答

下記処理の部分ですね。
※実行環境は関係ありません:yum:

        summaryItem.add(item);

summaryItemは、java.util.ArrayListです。ArrayListは、 スレッドセーフではありません。
JavaDocを見てみましょう。

**この実装はsynchronizedされません。**複数のスレッドが並行してArrayListインスタンスにアクセスし、それらのスレッドの少なくとも1つがリストを構造的に変更する場合は、外部でその同期をとる必要があります。構造的な変更とは、1つ以上の要素を追加または削除したり、基になる配列のサイズを明示的に変更したりする処理のことです。要素の値だけを設定する処理は、構造的な変更ではありません。これは通常、リストを自然にカプセル化する一部のオブジェクトでsynchronizedすることによって達成されます。そのようなオブジェクトが存在しない場合は、Collections.synchronizedListメソッドを使用してリストを「ラップ」するようにしてください。リストが誤ってsynchronizedなしでアクセスされるのを防ぐために、作成時に行うことをお薦めします。

しっかりと書いていますね。

対応

スレッドセーフなListを使う。もしくは、そもそもスレッドセーフな処理に変更する。
という形のアプローチで対応しました。

スレッドセーフなListを使う

java.util.concurrent パッケージにちゃんと同期化されたリストがあります。こちらを使いましょう。

List<Item> summaryItem = new ArrayList<>(length);

上記コードを、下記のように変えるだけで正常に動作するようになります。

List<Item> summaryItem = new CopyOnWriteArrayList<>(length);

場合によっては、 Collections.synchronizedList を使う事で解決はしますが、iteratorを使う場合は、やはり自分で同期化が必要になります。

スレッドセーフな処理に変更する

そもそも、java.util.function.Supplier形式で値を返すようにすれば良いです。
※私はこの方が好きです。summaryItemを長いスコープで利用しくなるからです。

  public List<Item> search(int length) {
    // 検索の並列実行処理
    List<CompletableFuture<Item>> futures = new ArrayList<>(length);
    for (int i = 0; i < length; i++) {
      final int number = i;
      CompletableFuture<Item> f = CompletableFuture.supplyAsync(() -> {
        //外部APIを呼び出して、データの取得
        return xxxAPI.search(number);
      }, pool);
      futures.add(f);
    }

    // 検索結果の収集
    try {
      List<Item> summaryItem = new ArrayList<>();
      for (CompletableFuture<Item> f : futures) {
        summaryItem.add(f.get());
      }
      return summaryItem;
    } catch (ExecutionException | InterruptedException ex) {
      throw new RuntimeException(ex);//適当に書いてます。
    }
  }

ものすごい偶発的に発生しますし、Java APIのある程度の知識が無いと、問題が発見しにくいのがポイントですね。
テストコードも (大体) スルーしてしまいます。

こういった部分はデバッグ等よりもコードレビューを行う事が役立つと思います。

もっとやりたいこと

CheckStyleじゃ多分発見できないけど、静的コード解析系のツールで問題を発見できそうな気がする。
今後探してみようと思います。

15
11
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
15
11

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?