チームラボエンジニアリングアドベントカレンダー15日目の記事です。
はじめに
ストリームAPIを初めて採用したJavaSE8がリリースされてからしばらく経ち、最近のJava開発ではレガシーシステムでもラムダ 式によって記述されたストリームのパイプライン処理を見かけることが多くなりました。
そんな中、普段なかなか使うことのないストリームの並列処理に今更ながら触れていきたいと思います。💪
ストリームAPIとは?
コレクションや配列などのデータをもとにデータの加工や、集計を行うAPIです。ストリームはデータの処理した結果を次の処理へ渡すことができるので、複数の処理をメソッドチェーンで繋ぐことができます。このような仕組みをストリームのパイプライン処理と呼ぶらしいです。。
データ 数字のリスト(ストリームの生成)
↓
中間操作 配列内の各要素をインクリメント
↓
中間操作 配列内の各要素を並び替え
↓
終端操作 要素を出力
上記イメージを実装すると次のようになります。
// 数値のリスト
List<Integer> list = Arrays.asList(0, 21, 7, 5);
// リストからストリーム生成 -> 処理① -> 処理② -> 結果
list.stream().map(num -> ++num).sorted().forEach(System.out::println);
List<Integer> list = Arrays.asList(0, 21, 7, 5);
// 各要素に + 1
for(int i = 0 ; list.size() > i ; i++ ){
list.set( i , list.get(i) + 1);
}
// ソート
Collections.sort(list);
// 各要素を出力
for(Integer num : list){
System.out.println(num);
}
コードを比較すると、ストリームAPIを使用することで処理を簡潔に繋げて記述できることが分かります。そして、明示的な反復処理の記述がなくても各要素に対してメソッドが作用していることが分かります。(実際に反復処理が実行されるのはメソッドごとではなく終端操作の1度のみ)
また、stream
メソッドでストリームを生成していますが、このように生成されたストリームを順次ストリームと呼びます。名前の通りデータに対し、並んでいる順番通りに連続して処理を実行していきます。
パラレルストリーム(本題)
パラレルストリームを使用すると先ほど解説したパイプライン処理をシンプルに並列で実行することができます。
パラレルストリームの取得
具体的には、下記のようにparallelStream
メソッドでストリームを生成します。その他は先ほどと変わらず中間操作、終端操作を記述していきます。
// リストからパラレルストリーム生成 -> 処理① -> 処理② -> 結果
list.parallelStream().map(num -> ++num).sorted().forEach(System.out::println);
実行して先ほどの結果と比較すると 6,22,8,1...
と出力される順番が実行する度に変わり、ランダムに表示されます。これは並列処理によりどの要素から処理されるかが実行時により異なり、順序が保証されないためです。
forEachOrdered
メソッドで解決できますが今回は説明を省きます。また、実行される順番は実行する度に変わりますが、findFirst(1つ目の要素を取得)
などの結果は順次ストリームと同じ結果を毎回取得できます。
なにはともあれ、これで便利なパイプライン処理を手軽に並列で実装することができました。
実行時間の比較
実際にパラレルストリームを使用して実行時間の比較をしてみます。
下記は1〜1000が格納されたリストの各要素を2倍にして終端操作で合計を取得する処理になります。
// 順次ストリーム
long start = System.nanoTime();
long count1 = LongStream.rangeClosed(1,1000).map(num -> num * 2).sum();
System.out.println("順次 : " + (System.nanoTime() - start)); // 実行時間
// パラレルストリーム
start = System.nanoTime();
long count2 = LongStream.rangeClosed(1,1000).parallel().map(num -> num * 2).sum();
System.out.println("並列 : " + (System.nanoTime() - start)); // 実行時間
順次 : 4820468
並列 : 7720613
並列にした方が早いイメージがありますが、実はパラレルストリームを使用する際オーバーヘッドが生じるのでリストの要素数が少ない場合実行速度が遅くなる場合があります。(ストリームの使用に関わらず並列処理全般に当てはまります)
rangeClosed(1,1000)
を rangeClosed(1,100000000)
に変更して再実行した結果
順次 : 151995263
並列 : 62737393
今度はリスト内の要素数が多いため、並列の方が短い処理時間で完了しました。
このようにパラレルストリームを使用する場合、本当に必要か正しく判断する必要があります。また、ストリームを使用していても通常の並列処理と同様にスレッドセーフなどの考慮も必要になってきます。
リダクション操作でもう少し踏み込む
リダクション操作とは、、、以下公式から引用
リダクション操作(折りたたみとも呼ばれる)は、一連の入力要素を受け取り、結合操作を繰り返し適用することでそれらを結合し、単一のサマリー結果を出力します。(一連の数値の合計または最大値の検索や、リストへの要素の蓄積など)
実際の実装は、順次ストリームでも利用されているreduce,collectメソッドがパラレル用にオーバーロードされて用意されているのでそちらを使用します。
数値リストの値をインクリメントしてその合計を求める処理で比較してみます。
// forEach で 合計用変数に加算
long start = System.nanoTime();
AtomicLong result1 = new AtomicLong(0); // アトミック変数
LongStream.rangeClosed(1,100000000).parallel().forEach(num -> result.getAndAdd(++num));
System.out.println("力技 : " + (System.nanoTime() - start)); //実行時間
// リダクション操作で集約
start = System.nanoTime();
long result2 = LongStream.rangeClosed(1,100000000).boxed().parallel().reduce(
0L, // 初期値
(sum, num) -> sum + ++num, // 集約処理
(sum1, sum2) -> sum1 + sum2); // マージ処理
System.out.println("reduce : " + (System.nanoTime() - start)); // 実行時間
力技 : 1281707565
reduce : 485846382
どちらもパラレルストリームによるパイプライン処理ですが実行結果を比較すると大きく差があります。
forEachで加算していく集約方法だとスレッドセーフな実装のため、結果格納用のアトミック変数への操作で待ちの状態が発生し、並列性によるメリットが活きません。
一方、reduceメソッドを使用すると数値リストの要素を複数に分割し、部分的な集約処理とマージ処理を繰り返し全体の計算をしていきます。そうすることで待ちの状態がなく並列性を保ったまま計算することができます。
このようにパラレルストリームを使用してFork/Joinフレームワークと比べシンプルに並列処理の実装ができました。
おわりに
今回の並列化によるオーバーヘッドなど、どんな機能を採用するにも本当に問題が改善されるのかを把握することが大切になります。
表面的な部分しか触れることができなかったので、初歩的な部分から深く踏み込んだ記事を次回以降書いていけたらなと思います。
参考
Java公式ドキュメント (パッケージ java.util.stream)
https://docs.oracle.com/javase/jp/11/docs/api/java.base/java/util/stream/package-summary.html