LoginSignup
4
2

More than 1 year has passed since last update.

Stream API の裏を覗いてみた

Last updated at Posted at 2021-12-22

この記事は セゾン情報システムズ Advent Calendar 2021 の 23 日目の記事です。

※ 注意:本記事は Stream API の使い方を説明するものではありません

Java の Stream API を使ってますか?

Java 開発者のみなさん、Stream API を使っていますか?もちろん、使っていますよね。(反論は認めません)

Stream API をうまく使うと表現力の豊かなソースコードを書けるから、 Java 開発者なら学ばない理由はないでしょう。

もし Stream API の使い方にまだ自信持っていないなら、Java 8 以降を対象とする入門書籍なら必ず取り上げているので、ぜひご自分の好みに合う一冊を見つけて熟読して身につけましょう。

そんな普段何気になく使っている Stream API ですが、今日はその裏にあるものを覗いてみたいと思います。

なお、この記事では以下のような用語の使い分けを意識しています。

  • Stream
    • java.util.stream.Stream インタフェース
  • ストリーム
    • 一連の要素が構成しているデータの流れ

また、この記事で取り上げたソースコードは 2021/12/20 時点の OpenJDK 17 のものになります。

Stream API のおさらい

現在 Java 17 の Stream インタフェースでは以下のメソッドが定義されていますが、ほとんどは Java 8 で定義されたもので、いくつかは 9 と 16 で追加されたものです。

ちなみにこのメソッド一覧の分類は独断と偏見に基づいたもので、箇条書きの一番右にある (9)(16) はそれらのメソッドが追加されたバージョンを示しています。

  • 中間処理
    • 処理の流れを制御するメソッド
      • sequential, parallel, unordered
      • onClose
    • 要素の射影を行うメソッド
      • map, mapToInt, mapToLong, mapToDouble
      • flatMap, flatMapToInt, flatMapToLong, flatMapToDouble
      • mapMulti, mapMultiToInt, mapMultiToLong, mapMultiToDouble (16)
    • 要素の値をいじらないメソッド
      • filter
      • distinct
      • sorted
      • peek
      • limit, skip
      • takeWhile, dropWhile (9)
  • 終端処理
    • コレクション型への変換メソッド
      • toArray, collect
      • toList (16)
    • イテレータ型への変換メソッド
      • iterator, spliterator
    • 順次処理を適用するメソッド
      • forEach, forEachOrdered
    • 値を算出するメソッド
      • reduce, min, max
      • count
    • 条件合否判定メソッド
      • anyMatch, allMatch, noneMatch
    • 条件に合う要素を取るメソッド
      • findFirst, findAny
  • その他
    • isParallel
    • close
  • ファクトリメソッド:Stream を生成するための静的メソッド
    • ofNullable (9)
    • of
    • iterate
    • generate
    • concat

メソッド一つずつ見ていくときりがないので、あとで実際のコード例を元に、それを実現するための舞台裏を覗いていきたいと思います。

ストリームを支えるクラスとインタフェースたち

ストリームとパイプラインを表すもの

Java には 4 種類のストリーム(参照型とプリミティブ型 3 種類)があって、それぞれのストリームにはインタフェース定義とそれを実装する抽象クラスがあります。

これらのクラスとインタフェースをクラス図で表すとこうなります。

ここの PipelineHelperはストリームのパイプライン処理向けのヘルパークラスで、ストリームに関する情報を一箇所に集中保持している仕組みとなっています。

各抽象クラスの内部構成は似ているので ReferencePipeline を中心に見ていきます。

ストリームを実装する抽象クラス(例えば参照型の場合は ReferencePipeline)にはそれを継承する 3 つのクラスがあって、それぞれストリームの状態を表しています。

  • Head
    • ストリームの初期状態を表す具象クラス
  • StatefulOp
    • ストリームのステートフルな中間状態を表す抽象クラス
    • ステートフルな中間処理の戻り値として使用されている
      • 例えば distict, sorted
  • StatelessOp
    • ストリームのステートレスな中間状態を表す抽象クラス
    • ステートレスな中間処理の戻り値として使用されている
      • 例えば filter, unordered

AbstractPipeline には初期状態用と中間状態用の二種類のコンストラクタがあります。

// 初期状態用
AbstractPipeline(Supplier<? extends Spliterator<?>> source, ...) {...}
AbstractPipeline(Spliterator<?> source, int sourceFlags, ...) {...}

// 中間状態用
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, ...) {...}

コンストラクタのシグネチャから中間状態のインスタンスは直前の AbstractPipeline への参照を持っていることを推測できます。

ストリームに対する操作や処理を表すもの

ストリームを表す上記クラスに加えて、操作を表すものあります。ストリームに対する一連の操作は、これらの組み合わせによって実現されています。

Sink

  • GitHub 上の JDK 17 のソースコード:Sink

  • 値を集計するためのインタフェース

  • 実装クラスには initialactive の二つの状態がある

    • begin()initial から active への、end() はその逆の状態遷移を行う
    • accept() による値の受け入れは active 状態のみ可能
  • 参照型や、プリミティブ型用の拡張インタフェースと抽象クラスがある

  • 特殊形として TerminalSink がある

interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }

Node

  • GitHub 上の JDK 17 のソースコード:Node
  • 順序を持つ一連の要素を表すためのインタフェース
  • 再帰的なデータ型で、0 個かそれ以上の子 Node を持つことも可能

TerminalOp

  • GitHub 上の JDK 17 のソースコード:TerminalOp
  • ストリームを元に一つの値を生成する処理を表すインタフェース
  • 実装クラスはシーケンシャルとパラレルの二種類の値評価処理を提供する必要がある

データの反復や分割を支えるもの

データ要素のイテレーションや、データの流れを分割処理を行う際に、 java.util.Spliterator を使用しています。
Spliterator はストリームの並列処理とは切っても切れない深い関係なので、しっかり抑えておきましょう。

  • GitHub 上の JDK 17 のソースコード:Spliterator
  • ざっくり言うと分割もできるイテレータ
    • 名前の "spliterator" は分割 "split" とイテレータ "iterator" をくっつけた造語
  • trySplit() で分割を試みる
    • 分割できる場合は新しい Spliterator のインスタンスを返す、分割できない場合は null
  • tryAdvance() でデータ1個分を進んで処理を試みる
    • 処理できる場合は true、データがなくて処理できない場合は false

パーツ揃ったので次へ…

ストリームを構成するパーツはこれで一通り触れたので、次は実際の Stream のコード例を元にその裏にあるものを見ていきましょう。

Stream の舞台裏

Stream の Javadoc にあるコード例を見ていきましょう。

int sum = widgets.stream()
                 .filter(w -> w.getColor() == RED)
                 .mapToInt(w -> w.getWeight())
                 .sum();

一行ずつ裏の動きを覗いていく

.stream()

最初の Collection.stream() はデフォルトメソッドになっていて、その中で StreamSupport.stream() を呼び出しています。

Collection.java
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

StreamSupportspliterator() を渡していますね。そして StreamSupport はそれを元に ReferencePipeline.Head のインスタンスを生成しています。

StreamSupport.java
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
    Objects.requireNonNull(spliterator);
    return new ReferencePipeline.Head<>(spliterator,
                                        StreamOpFlag.fromCharacteristics(spliterator),
                                        parallel);
}

.filter(...)

次の行の Stream.filter() ですが、この時点の実装クラスは ReferencePipeline.Head なので、実際呼び出されるのは ReferencePipeline.filter になります。

ReferencePipeline.java
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
    Objects.requireNonNull(predicate);
    return new StatelessOp<P_OUT, P_OUT>(this, ...) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
            return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                ...
            };
        }
    };
}

ReferencePipeline.StatelessOp が登場しましたね。

ここの opWrapSink()AbstractPipeline で定義されたメソッドで、この処理自身(ここでは filter が該当する)と引数として渡された Sink を内包する新しい Sinkインスタンスを返すようになっています。

新しい Sink は参照型を対象とするので Sink.ChainedReference を使用しています。

.mapToInt(...)

次の行の Stream.mapToInt() ですが、この時点の実装クラスは ReferencePipeline.StatelessOp なので、実際呼び出されるのは ReferencePipeline.mapToInt になります。

ReferencePipeline.java
@Override
public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
    Objects.requireNonNull(mapper);
    return new IntPipeline.StatelessOp<P_OUT>(this, ...) {
        @Override
        Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
            return new Sink.ChainedReference<P_OUT, Integer>(sink) {
                @Override
                public void accept(P_OUT u) {
                    downstream.accept(mapper.applyAsInt(u));
                }
            };
        }
    };
}

このように、戻り値は IntStream になるので、IntPipeline.StatelessOp を元に生成したインスタンスを返しています。

そしてopWrapSink 内にも参照型を int として処理するためのコード( mapper.applyAsInt(u) あたり)が含まれていることが分かりますね。

.sum()

最後の行の IntStream.sum()ですが、実装クラスは IntPipeline.StatelessOp なので、実際呼び出されるのは IntPipeline.sum() になります。

IntPipeline.java
@Override
public final int sum() {
    return reduce(0, Integer::sum);
}

IntPipeline.sum()IntPipeline.reduce() にを利用しています。reduce() の引数として、Integer 同士の加法である Integer::sum と、加法の単位元 0 を指定しています。

IntPipeline.java
@Override
public final int reduce(int identity, IntBinaryOperator op) {
    return evaluate(ReduceOps.makeInt(identity, op));
}

ReduceOps.makeInt(identity, op) の部分は演算とその演算の単位元を元に TerminalOp を生成しています。メソッド内のクラス ReducingSinkは演算と演算の単位元を内包しています。

ReduceOps.java
public static TerminalOp<Integer, Integer>
makeInt(int identity, IntBinaryOperator operator) {
    Objects.requireNonNull(operator);
    class ReducingSink
            implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
        private int state;

        @Override
        public void begin(long size) {
            state = identity;
        }

        @Override
        public void accept(int t) {
            state = operator.applyAsInt(state, t);
        }

        @Override
        public Integer get() {
            return state;
        }

        @Override
        public void combine(ReducingSink other) {
            accept(other.state);
        }
    }
    return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
        @Override
        public ReducingSink makeSink() {
            return new ReducingSink();
        }
    };
}

AbstractPipeline.evaluate() ではストリームがパラレルかどうかを isParallel()で確認して、 その結果に応じて terminalOpの値評価処理を切り替えます。

AbstractPipeline.java
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
    ...
    return isParallel()
           ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
           : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

お気づきでしょうか。ここまですっとパイプラインを組み立てきたけど、最後の最後にやっと実際のデータを扱う処理が現れましたね。

元の例は parallel() 使用していないのでシーケンシャルで評価されるが、せっかくなので両方見ていきましょう。

値の評価

シーケンシャルの場合

AbstractPipeline.sourceSpliterator() の中に isParallel()のチェックが含まれているが、シーケンシャルの場合は条件式成立しないので、初期の Spliterator をそのまま使用します。

AbstractPipeline.java
private Spliterator<?> sourceSpliterator(int terminalFlags) {
    Spliterator<?> spliterator = null;
    ... // 初期の spliterator を取得
    if (isParallel() && sourceStage.sourceAnyStateful) {
        ...
    }
    ...
    return spliterator;
}

ReduceOp.evaluateSequential()ReduceOp.makeSink() で前述の ReduceOps.makeInt() 内の定義した ReducingSink クラスのインスタンスを生成して、Spliterator と一緒に PipelineHelper.wrapAndCopyInto() に渡して評価してもらいます。

ReduceOps.java
private abstract static class ReduceOp ... {
    ...
    @Override
    public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
                                       Spliterator<P_IN> spliterator) {
        return helper.wrapAndCopyInto(makeSink(), spliterator).get();
    }
    ...
}

AbstractPipeline.wrapSink() で今まで見てきた opWrapSink() の戻り値と RecucingSinkインスタンスを元に、合成関数の Sink を作ります。

この合成関数に対して、Spliterator.forEachRemainingで順次評価を行います。

AbstractPipeline.java
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}

@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    ...
    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    ...
}
...
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}

パラレルの場合

もし元の例の .stream() の直後に .parallel()があったら、最後の値評価は並列で行うことになります。

AbstractPipeline.sourceSpliterator() の中に isParallel()のチェックで true になるが、パイプラインにステートフルな処理が含まれていないので sourceStage.sourceAnyStatefulfalse になります。結果としてはシーケンシャルの場合と同じ、初期の Spliterator をそのまま使用します。

AbstractPipeline.java
private Spliterator<?> sourceSpliterator(int terminalFlags) {
    Spliterator<?> spliterator = null;
    ... // 初期の spliterator を取得
    if (isParallel() && sourceStage.sourceAnyStateful) {
        ...
    }
    ...
    return spliterator;
}

パラレルで評価する場合、PipelineHelperSpliterator を元に ReduceTask を生成して実行したあとに、実行結果を取得します。

ReduceOps.java
private abstract static class ReduceOp ... {
    ...
    @Override
    public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
                                     Spliterator<P_IN> spliterator) {
        return new ReduceTask<>(this, helper, spliterator).invoke().get();
    }
    
    private static final class ReduceTask<P_IN, P_OUT, R,
                                          S extends AccumulatingSink<P_OUT, R, S>>
        extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {...}
}

ReduceTask の親クラス AbstractTaskCountedCompleter を継承しているので、ReduceTask.invoke()ForkJoinPool によって実行されることになります。

abstract class AbstractTask<P_IN, P_OUT, R, K extends AbstractTask<P_IN, P_OUT, R, K>>
        extends CountedCompleter<R> {
    ...
}

おわりに

駆け足で Stream API の中身を見てきましたが、少しでもその雰囲気を感じ取れたのでしょうか。

int sum = widgets.stream()
                 .filter(w -> w.getColor() == RED)
                 .mapToInt(w -> w.getWeight())
                 .sum();

このように数行のコードで書かれたストリームの処理の裏では、たくさんのクラスが絡んでいて、それらの実装にはまたたくさんの知識が詰まっています。

関数型プログラミング、遅延評価、並列処理など、どれ一つとっても興味深いものですが、これらのものが混ざったときに現れる課題をどのように克服しているかもまた面白いです。(あくまでも個人の感想)

もし興味がありましたら、ぜひ JDK のソースコードを読み解いてみてください。

4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2