8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

Stream Gatherers使い方メモ

Posted at

Stream Gatherers とは

  • Stream API で任意の中間操作を実装できるようにする仕組み、API
  • JDK 22 から試験的な導入が始まり、来年の2025年3月リリース予定の JDK 24 で正式リリースされる予定
  • JDK 24 での JEP は 485
  • Gatherer の読みは、「ぎゃざらー」(多分)

環境

------------------------------------------------------------
Gradle 8.11.1
------------------------------------------------------------

Build time:    2024-11-20 16:56:46 UTC
Revision:      481cb05a490e0ef9f8620f7873b83bd8a72e7c39

Kotlin:        2.0.20
Groovy:        3.0.22
Ant:           Apache Ant(TM) version 1.10.14 compiled on August 16 2023
Launcher JVM:  23.0.1 (Eclipse Adoptium 23.0.1+11)
Daemon JVM:    C:\pf\java\adoptium\jdk23 (no JDK specified, using current Java home)
OS:            Windows 10 10.0 amd64

日本語の公式参考情報

Hello World

実装

build.gradle
plugins {
    id "application"
}

sourceCompatibility = 23
targetCompatibility = 23

application {
    mainClassName = "sandbox.gatherer.Main"
    applicationDefaultJvmArgs = ["--enable-preview"]
}

tasks.withType(JavaCompile).configureEach {
    options.compilerArgs += "--enable-preview"
}
  • JDK 23 ではまだ試験的な機能なので、 --enable-preview を指定する必要がある
Main.java
package sandbox.gatherer;

import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer.Integrator<Void, Integer, String> squared =
            Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
                downstream.push(
                    String.format("%s * %s = %s", element, element, (element * element))
                );
                return true;
            });

        Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherer.of(squared))
                .forEach(System.out::println);
    }
}

実行結果

実行結果
1 * 1 = 1
2 * 2 = 4
3 * 3 = 9
4 * 4 = 16
5 * 5 = 25

説明

Main.java
        Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherer.of(squared))
                .forEach(System.out::println);
  • Streamgather という中間操作のためのメソッドが追加されている
  • このメソッドに中間操作の処理を定義した Gatherer を渡すことで、任意の中間操作を実装できる
Main.java
        Gatherer.Integrator<Void, Integer, String> squared =
            Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
                downstream.push(
                    String.format("%s * %s = %s", element, element, (element * element))
                );
                return true;
            });
  • ここでは、入力から受け取った数値(element)を二乗する数式(文字列)に変換して出力(downstream)に渡している

Gatherer

Gatherer(重要なメソッドのみ抽出)
public interface Gatherer<T, A, R> {
    ...
    Integrator<A, T, R> integrator();
    default Supplier<A> initializer() {...}
    default BiConsumer<A, Downstream<? super R>> finisher() {...}
    default BinaryOperator<A> combiner() {...}
    ...
}
  • 中間操作は Gatherer インタフェースを実装して作成する
  • Gatherer に定義されているメソッドの内、重要なものは以下の4つになる
    • integrator
    • initializer
    • finisher
    • combiner
  • 必須のメソッドは integrator のみで、残りはデフォルト実装が提供されていて上書きは任意となっている

Gatherer の生成について

Main.java
        Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherer.of(squared))
                .forEach(System.out::println);
  • Gatherer インタフェースには、 of メソッド などのファクトリメソッドが用意されている
  • Gatherer インタフェースの抽象メソッドは integrator メソッドのみなので、ラムダ式で直接 Gatherer のインスタンスを生成することもできる
  • このため、わざわざ of メソッドを使う意味が無いようにも思える
  • しかし、このファクトリメソッドはストリームを 並列処理するときに意味が出てくる
  • 詳細は並列処理のところで説明するので、今はとりあえず「Gatherer のインスタンスは of メソッドで生成する」ということにしておく

integrator

Gaherer
public interface Gatherer<T, A, R> {
    ...
    Integrator<A, T, R> integrator();
    ...
}
  • integrator メソッドは、 Integrator インタフェース を実装したオブジェクトを返すように実装する
  • Integrator は中間操作の具体的な処理を提供するためのインタフェースで、以下のような定義になっている
Integrator
interface Integrator<A, T, R> {
    boolean integrate(A state, T element, Downstream<? super R> downstream);
}
  • integrate メソッド を実装する必要があり、この中で具体的な中間操作の処理を実装する
  • integrate メソッドは、3つの引数を受け取る
    • state は中間操作のあいだだけ保持したい情報を記録しておくためのオブジェクトで、非公開状態オブジェクト (private state object) と呼ぶ
      • 詳細は initializer メソッドを説明するところで解説
    • element は中間操作への入力値で、前のストリームから渡ってきた値になる
    • downstream は中間操作の結果を後続のストリームに書き出すための API を提供する
  • integrate メソッドは boolean を返す必要がある
    • true を返した場合、ストリームの処理はそのまま続行される
    • false を返した場合、ストリームの処理はその時点で中断される(これを 短絡 (Short-circuit) と呼ぶ)

後続ストリームへの値の受け渡しを制御する

package sandbox.gatherer;

import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer.Integrator<Void, Integer, Integer> filterOddNumber =
            Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
                if (element % 2 != 0) {
                    downstream.push(element);
                }
                return true;
            });

        Stream.of(1, 2, 3, 4, 5)
                .gather(Gatherer.of(filterOddNumber))
                .forEach(System.out::println);
    }
}
実行結果
1
3
5
  • Downstreampush メソッドを使うことで、後続のストリームに値を受け渡すことができる
  • push を呼ばなければ、現在の要素(element)に対応する値は次のストリームへ渡されないことになる
  • 逆に push を複数回呼べば、1つの入力要素から複数の出力に変換するようなこともできる

短絡を制御する

Main.java
package sandbox.gatherer;

import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer.Integrator<Void, Integer, Integer> filter =
            Gatherer.Integrator.of((_, element, downstream) -> {
                if (element == 9) {
                    return false;
                }

                downstream.push(element);
                return true;
            });

        Stream.of(1, 2, 3, 9, 4, 5)
                .gather(Gatherer.of(filter))
                .forEach(System.out::println);
    }
}
実行結果
1
2
3
  • integrate メソッドが false を返すと、その時点でストリームの処理が中断される
  • 上記例では、 9 が来たら false を返して短絡するようにしている

Integrator インスタンスの生成方法

ofGreedyを使ったケース
        Gatherer.Integrator<Void, Integer, Integer> filterOddNumber =
            Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
                if (element % 2 != 0) {
                    downstream.push(element);
                }
                return true;
            });
ofを使ったケース
        Gatherer.Integrator<Void, Integer, Integer> filter =
            Gatherer.Integrator.of((_, element, downstream) -> {
                if (element == 9) {
                    return false;
                }

                downstream.push(element);
                return true;
            });
  • そもそも Integrator は関数型インタフェースなので、ファクトリメソッドを使わなくてもラムダ式で直接インスタンスを生成することもできる
ラムダ式で直接生成した場合
        Gatherer.Integrator<Void, Integer, Integer> filter =
            (_, element, downstream) -> {
                if (element == 9) {
                    return false;
                }

                downstream.push(element);
                return true;
            };
  • 関数型インタフェースを利用した場合も、機能的な差は無い
  • これらの生成方法の差は、 処理の最適化に関係している
  • ofGreedy メソッドは、短絡を行わない Integrator を生成するときに利用することで、処理を最適化できるようになっている
  • ofGreedy メソッドは Greedy インタフェース を実装したインスタンスを返却する
  • GreedyIntegrator を継承したマーカーインタフェースで、短絡を行わない Integrator であることを表している
    • greedy は「貪欲」や「強欲」という意味
    • 途中で短絡せずに全ての要素を処理するという意味合いから来ていると考えられる
  • ストリームが処理される際、 Integrator の実体が Greedy かどうかで処理が調整されるようになっている
  • 以下は OpenJDK の実装の一部1 を抜粋したものになる
GathererOp
    ...
    private <CA, CR> CR evaluate(...) {
        ...

        // Optimization
        final boolean greedy = integrator instanceof Integrator.Greedy<A, T, R>;

        ...
        final class Sequential implements Consumer<T>, Gatherer.Downstream<R> {
            ...

            @ForceInline
            Sequential evaluateUsing(Spliterator<T> spliterator) {
                if (greedy)
                    spliterator.forEachRemaining(this);
                else
                    do {
                    } while (proceed && spliterator.tryAdvance(this));

                return this;
            }
    ...
  • integratorGreedy かどうかを判定して、結果を greedy 変数に記録している
  • そして greedytruefalse かで、 Spliterator をイテレートするときの処理を forEachRemaningtryAdvance で切り分けている
  • forEachRemaning は残りの要素を一気にイテレートするのに対して、 tryAdvance は次の要素があるか都度確認しながらイテレートする形になる
  • Greedy短絡しない Integrator であることを表しているので、都度確認しながらイテレートする必要がなく forEachRemaning で一気にイテレートする形になっている
  • つまり、短絡しないことが保証されている Integrator を使うのであれば ofGreedy メソッドでインスタンスを生成した方が、より最適化された実装を選択できることになる

initializer, finisher

Main.java
package sandbox.gatherer;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Supplier<List<Integer>> initializer = () -> {
            System.out.println("** initializer **");
            return new ArrayList<>(List.of(0));
        };

        Gatherer.Integrator<List<Integer>, String, Integer> integrator =
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                int totalLength = state.getFirst() + element.length();
                state.set(0, totalLength);

                System.out.printf("""
                == integrator ==
                  element = %s
                  totalLength = %s
                """, element, totalLength);
                return true;
            });

        BinaryOperator<List<Integer>> combiner = Gatherer.defaultCombiner();

        BiConsumer<List<Integer>, Gatherer.Downstream<? super Integer>> finisher =
            (state, downstream) -> {
                int totalLength = state.getFirst();
                System.out.printf("""
                ++ finisher ++
                  totalLength = %s
                """, totalLength);
                downstream.push(totalLength);
            };

        int result = Stream.of("one", "two", "three", "four", "five")
                    .gather(Gatherer.of(initializer, integrator, combiner, finisher))
                    .findFirst()
                    .get();

        System.out.println("result = " + result);
    }
}
実行結果
** initializer **
== integrator ==
  element = one
  totalLength = 3
== integrator ==
  element = two
  totalLength = 6
== integrator ==
  element = three
  totalLength = 11
== integrator ==
  element = four
  totalLength = 15
== integrator ==
  element = five
  totalLength = 19
++ finisher ++
  totalLength = 19
result = 19
  • ここで実装している中間操作は、入力から受け取った文字列の文字数を合計して次に渡す、という実装になっている
initializerの実装
        Supplier<List<Integer>> initializer = () -> {
            System.out.println("** initializer **");
            return new ArrayList<>(List.of(0));
        };
  • initializer は、ストリームの処理を開始する前に一度だけ呼ばれる
  • initializer は、非公開状態オブジェクト(private state object) を返却するように実装する
  • この非公開状態オブジェクトは、 integrator の処理が呼ばれるときに毎回渡されるので、イテレートのあいだ保持しておきたい情報を格納するために利用できる
  • この実装では、文字数の合計を記録するために使用している
integratorの実装
        Gatherer.Integrator<List<Integer>, String, Integer> integrator =
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                int totalLength = state.getFirst() + element.length();
                state.set(0, totalLength);

                System.out.printf("""
                == integrator ==
                  element = %s
                  totalLength = %s
                """, element, totalLength);
                return true;
            });
  • ストリーム内の全ての要素が処理されると、最後に finisher の処理が呼ばれる
  • finisher には非公開状態オブジェクトと Downstream が渡されるので、必要に応じて Downstream に追加の要素を渡すことができる
  • この実装では、非公開状態オブジェクトに溜めていた文字数の合計を Downstream に流している
finisherの実装
        BiConsumer<List<Integer>, Gatherer.Downstream<? super Integer>> finisher =
            (state, downstream) -> {
                int totalLength = state.getFirst();
                System.out.printf("""
                ++ finisher ++
                  totalLength = %s
                """, totalLength);
                downstream.push(totalLength);
            };
  • なお、 combiner についてはデフォルトの Gatherer.defaultCombiner() を利用している
  • combiner についてはこの後で説明する

combiner (並列実行)

Main.java
package sandbox.gatherer;

import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Supplier;
import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Supplier<List<Integer>> initializer = () -> {
            print("initializer");
            return new ArrayList<>(List.of(0));
        };

        Gatherer.Integrator<List<Integer>, String, Integer> integrator =
            Gatherer.Integrator.ofGreedy((state, element, downstream) -> {
                int totalLength = state.getFirst() + element.length();
                state.set(0, totalLength);

                print("integrator");
                return true;
            });

        BinaryOperator<List<Integer>> combiner = (state1, state2) -> {
            int totalOfState1 = state1.getFirst();
            int totalOfState2 = state2.getFirst();
            int total = totalOfState1 + totalOfState2;

            print("combiner");
            return new ArrayList<>(List.of(total));
        };

        BiConsumer<List<Integer>, Gatherer.Downstream<? super Integer>> finisher =
            (state, downstream) -> {
                print("finisher");
                downstream.push(state.getFirst());
            };

        int result = Stream.of("one", "two", "three", "four", "five")
                    .gather(Gatherer.of(initializer, integrator, combiner, finisher))
                    .parallel()
                    .findFirst()
                    .get();

        System.out.println("result = " + result);
    }

    private static void print(String message) {
        System.out.println(Thread.currentThread().getName() + ": " + message);
    }
}
実行結果
ForkJoinPool.commonPool-worker-2: initializer
main: initializer
ForkJoinPool.commonPool-worker-3: initializer
ForkJoinPool.commonPool-worker-3: integrator
main: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-4: initializer
ForkJoinPool.commonPool-worker-1: initializer
ForkJoinPool.commonPool-worker-4: integrator
ForkJoinPool.commonPool-worker-1: integrator
ForkJoinPool.commonPool-worker-1: combiner
ForkJoinPool.commonPool-worker-4: combiner
ForkJoinPool.commonPool-worker-4: combiner
ForkJoinPool.commonPool-worker-4: combiner
main: finisher
result = 19
  • 文字数の合計を処理するという実装は initializer, finisher の例と同じにしている
  • 異なるのは、 combiner を実装している点と、ストリームの実行に parallel を加えて並列処理にしている点
  • initializer, integrator がそれぞれ異なるスレッドで並列に実行されていることがわかる
    • initializer がスレッドごとに実行されているということは、非公開状態オブジェクトはスレッドごとに生成されていることが分かる
    • したがって、非公開状態オブジェクトはスレッドセーフでないオブジェクトでも問題ない(スレッド間で共有されることはない)
combinerの実装
        BinaryOperator<List<Integer>> combiner = (state1, state2) -> {
            int totalOfState1 = state1.getFirst();
            int totalOfState2 = state2.getFirst();
            int total = totalOfState1 + totalOfState2;

            print("combiner");
            return new ArrayList<>(List.of(total));
        };
  • combiner は、ストリームが並列で実行された際に呼び出される
  • 引数として、2つの非公開状態オブジェクトが渡される
  • これらは並列実行された各スレッドによって処理された非公開状態オブジェクトであり、必要に応じて1つの非公開状態オブジェクトに統合して返却するように実装する

of と ofSequential

  • Gatherer インタフェースには、 Gatherer インスタンスを生成するためのファクトリメソッドが大きく2種類用意されている
  • 一見すると、いずれも integrator を受け取って Gatherer を返しているのは同じで違いが無いように見える
  • これらは、ストリームを並列にして実行すると違いがわかる
Main.java
package sandbox.gatherer;

import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer.Integrator<Void, Integer, Integer> integrator =
            Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
                downstream.push(element);
                System.out.println(Thread.currentThread().getName() + ": integrator");
                return true;
            });

        Gatherer<Integer, Void, Integer> gatherer
            = Gatherer.of(integrator);
        Gatherer<Integer, Void, Integer> sequentialGatherer
            = Gatherer.ofSequential(integrator);

        System.out.println("[gatherer]");
        Stream.of(1, 2, 3, 4, 5)
            .parallel()
            .gather(gatherer)
            .forEach(_ -> {});

        System.out.println("[sequentialGatherer]");
        Stream.of(1, 2, 3, 4, 5)
            .parallel()
            .gather(sequentialGatherer)
            .forEach(_ -> {});
    }
}
  • 同じ Integrator を元に、 ofofSequential の2つで Gatherer インスタンスを生成し、同じ内容の Stream にそれぞれ適用して並列で起動している
実行結果
[gatherer]
ForkJoinPool.commonPool-worker-4: integrator
main: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-1: integrator
ForkJoinPool.commonPool-worker-3: integrator

[sequentialGatherer]
ForkJoinPool.commonPool-worker-5: integrator
ForkJoinPool.commonPool-worker-5: integrator
ForkJoinPool.commonPool-worker-5: integrator
ForkJoinPool.commonPool-worker-5: integrator
ForkJoinPool.commonPool-worker-5: integrator
  • of で生成した方は、 Integrator の処理が異なるスレッドで実行されている
  • 一方で、 ofSequential で生成した Integrator は、全て同じスレッドで実行されている
  • ofSequential で生成された Gathererシーケンシャルな Gathererと判断され、ストリームを並列実行しても単一のスレッドでしか実行されないようになっている
  • したがって、マルチスレッドで実行されても問題ない Gatherer を作りたい場合は of メソッドでインスタンスを生成しておく必要がある
    • 間違えて ofSequential で生成すると、実装上は並列実行になっていても実際には単一スレッドで実行されるようなことになりかねない
  • 逆にマルチスレッドでの実行は想定していない Gatherer を用意する場合は、 ofSequential メソッドでインスタンスを生成しておかないといけない

ラムダ式で Gatherer を生成するとシーケンシャルな Gatherer になる

  • ところで Gatherer インタフェースは抽象メソッドが integrator の1つだけなので、関数型インタフェースとなっている
  • したがって、 Gatherer のインスタンスはラムダ式で生成することでもきる
Gathererをラムダ式で生成した場合
package sandbox.gatherer;

import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer.Integrator<Void, Integer, Integer> integrator =
            Gatherer.Integrator.ofGreedy((_, element, downstream) -> {
                downstream.push(element);
                System.out.println(Thread.currentThread().getName() + ": integrator");
                return true;
            });

        Gatherer<Integer, Void, Integer> gatherer = () -> integrator;

        System.out.println("[gatherer]");
        Stream.of(1, 2, 3, 4, 5)
            .parallel()
            .gather(gatherer)
            .forEach(_ -> {});
    }
}
  • of および ofSequential を使わずに、ラムダ式だけで Gatherer インスタンスを生成している
  • このラムダ式で生成した Gatherer を使ってストリームを並列で起動している
実行結果
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-2: integrator
ForkJoinPool.commonPool-worker-2: integrator
  • ラムダ式だけで生成した Gatherer を使うと、ストリームを並列実行しても単一スレッドで実行されてしまう
  • つまり、シーケンシャルな Gatherer となっている
  • Gatherer の Javadoc には以下のように書かれている

コンビナがdefaultCombiner()であるGatherersは、順次しか評価できません
https://docs.oracle.com/javase/jp/22/docs/api/java.base/java/util/stream/Gatherer.html

  • Gatherercombiner メソッドはデフォルト実装が提供されており defaultCombiner() の結果を返すように実装されている
  • したがって、ラムダ式で生成した Gatherer の combiner は defaultCombiner() と同じモノを返すことになり、順次しか評価できない=シーケンシャルな Gatherer として扱われることになる
  • このため、並列処理されると困るような実装でない限り、 Gatherer のインスタンスは of メソッドで生成しておくのが無難と考えられる
    • ラムダ式で生成していると並列で実行しても単一スレッドでしか動かず、パフォーマンスがよくならない可能性がある

組み込みの Gatherer

Gatherers というクラスに、組み込みの Gatherer を返すメソッドがいくつか用意されている。

fold

Main.java
package sandbox.gatherer;

import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer<Integer, ?, String> fold =
            Gatherers.fold(() -> "@", (state, element) -> state + element);

        String result = Stream.of(1, 2, 3, 4, 5)
                .gather(fold)
                .findFirst()
                .get();

        System.out.println("result = " + result);
    }
}
実行結果
result = @12345
  • fold メソッドを使用すると、ストリームの各要素を単一の結果にまとめる(畳みこむ)処理を実装できる
  • fold は「畳みこむ」という意味
  • fold メソッドの第一引数には、畳み込み処理の初期値を返す関数を渡す
  • 第二引数は、畳み込み後の値と現在の要素が渡されるので、畳み込み後の値を返すように実装した関数を渡す

mapConcurrent

Main.java
package sandbox.gatherer;

import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer<Integer, ?, String> mapConcurrent
            = Gatherers.mapConcurrent(3, element -> {
                Thread thread = Thread.currentThread();
                System.out.printf("""
                id=%s, isVirtual=%s
                """, thread.threadId(), thread.isVirtual());
                return "[" + element + "]";
            });

        Stream.of(1, 2, 3, 4, 5)
                .gather(mapConcurrent)
                .forEach(System.out::println);
    }
}
実行結果
id=30, isVirtual=true
id=32, isVirtual=true
id=34, isVirtual=true
[1]
[2]
[3]
id=38, isVirtual=true
id=37, isVirtual=true
[4]
[5]
  • mapConcurrent メソッド を使用すると、変換を仮装スレッドで行う Gatherer を生成できる
  • 第一引数には同時実行数を渡し、第二引数には変換処理を渡す
  • ストリームの順序は保存(維持)される

scan

Main.java
package sandbox.gatherer;

import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer<Integer, ?, String> fold =
                Gatherers.scan(() -> "@", (state, element) -> state + element);

        Stream.of(1, 2, 3, 4, 5)
                .gather(fold)
                .forEach(System.out::println);
    }
}
実行結果
@1
@12
@123
@1234
@12345
  • fold は結果を1つに畳み込んでいたのに対して、 scan メソッド は結果を蓄積しつつ、各イテレートの結果はそのまま次のストリームに渡される

windowFixed

Main.java
package sandbox.gatherer;

import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer<Integer, ?, List<Integer>> windowFixed
            = Gatherers.windowFixed(2);

        List<List<Integer>> result = Stream.of(1, 2, 3, 4, 5)
                .gather(windowFixed)
                .toList();

        System.out.println("result = " + result);
    }
}
実行結果
result = [[1, 2], [3, 4], [5]]
  • windowFixed メソッド を使うと、各要素を指定された要素数の List で切り分けることができる

windowSliding

Main.java
package sandbox.gatherer;

import java.util.List;
import java.util.stream.Gatherer;
import java.util.stream.Gatherers;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer<Integer, ?, List<Integer>> windowFixed
            = Gatherers.windowSliding(3);

        List<List<Integer>> result = Stream.of(1, 2, 3, 4, 5)
                .gather(windowFixed)
                .toList();

        System.out.println("result = " + result);
    }
}
実行結果
result = [[1, 2, 3], [2, 3, 4], [3, 4, 5]]
  • windowSliding メソッド を使用すると、各要素を指定された要素数の List で切り分けることができる
  • このとき、切り分けられた各 List の中身は、1つずつスライドするように格納される

Gatherer の連結

Main.java
package sandbox.gatherer;

import java.util.stream.Gatherer;
import java.util.stream.Stream;

public class Main {
    public static void main(String[] args) {
        Gatherer<String, Void, String> toUpperCase
            = Gatherer.of((_, element, downstream) -> {
                downstream.push(element.toUpperCase());
                return true;
            });
        Gatherer<String, Void, String> wrapWithParentheses
            = Gatherer.of((_, element, downstream) -> {
                downstream.push("(" + element + ")");
                return true;
            });

        Gatherer<String, ?, String> gatherer = toUpperCase.andThen(wrapWithParentheses);

        Stream.of("one", "two", "three")
                .gather(gatherer)
                .forEach(System.out::println);
    }
}
  • 入力値を大文字に変換する Gatherer と括弧で囲う Gatherer を用意して、 andThen で連結してから利用している
実行結果
(ONE)
(TWO)
(THREE)
  • GathererandThen であらかじめ連結しておくことができる

参考

  1. jdk/src/java.base/share/classes/java/util/stream/GathererOp.java at jdk-25+0 · openjdk/jdk

8
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?