Stream Gatherers とは
- Stream API で任意の中間操作を実装できるようにする仕組み、API
- JDK 22 から試験的な導入が始まり、来年の2025年3月リリース予定の JDK 24 で正式リリースされる予定
- JDK 24 での JEP は 485
- Gatherer の読みは、「ぎゃざらー」(多分)
環境
------------------------------------------------------------
Gradle 8.11.1
------------------------------------------------------------
Build time: 2024-11-20 16:56:46 UTC
Revision: 481cb05a490e0ef9f8620f7873b83bd8a72e7c39
Kotlin: 2.0.20
Groovy: 3.0.22
Ant: Apache Ant(TM) version 1.10.14 compiled on August 16 2023
Launcher JVM: 23.0.1 (Eclipse Adoptium 23.0.1+11)
Daemon JVM: C:\pf\java\adoptium\jdk23 (no JDK specified, using current Java home)
OS: Windows 10 10.0 amd64
日本語の公式参考情報
- Gatherer の公式の日本語情報は以下が参考になる
- いずれも JDK 22 の情報だが、 Stream Gatherers に関する API は 22 から 24 まで一切変更されていないようなので 22 の情報でも多分問題ない
- JEP 473 (JDK 23) と JEP 485 (JDK 24) の冒頭の History のところで、それぞれ過去バージョンから変更していないことが明記されている
Hello World
実装
build.gradle
plugins {
id "application"
}
sourceCompatibility = 23
targetCompatibility = 23
application {
mainClassName = "sandbox.gatherer.Main"
applicationDefaultJvmArgs = ["--enable-preview"]
}
tasks.withType(JavaCompile).configureEach {
options.compilerArgs += "--enable-preview"
}
- JDK 23 ではまだ試験的な機能なので、
--enable-preview
を指定する必要がある
Main.java
package sandbox.gatherer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer.Integrator<Void, Integer, String> squared =
Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
downstream.push(
String.format("%s * %s = %s", element, element, (element * element))
);
return true;
});
Stream.of(1, 2, 3, 4, 5)
.gather(Gatherer.of(squared))
.forEach(System.out::println);
}
}
実行結果
実行結果
1 * 1 = 1
2 * 2 = 4
3 * 3 = 9
4 * 4 = 16
5 * 5 = 25
説明
Main.java
Stream.of(1, 2, 3, 4, 5)
.gather(Gatherer.of(squared))
.forEach(System.out::println);
-
Stream
にgather
という中間操作のためのメソッドが追加されている - このメソッドに中間操作の処理を定義した
Gatherer
を渡すことで、任意の中間操作を実装できる
Main.java
Gatherer.Integrator<Void, Integer, String> squared =
Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
downstream.push(
String.format("%s * %s = %s", element, element, (element * element))
);
return true;
});
- ここでは、入力から受け取った数値(
element
)を二乗する数式(文字列)に変換して出力(downstream
)に渡している
Gatherer
Gatherer(重要なメソッドのみ抽出)
public interface Gatherer<T, A, R> {
...
Integrator<A, T, R> integrator();
default Supplier<A> initializer() {...}
default BiConsumer<A, Downstream<? super R>> finisher() {...}
default BinaryOperator<A> combiner() {...}
...
}
- 中間操作は Gatherer インタフェースを実装して作成する
-
Gatherer
に定義されているメソッドの内、重要なものは以下の4つになるintegrator
initializer
finisher
combiner
- 必須のメソッドは
integrator
のみで、残りはデフォルト実装が提供されていて上書きは任意となっている
Gatherer の生成について
Main.java
Stream.of(1, 2, 3, 4, 5)
.gather(Gatherer.of(squared))
.forEach(System.out::println);
-
Gatherer
インタフェースには、 of メソッド などのファクトリメソッドが用意されている -
Gatherer
インタフェースの抽象メソッドはintegrator
メソッドのみなので、ラムダ式で直接Gatherer
のインスタンスを生成することもできる - このため、わざわざ
of
メソッドを使う意味が無いようにも思える - しかし、このファクトリメソッドはストリームを 並列処理するときに意味が出てくる
- 詳細は並列処理のところで説明するので、今はとりあえず「
Gatherer
のインスタンスはof
メソッドで生成する」ということにしておく
integrator
Gaherer
public interface Gatherer<T, A, R> {
...
Integrator<A, T, R> integrator();
...
}
-
integrator
メソッドは、 Integrator インタフェース を実装したオブジェクトを返すように実装する -
Integrator
は中間操作の具体的な処理を提供するためのインタフェースで、以下のような定義になっている
Integrator
interface Integrator<A, T, R> {
boolean integrate(A state, T element, Downstream<? super R> downstream);
}
- integrate メソッド を実装する必要があり、この中で具体的な中間操作の処理を実装する
-
integrate
メソッドは、3つの引数を受け取る-
state
は中間操作のあいだだけ保持したい情報を記録しておくためのオブジェクトで、非公開状態オブジェクト (private state object) と呼ぶ- 詳細は
initializer
メソッドを説明するところで解説
- 詳細は
-
element
は中間操作への入力値で、前のストリームから渡ってきた値になる -
downstream
は中間操作の結果を後続のストリームに書き出すための API を提供する
-
-
integrate
メソッドはboolean
を返す必要がある-
true
を返した場合、ストリームの処理はそのまま続行される -
false
を返した場合、ストリームの処理はその時点で中断される(これを 短絡 (Short-circuit) と呼ぶ)
-
後続ストリームへの値の受け渡しを制御する
package sandbox.gatherer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer.Integrator<Void, Integer, Integer> filterOddNumber =
Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
if (element % 2 != 0) {
downstream.push(element);
}
return true;
});
Stream.of(1, 2, 3, 4, 5)
.gather(Gatherer.of(filterOddNumber))
.forEach(System.out::println);
}
}
実行結果
1
3
5
-
Downstream
のpush
メソッドを使うことで、後続のストリームに値を受け渡すことができる -
push
を呼ばなければ、現在の要素(element
)に対応する値は次のストリームへ渡されないことになる - 逆に
push
を複数回呼べば、1つの入力要素から複数の出力に変換するようなこともできる
短絡を制御する
Main.java
package sandbox.gatherer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer.Integrator<Void, Integer, Integer> filter =
Gatherer.Integrator.of((_, element, downstream) -> {
if (element == 9) {
return false;
}
downstream.push(element);
return true;
});
Stream.of(1, 2, 3, 9, 4, 5)
.gather(Gatherer.of(filter))
.forEach(System.out::println);
}
}
実行結果
1
2
3
-
integrate
メソッドがfalse
を返すと、その時点でストリームの処理が中断される - 上記例では、
9
が来たらfalse
を返して短絡するようにしている
Integrator インスタンスの生成方法
- ここまでの実装例の中で、
Integrator
のインスタンスを生成するために以下の二種類のファクトリメソッドを使用した例を記載した
ofGreedyを使ったケース
Gatherer.Integrator<Void, Integer, Integer> filterOddNumber =
Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
if (element % 2 != 0) {
downstream.push(element);
}
return true;
});
ofを使ったケース
Gatherer.Integrator<Void, Integer, Integer> filter =
Gatherer.Integrator.of((_, element, downstream) -> {
if (element == 9) {
return false;
}
downstream.push(element);
return true;
});
- そもそも
Integrator
は関数型インタフェースなので、ファクトリメソッドを使わなくてもラムダ式で直接インスタンスを生成することもできる
ラムダ式で直接生成した場合
Gatherer.Integrator<Void, Integer, Integer> filter =
(_, element, downstream) -> {
if (element == 9) {
return false;
}
downstream.push(element);
return true;
};
- 関数型インタフェースを利用した場合も、機能的な差は無い
- これらの生成方法の差は、 処理の最適化に関係している
-
ofGreedy
メソッドは、短絡を行わないIntegrator
を生成するときに利用することで、処理を最適化できるようになっている -
ofGreedy
メソッドは Greedy インタフェース を実装したインスタンスを返却する -
Greedy
はIntegrator
を継承したマーカーインタフェースで、短絡を行わないIntegrator
であることを表している- greedy は「貪欲」や「強欲」という意味
- 途中で短絡せずに全ての要素を処理するという意味合いから来ていると考えられる
- ストリームが処理される際、
Integrator
の実体がGreedy
かどうかで処理が調整されるようになっている - 以下は OpenJDK の実装の一部1 を抜粋したものになる
GathererOp
...
private <CA, CR> CR evaluate(...) {
...
// Optimization
final boolean greedy = integrator instanceof Integrator.Greedy<A, T, R>;
...
final class Sequential implements Consumer<T>, Gatherer.Downstream<R> {
...
@ForceInline
Sequential evaluateUsing(Spliterator<T> spliterator) {
if (greedy)
spliterator.forEachRemaining(this);
else
do {
} while (proceed && spliterator.tryAdvance(this));
return this;
}
...
-
integrator
がGreedy
かどうかを判定して、結果をgreedy
変数に記録している - そして
greedy
がtrue
かfalse
かで、Spliterator
をイテレートするときの処理をforEachRemaning
かtryAdvance
で切り分けている -
forEachRemaning
は残りの要素を一気にイテレートするのに対して、tryAdvance
は次の要素があるか都度確認しながらイテレートする形になる -
Greedy
は 短絡しないIntegrator
であることを表しているので、都度確認しながらイテレートする必要がなくforEachRemaning
で一気にイテレートする形になっている - つまり、短絡しないことが保証されている
Integrator
を使うのであればofGreedy
メソッドでインスタンスを生成した方が、より最適化された実装を選択できることになる
initializer, finisher
Main.java
package sandbox.gatherer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Supplier<List<Integer>> initializer = () -> {
System.out.println("** initializer **");
return new ArrayList<>(List.of(0));
};
Gatherer.Integrator<List<Integer>, String, Integer> integrator =
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
int totalLength = state.getFirst() + element.length();
state.set(0, totalLength);
System.out.printf("""
== integrator ==
element = %s
totalLength = %s
""", element, totalLength);
return true;
});
BinaryOperator<List<Integer>> combiner = Gatherer.defaultCombiner();
BiConsumer<List<Integer>, Gatherer.Downstream<? super Integer>> finisher =
(state, downstream) -> {
int totalLength = state.getFirst();
System.out.printf("""
++ finisher ++
totalLength = %s
""", totalLength);
downstream.push(totalLength);
};
int result = Stream.of("one", "two", "three", "four", "five")
.gather(Gatherer.of(initializer, integrator, combiner, finisher))
.findFirst()
.get();
System.out.println("result = " + result);
}
}
実行結果
** initializer **
== integrator ==
element = one
totalLength = 3
== integrator ==
element = two
totalLength = 6
== integrator ==
element = three
totalLength = 11
== integrator ==
element = four
totalLength = 15
== integrator ==
element = five
totalLength = 19
++ finisher ++
totalLength = 19
result = 19
- ここで実装している中間操作は、入力から受け取った文字列の文字数を合計して次に渡す、という実装になっている
initializerの実装
Supplier<List<Integer>> initializer = () -> {
System.out.println("** initializer **");
return new ArrayList<>(List.of(0));
};
- initializer は、ストリームの処理を開始する前に一度だけ呼ばれる
- initializer は、非公開状態オブジェクト(private state object) を返却するように実装する
- この非公開状態オブジェクトは、 integrator の処理が呼ばれるときに毎回渡されるので、イテレートのあいだ保持しておきたい情報を格納するために利用できる
- この実装では、文字数の合計を記録するために使用している
integratorの実装
Gatherer.Integrator<List<Integer>, String, Integer> integrator =
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
int totalLength = state.getFirst() + element.length();
state.set(0, totalLength);
System.out.printf("""
== integrator ==
element = %s
totalLength = %s
""", element, totalLength);
return true;
});
- ストリーム内の全ての要素が処理されると、最後に finisher の処理が呼ばれる
- finisher には非公開状態オブジェクトと
Downstream
が渡されるので、必要に応じてDownstream
に追加の要素を渡すことができる - この実装では、非公開状態オブジェクトに溜めていた文字数の合計を
Downstream
に流している
finisherの実装
BiConsumer<List<Integer>, Gatherer.Downstream<? super Integer>> finisher =
(state, downstream) -> {
int totalLength = state.getFirst();
System.out.printf("""
++ finisher ++
totalLength = %s
""", totalLength);
downstream.push(totalLength);
};
- なお、 combiner についてはデフォルトの
Gatherer.defaultCombiner()
を利用している - combiner についてはこの後で説明する
combiner (並列実行)
Main.java
package sandbox.gatherer;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Supplier<List<Integer>> initializer = () -> {
print("initializer");
return new ArrayList<>(List.of(0));
};
Gatherer.Integrator<List<Integer>, String, Integer> integrator =
Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
int totalLength = state.getFirst() + element.length();
state.set(0, totalLength);
print("integrator");
return true;
});
BinaryOperator<List<Integer>> combiner = (state1, state2) -> {
int totalOfState1 = state1.getFirst();
int totalOfState2 = state2.getFirst();
int total = totalOfState1 + totalOfState2;
print("combiner");
return new ArrayList<>(List.of(total));
};
BiConsumer<List<Integer>, Gatherer.Downstream<? super Integer>> finisher =
(state, downstream) -> {
print("finisher");
downstream.push(state.getFirst());
};
int result = Stream.of("one", "two", "three", "four", "five")
.gather(Gatherer.of(initializer, integrator, combiner, finisher))
.parallel()
.findFirst()
.get();
System.out.println("result = " + result);
}
private static void print(String message) {
System.out.println(Thread.currentThread().getName() + ": " + message);
}
}
実行結果
ForkJoinPool.commonPool-worker-2: initializer
main: initializer
ForkJoinPool.commonPool-worker-3: initializer
ForkJoinPool.commonPool-worker-3: integrator
main: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-4: initializer
ForkJoinPool.commonPool-worker-1: initializer
ForkJoinPool.commonPool-worker-4: integrator
ForkJoinPool.commonPool-worker-1: integrator
ForkJoinPool.commonPool-worker-1: combiner
ForkJoinPool.commonPool-worker-4: combiner
ForkJoinPool.commonPool-worker-4: combiner
ForkJoinPool.commonPool-worker-4: combiner
main: finisher
result = 19
- 文字数の合計を処理するという実装は initializer, finisher の例と同じにしている
- 異なるのは、
combiner
を実装している点と、ストリームの実行にparallel
を加えて並列処理にしている点 -
initializer
,integrator
がそれぞれ異なるスレッドで並列に実行されていることがわかる-
initializer
がスレッドごとに実行されているということは、非公開状態オブジェクトはスレッドごとに生成されていることが分かる - したがって、非公開状態オブジェクトはスレッドセーフでないオブジェクトでも問題ない(スレッド間で共有されることはない)
-
combinerの実装
BinaryOperator<List<Integer>> combiner = (state1, state2) -> {
int totalOfState1 = state1.getFirst();
int totalOfState2 = state2.getFirst();
int total = totalOfState1 + totalOfState2;
print("combiner");
return new ArrayList<>(List.of(total));
};
-
combiner
は、ストリームが並列で実行された際に呼び出される - 引数として、2つの非公開状態オブジェクトが渡される
- これらは並列実行された各スレッドによって処理された非公開状態オブジェクトであり、必要に応じて1つの非公開状態オブジェクトに統合して返却するように実装する
of と ofSequential
-
Gatherer
インタフェースには、Gatherer
インスタンスを生成するためのファクトリメソッドが大きく2種類用意されている - 一見すると、いずれも
integrator
を受け取ってGatherer
を返しているのは同じで違いが無いように見える - これらは、ストリームを並列にして実行すると違いがわかる
Main.java
package sandbox.gatherer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer.Integrator<Void, Integer, Integer> integrator =
Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
downstream.push(element);
System.out.println(Thread.currentThread().getName() + ": integrator");
return true;
});
Gatherer<Integer, Void, Integer> gatherer
= Gatherer.of(integrator);
Gatherer<Integer, Void, Integer> sequentialGatherer
= Gatherer.ofSequential(integrator);
System.out.println("[gatherer]");
Stream.of(1, 2, 3, 4, 5)
.parallel()
.gather(gatherer)
.forEach(_ -> {});
System.out.println("[sequentialGatherer]");
Stream.of(1, 2, 3, 4, 5)
.parallel()
.gather(sequentialGatherer)
.forEach(_ -> {});
}
}
- 同じ
Integrator
を元に、of
とofSequential
の2つでGatherer
インスタンスを生成し、同じ内容のStream
にそれぞれ適用して並列で起動している
実行結果
[gatherer]
ForkJoinPool.commonPool-worker-4: integrator
main: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-1: integrator
ForkJoinPool.commonPool-worker-3: integrator
[sequentialGatherer]
ForkJoinPool.commonPool-worker-5: integrator
ForkJoinPool.commonPool-worker-5: integrator
ForkJoinPool.commonPool-worker-5: integrator
ForkJoinPool.commonPool-worker-5: integrator
ForkJoinPool.commonPool-worker-5: integrator
-
of
で生成した方は、Integrator
の処理が異なるスレッドで実行されている - 一方で、
ofSequential
で生成したIntegrator
は、全て同じスレッドで実行されている -
ofSequential
で生成されたGatherer
はシーケンシャルな Gathererと判断され、ストリームを並列実行しても単一のスレッドでしか実行されないようになっている - したがって、マルチスレッドで実行されても問題ない
Gatherer
を作りたい場合はof
メソッドでインスタンスを生成しておく必要がある- 間違えて
ofSequential
で生成すると、実装上は並列実行になっていても実際には単一スレッドで実行されるようなことになりかねない
- 間違えて
- 逆にマルチスレッドでの実行は想定していない
Gatherer
を用意する場合は、ofSequential
メソッドでインスタンスを生成しておかないといけない
ラムダ式で Gatherer を生成するとシーケンシャルな Gatherer になる
- ところで
Gatherer
インタフェースは抽象メソッドがintegrator
の1つだけなので、関数型インタフェースとなっている - したがって、
Gatherer
のインスタンスはラムダ式で生成することでもきる
Gathererをラムダ式で生成した場合
package sandbox.gatherer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer.Integrator<Void, Integer, Integer> integrator =
Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
downstream.push(element);
System.out.println(Thread.currentThread().getName() + ": integrator");
return true;
});
Gatherer<Integer, Void, Integer> gatherer = () -> integrator;
System.out.println("[gatherer]");
Stream.of(1, 2, 3, 4, 5)
.parallel()
.gather(gatherer)
.forEach(_ -> {});
}
}
-
of
およびofSequential
を使わずに、ラムダ式だけでGatherer
インスタンスを生成している - このラムダ式で生成した
Gatherer
を使ってストリームを並列で起動している
実行結果
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-2: integrator
- ラムダ式だけで生成した
Gatherer
を使うと、ストリームを並列実行しても単一スレッドで実行されてしまう - つまり、シーケンシャルな Gatherer となっている
-
Gatherer
の Javadoc には以下のように書かれている
コンビナがdefaultCombiner()であるGatherersは、順次しか評価できません。
https://docs.oracle.com/javase/jp/22/docs/api/java.base/java/util/stream/Gatherer.html
-
Gatherer
のcombiner
メソッドはデフォルト実装が提供されておりdefaultCombiner()
の結果を返すように実装されている - したがって、ラムダ式で生成した
Gatherer
の combiner はdefaultCombiner()
と同じモノを返すことになり、順次しか評価できない=シーケンシャルな Gatherer として扱われることになる - このため、並列処理されると困るような実装でない限り、
Gatherer
のインスタンスはof
メソッドで生成しておくのが無難と考えられる- ラムダ式で生成していると並列で実行しても単一スレッドでしか動かず、パフォーマンスがよくならない可能性がある
組み込みの Gatherer
Gatherers というクラスに、組み込みの Gatherer
を返すメソッドがいくつか用意されている。
fold
Main.java
package sandbox.gatherer;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer<Integer, ?, String> fold =
Gatherers.fold(() -> "@", (state, element) -> state + element);
String result = Stream.of(1, 2, 3, 4, 5)
.gather(fold)
.findFirst()
.get();
System.out.println("result = " + result);
}
}
実行結果
result = @12345
- fold メソッドを使用すると、ストリームの各要素を単一の結果にまとめる(畳みこむ)処理を実装できる
- fold は「畳みこむ」という意味
-
fold
メソッドの第一引数には、畳み込み処理の初期値を返す関数を渡す - 第二引数は、畳み込み後の値と現在の要素が渡されるので、畳み込み後の値を返すように実装した関数を渡す
mapConcurrent
Main.java
package sandbox.gatherer;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer<Integer, ?, String> mapConcurrent
= Gatherers.mapConcurrent(3, element -> {
Thread thread = Thread.currentThread();
System.out.printf("""
id=%s, isVirtual=%s
""", thread.threadId(), thread.isVirtual());
return "[" + element + "]";
});
Stream.of(1, 2, 3, 4, 5)
.gather(mapConcurrent)
.forEach(System.out::println);
}
}
実行結果
id=30, isVirtual=true
id=32, isVirtual=true
id=34, isVirtual=true
[1]
[2]
[3]
id=38, isVirtual=true
id=37, isVirtual=true
[4]
[5]
-
mapConcurrent メソッド を使用すると、変換を仮装スレッドで行う
Gatherer
を生成できる - 第一引数には同時実行数を渡し、第二引数には変換処理を渡す
- ストリームの順序は保存(維持)される
scan
Main.java
package sandbox.gatherer;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer<Integer, ?, String> fold =
Gatherers.scan(() -> "@", (state, element) -> state + element);
Stream.of(1, 2, 3, 4, 5)
.gather(fold)
.forEach(System.out::println);
}
}
実行結果
@1
@12
@123
@1234
@12345
-
fold
は結果を1つに畳み込んでいたのに対して、 scan メソッド は結果を蓄積しつつ、各イテレートの結果はそのまま次のストリームに渡される
windowFixed
Main.java
package sandbox.gatherer;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer<Integer, ?, List<Integer>> windowFixed
= Gatherers.windowFixed(2);
List<List<Integer>> result = Stream.of(1, 2, 3, 4, 5)
.gather(windowFixed)
.toList();
System.out.println("result = " + result);
}
}
実行結果
result = [[1, 2], [3, 4], [5]]
-
windowFixed メソッド を使うと、各要素を指定された要素数の
List
で切り分けることができる
windowSliding
Main.java
package sandbox.gatherer;
import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer<Integer, ?, List<Integer>> windowFixed
= Gatherers.windowSliding(3);
List<List<Integer>> result = Stream.of(1, 2, 3, 4, 5)
.gather(windowFixed)
.toList();
System.out.println("result = " + result);
}
}
実行結果
result = [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
-
windowSliding メソッド を使用すると、各要素を指定された要素数の
List
で切り分けることができる - このとき、切り分けられた各
List
の中身は、1つずつスライドするように格納される
Gatherer の連結
Main.java
package sandbox.gatherer;
import java.util.stream.Gatherer;
import java.util.stream.Stream;
public class Main {
public static void main(String[] args) {
Gatherer<String, Void, String> toUpperCase
= Gatherer.of((_, element, downstream) -> {
downstream.push(element.toUpperCase());
return true;
});
Gatherer<String, Void, String> wrapWithParentheses
= Gatherer.of((_, element, downstream) -> {
downstream.push("(" + element + ")");
return true;
});
Gatherer<String, ?, String> gatherer = toUpperCase.andThen(wrapWithParentheses);
Stream.of("one", "two", "three")
.gather(gatherer)
.forEach(System.out::println);
}
}
- 入力値を大文字に変換する
Gatherer
と括弧で囲うGatherer
を用意して、andThen
で連結してから利用している
実行結果
(ONE)
(TWO)
(THREE)
-
Gatherer
はandThen
であらかじめ連結しておくことができる