LoginSignup
0
0

More than 5 years have passed since last update.

Item 48: Use caution when making streams parallel

Posted at

48.ストリーム処理を並列で行う際は注意せよ

並列処理は依然として困難

Javaの進化に伴い、並列プログラミングは書きやすくなったものの、正しく、速い並列処理の実装を書くことは依然として難しい。これはストリームのparallelストリームでの処理も例外でない。
Item45の例をみてみる。

package tryAny.effectiveJava;

import static java.math.BigInteger.*;

import java.math.BigInteger;
import java.util.stream.Stream;

public class MersennePrimes {
    public static void main(String[] args) {
        primes().map(p -> TWO.pow(p.intValueExact()).subtract(ONE)).filter(mersenne -> mersenne.isProbablePrime(50))
                .limit(20)
                // .forEach(System.out::println);
                .forEach(mp -> System.out.println(mp.bitLength() + ":" + mp));
    }

    static Stream<BigInteger> primes() {
        return Stream.iterate(TWO, BigInteger::nextProbablePrime);
    }
}

このプログラムのストリーム処理にparallel()を入れても、処理が速くなるどころか、処理は完了せず、CPUが高止まりし続ける。

ここでは何が起こっているのか?
簡単には、ストリームのライブラリがこのパイプラインをどのように並列化するか分からず、ヒューリスティックな解が失敗したということが原因である。
もともとStream.iterateから来ていたり、limit中間操作が行われる場合は、パイプラインの並列化は性能を上げにくい。上記のプログラムはこのどちらの要素も持ってしまっている。
ここでの教訓は、見境なく並列ストリームを使ってはならないということだ。

概して、並列化でパフォーマンス向上が見込めるのは、ArrayList、HashMap、HashSet、ConcurrentHashMap、配列、intの範囲をもったもの、longの範囲を持ったもののストリームに対しての並列化である。
これらの共通点は、サブレンジへの分割が容易であることである。
これらのデータ構造が持つもう一つの重要な共通点は、逐次処理される時の参照の局所性である。

終端操作も並列処理の性能に響いてくる。
パイプラインの全体に比して、大量の処理を終端処理で行い、かつ、その終端処理が内部的に逐次処理を行うものであったら、パイプラインの並列化はあまり効果を得られない。
一番効果を得られる終端処理は、min、max、count、sumといったリダクション処理である。
また、anyMatch、allMatch、noneMatchといった短絡評価は並列化の効果を得やすい。
ストリームのcollectメソッドによって行われる可変リダクション操作は、並列化の恩恵を受けにくい。なぜなら、コレクションを結びつける処理のオーバーヘッドがコストとなるからだ。

Safety failure

ストリームの並列化によって、liveness failureを含む性能面の問題が起こるだけでなく、間違った結果や予期しない動作を招くこともある。(safety failure)
safety failureはストリームの厳しい仕様にのっとっていない関数を使用した場合に発生する。
例えば、ストリームに渡されるaccumulate(蓄積させる)する関数とコンバインする関数は、結合的で、非干渉で、ステートレスな関数でなければならない。
これを守れずとも、直線的なパイプラインであれば問題は起きないが、並列化されたパイプラインだと悲惨な結果になりうる。

並列化して効果があるのか?

とても良い条件で並列処理ができたとしても、並列化したコストを相殺するような性能を見せなければ意味がない。
荒く見積もって、(ストリームの要素の数)*(1要素に実行されるコード行数) > 100000 を満たすべきである(リンク元見ると10000のような。。(http://gee.cs.oswego.edu/dl/html/StreamParallelGuidance.html))。

ストリームの並列化はパフォーマンス最適化である、ということは認識しておくべき。
どのような最適化でも、その変更前後でテストをしてやる価値があるか確かめねばならない。理想的には、テストは本番環境で行うべきである。

並列化は正しい状況下で使えばとても有用

正しい状況下で並列化を行えば、プロセッサ数と比例するような性能向上が見込める。
機械学習やデータ処理の分野ではこれらの性能向上がしやすい。

並列化が効率的に行える、素数計数関数の例を見ていく。

package tryAny.effectiveJava;

import java.math.BigInteger;
import java.util.stream.LongStream;

public class ParallelTest1 {
    // Prime-counting stream pipeline - benefits from parallelization
    static long pi(long n) {
        return LongStream.rangeClosed(2, n).mapToObj(BigInteger::valueOf).filter(i -> i.isProbablePrime(50)).count();
    }

    public static void main(String[] args) {
        StopWatch sw = new StopWatch();
        sw.start();
        System.out.println(pi(10000000));
        sw.stop();
        System.out.println(sw.getTime());
    }
}

上記のコードを処理するのに42秒くらいかかった。これを並列化する。

package tryAny.effectiveJava;

import java.math.BigInteger;
import java.util.stream.LongStream;

import org.apache.commons.lang3.time.StopWatch;

public class ParallelTest1 {
    // Prime-counting stream pipeline - benefits from parallelization
    static long pi(long n) {
        return LongStream.rangeClosed(2, n).parallel().mapToObj(BigInteger::valueOf).filter(i -> i.isProbablePrime(50))
                .count();
    }

    public static void main(String[] args) {
        StopWatch sw = new StopWatch();
        sw.start();
        System.out.println(pi(10000000));
        sw.stop();
        System.out.println(sw.getTime());
    }
}

こうすると23秒くらいで終了した。(2コアのマシンで実行)

ランダム値のストリームを並列化する場合

ランダム値の生成を並列で行うのならば、ThreadLocalRandomよりもSplittableRandomを使うべき。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0