Today's issue
java 8 で追加された Stream には、順次ストリームと並列ストリームの2種類の動作モードが存在します。
順次/並列による動作の違いと、実装時の基本的な注意事項について整理します。
なお、本稿での動作結果は、すべて Surface Pro 3(Core i7 / Windows 10 Pro)で検証したものです。
基本:順次ストリームと並列ストリーム
次のサンプルコードは、5人のメンバそれぞれに対して、朝の挨拶をして1秒間待機したのち、さよならの挨拶するだけのものです。
これを、順次ストリーム(試行1)と並列ストリーム(試行2)でそれぞれ動かしてみます。
public class StreamTest {
private static final List<String> members =
Arrays.asList("A", "B", "C", "D", "E");
private static final Consumer<String> greet = member -> {
System.out.println(String.format(
"%s: Hello, %s !", Instant.now(), member));
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
}
System.out.println(String.format(
"%s: Good-bye, %s !", Instant.now(), member));
};
public static void main(String[] args) {
// 試行1
members.stream().forEach(greet);
// 試行2
members.parallelStream().forEach(greet);
}
}
試行1:順次ストリームでは、コンソールに次のように表示されます。
元の List の要素順に、順次、処理が行われていることが分かります。
処理全体で 5 秒程度かかっています。
2015-08-15T21:15:59.128Z: Hello, A !
2015-08-15T21:16:00.174Z: Good-bye, A !
2015-08-15T21:16:00.174Z: Hello, B !
2015-08-15T21:16:01.176Z: Good-bye, B !
2015-08-15T21:16:01.176Z: Hello, C !
2015-08-15T21:16:02.176Z: Good-bye, C !
2015-08-15T21:16:02.176Z: Hello, D !
2015-08-15T21:16:03.177Z: Good-bye, D !
2015-08-15T21:16:03.177Z: Hello, E !
2015-08-15T21:16:04.177Z: Good-bye, E !
試行2:並列ストリームでは、コンソールに次のように表示されました。
元の List の要素順とは無関係に、並列に処理が行われていることが分かります。
処理時間は 2 秒程度に短縮され、並列処理の恩恵が表れています。
2015-08-15T21:16:04.181Z: Hello, E !
2015-08-15T21:16:04.181Z: Hello, C !
2015-08-15T21:16:04.181Z: Hello, B !
2015-08-15T21:16:04.182Z: Hello, A !
2015-08-15T21:16:05.183Z: Good-bye, A !
2015-08-15T21:16:05.185Z: Hello, D !
2015-08-15T21:16:05.186Z: Good-bye, B !
2015-08-15T21:16:05.186Z: Good-bye, C !
2015-08-15T21:16:05.188Z: Good-bye, E !
2015-08-15T21:16:06.187Z: Good-bye, D !
ストリームが順次か並列かは、BaseStream#isParallel メソッドで調べることができます。(BaseStream は Stream のスーパーインタフェースです。)
また、BaseStream#sequential と BaseStream#parallel を使うことにより、ストリームの順次/並列特性を変更することができます。(より正確に言うと、順次/並列特性の異なる別のストリームを生成することができます。)
// 試行1’
members.stream().parallel().forEach(greet);
// 試行2’
members.parallelStream().sequential().forEach(greet);
サンプルコードを上記のように変更すると、試行1’ では並列に、試行2’ では順次、処理が行われます。
実装時の注意点
1. マルチスレッドについて注意する
サンプルコードAは標準出力に文字列を出力するだけの簡単なものでしたが、もう少しだけ複雑なものに変えてみましょう。
次のサンプルコードは、朝の挨拶を標準出力へ出力するのではなくリストに記録することを意図したものです。(簡略化のため、さよならの挨拶は省略しました。)
public class StreamTest {
private static final List<String> members =
Arrays.asList("A", "B", "C", "D", "E");
private static List<String> greets = new ArrayList<>();
private static final Consumer<String> greet = member -> {
int n = greets.size() + 1;
greets.add(String.format(
"%d番目: %s: Hello, %s !", n, Instant.now(), member));
};
public static void main(String[] args) {
// 試行3
members.parallelStream().forEach(greet);
greets.forEach(System.out::println);
}
}
1番目: 2018-07-03T16:11:38.144Z: Hello, B !
1番目: 2018-07-03T16:11:38.144Z: Hello, C !
1番目: 2018-07-03T16:11:38.144Z: Hello, E !
1番目: 2018-07-03T16:11:38.144Z: Hello, A !
5番目: 2018-07-03T16:11:38.198Z: Hello, D !
おかしな結果が出力されていますね。
greet は複数のスレッドで並列に処理され得ますので、通常のマルチスレッド・プログラミングと同様に同期制御を行う必要があります。必要な個所を synchronized で囲うことによりバグは解消されます。
// サンプルコードBと同じ部分は省略
private static final Consumer<String> greet = member -> {
synchronized (greets) {
int n = greets.size() + 1;
greets.add(String.format(
"%d番目: %s: Hello, %s !", n, Instant.now(), member));
}
};
2. ステートフルを避ける
サンプルコードB’ では、「n番目」と記録するために外部変数 greets の内容を参照しています。(greets.size()
を呼び出しています。)
このように、ストリーム処理が、ストリーム処理の間に変化しうる状態に依存していることを、「ステートフルである」と言います。
ステートフルな処理は、ソースコードの安全性やパフォーマンスの面で好ましくありません。(実行の度に結果が変わってしまいます。そして、上記のように並列処理のバグが潜んだり、同期のために性能が劣化したりします。)
可能な限り、ステートフルな処理を排除し、ステートレスにすべきです。
次のサンプルコードB’’ は、「n番目」と記録することをやめた、見かけ上のステートレス・バージョンです。(greet 内での同期は不要になったため、synchronized を削除しています。但し、複数のスレッドから変数 greets に対する操作が発生するため、ArrayList を Collections#synchronizedList でラップしています。)
// サンプルコードB’と同じ部分は省略
private static List<String> greets =
Collections.synchronizedList(new ArrayList<>());
private static final Consumer<String> greet = member -> {
greets.add(String.format(
"%s: Hello, %s !", Instant.now(), member));
};
3. 副作用を避ける
サンプルコードB’’ の greet では、greets.add
を実行することにより、外部変数 greets の状態を変化させています。このような処理は「副作用がある」と言います。
(そしてこの処理のために、サンプルコードB’’ は実は未だにステートフルです。)
副作用を伴う処理は危険です。意図せぬステートフル化を招き、マルチスレッド・プログラミングに伴う様々な困難を招くからです。
サンプルコードB’’ を次のように変更することで、ストリーム処理の中から副作用を完全に排除することができます。
public class StreamTest {
private static final List<String> members =
Arrays.asList("A", "B", "C", "D", "E");
private static final Function<String, String> greet = member -> {
return String.format(
"%s: Hello, %s !", Instant.now(), member);
};
public static void main(String[] args) {
// 試行4
List<String> greets = members.parallelStream()
.map(greet)
.collect(Collectors.toList());
greets.forEach(System.out::println);
}
}
greet を Consumer<String> から Function<String, String> に変更したうえで、forEach 内で外部変数に記録する方式から、リダクション操作(collect)で集約する方式に変更しています。
サンプルコードCにおけるストリーム処理は副作用を伴わず、ステートレス※です。こうすることにより、並列処理でありながら、マルチスレッド・プログラミングに伴う同期制御から解放されています。
※ ただし理論上の厳密な意味では、きっと未だにステートフルなのだと思います。というのは、時々刻々と変化する Instant.now() を利用しているからです。
まとめ
- java.util.Stream を利用することにより簡単に並列処理を実現できますが、よく理解しないまま利用すると、検証・デバッグの困難なマルチスレッド起因のバグを招くことになります。
- 堅牢・高性能で保守しやすいソースコードのために、副作用を伴う処理やステートフルな処理は極力排除すべきです。
参考文献
主に java.util.stream パッケージの API ドキュメント を参照しました。
但し、本稿の内容は現時点における投稿者の理解を記したものであり、内容の正しさを保証するものではありません。間違いがありましたらご指摘いただけますと幸いです。