この記事は セゾン情報システムズ Advent Calendar 2021 の 23 日目の記事です。
※ 注意:本記事は Stream API の使い方を説明するものではありません。
Java の Stream API を使ってますか?
Java 開発者のみなさん、Stream API を使っていますか?もちろん、使っていますよね。(反論は認めません)
Stream API をうまく使うと表現力の豊かなソースコードを書けるから、 Java 開発者なら学ばない理由はないでしょう。
もし Stream API の使い方にまだ自信持っていないなら、Java 8 以降を対象とする入門書籍なら必ず取り上げているので、ぜひご自分の好みに合う一冊を見つけて熟読して身につけましょう。
そんな普段何気になく使っている Stream API ですが、今日はその裏にあるものを覗いてみたいと思います。
なお、この記事では以下のような用語の使い分けを意識しています。
-
Stream
-
java.util.stream.Stream
インタフェース
-
- ストリーム
- 一連の要素が構成しているデータの流れ
また、この記事で取り上げたソースコードは 2021/12/20 時点の OpenJDK 17 のものになります。
Stream API のおさらい
現在 Java 17 の Stream
インタフェースでは以下のメソッドが定義されていますが、ほとんどは Java 8 で定義されたもので、いくつかは 9 と 16 で追加されたものです。
ちなみにこのメソッド一覧の分類は独断と偏見に基づいたもので、箇条書きの一番右にある (9)
と (16)
はそれらのメソッドが追加されたバージョンを示しています。
- 中間処理
- 処理の流れを制御するメソッド
-
sequential
,parallel
,unordered
onClose
-
- 要素の射影を行うメソッド
-
map
,mapToInt
,mapToLong
,mapToDouble
-
flatMap
,flatMapToInt
,flatMapToLong
,flatMapToDouble
-
mapMulti
,mapMultiToInt
,mapMultiToLong
,mapMultiToDouble
(16)
-
- 要素の値をいじらないメソッド
filter
distinct
sorted
peek
-
limit
,skip
-
takeWhile
,dropWhile
(9)
- 処理の流れを制御するメソッド
- 終端処理
- コレクション型への変換メソッド
-
toArray
,collect
-
toList
(16)
-
- イテレータ型への変換メソッド
-
iterator
,spliterator
-
- 順次処理を適用するメソッド
-
forEach
,forEachOrdered
-
- 値を算出するメソッド
-
reduce
,min
,max
count
-
- 条件合否判定メソッド
-
anyMatch
,allMatch
,noneMatch
-
- 条件に合う要素を取るメソッド
-
findFirst
,findAny
-
- コレクション型への変換メソッド
- その他
isParallel
close
- ファクトリメソッド:
Stream
を生成するための静的メソッド-
ofNullable
(9) of
iterate
generate
concat
-
メソッド一つずつ見ていくときりがないので、あとで実際のコード例を元に、それを実現するための舞台裏を覗いていきたいと思います。
ストリームを支えるクラスとインタフェースたち
ストリームとパイプラインを表すもの
Java には 4 種類のストリーム(参照型とプリミティブ型 3 種類)があって、それぞれのストリームにはインタフェース定義とそれを実装する抽象クラスがあります。
- 参照型を要素とする
Stream
インタフェースとそれを実装する抽象クラスReferenceipePipeline
- プリミティブ型の
int
を要素とするIntStream
とそれを実装する抽象クラスIntPipeline
- プリミティブ型の
long
を要素とするLongStream
とそれを実装する抽象クラスLongPipeline
- プリミティブ型の
double
を要素とするDoubleStream
とそれを実装する抽象クラスDoublePipeline
これらのクラスとインタフェースをクラス図で表すとこうなります。
ここの PipelineHelper
はストリームのパイプライン処理向けのヘルパークラスで、ストリームに関する情報を一箇所に集中保持している仕組みとなっています。
各抽象クラスの内部構成は似ているので ReferencePipeline
を中心に見ていきます。
ストリームを実装する抽象クラス(例えば参照型の場合は ReferencePipeline
)にはそれを継承する 3 つのクラスがあって、それぞれストリームの状態を表しています。
-
Head
- ストリームの初期状態を表す具象クラス
-
StatefulOp
- ストリームのステートフルな中間状態を表す抽象クラス
- ステートフルな中間処理の戻り値として使用されている
- 例えば
distict
,sorted
- 例えば
-
StatelessOp
- ストリームのステートレスな中間状態を表す抽象クラス
- ステートレスな中間処理の戻り値として使用されている
- 例えば
filter
,unordered
- 例えば
AbstractPipeline
には初期状態用と中間状態用の二種類のコンストラクタがあります。
// 初期状態用
AbstractPipeline(Supplier<? extends Spliterator<?>> source, ...) {...}
AbstractPipeline(Spliterator<?> source, int sourceFlags, ...) {...}
// 中間状態用
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, ...) {...}
コンストラクタのシグネチャから中間状態のインスタンスは直前の AbstractPipeline
への参照を持っていることを推測できます。
ストリームに対する操作や処理を表すもの
ストリームを表す上記クラスに加えて、操作を表すものあります。ストリームに対する一連の操作は、これらの組み合わせによって実現されています。
Sink
-
GitHub 上の JDK 17 のソースコード:Sink
-
値を集計するためのインタフェース
-
実装クラスには initial と active の二つの状態がある
-
begin()
は initial から active への、end()
はその逆の状態遷移を行う -
accept()
による値の受け入れは active 状態のみ可能
-
-
参照型や、プリミティブ型用の拡張インタフェースと抽象クラスがある
-
特殊形として
TerminalSink
がある
interface TerminalSink<T, R> extends Sink<T>, Supplier<R> { }
Node
- GitHub 上の JDK 17 のソースコード:Node
- 順序を持つ一連の要素を表すためのインタフェース
- 再帰的なデータ型で、0 個かそれ以上の子
Node
を持つことも可能
TerminalOp
- GitHub 上の JDK 17 のソースコード:TerminalOp
- ストリームを元に一つの値を生成する処理を表すインタフェース
- 実装クラスはシーケンシャルとパラレルの二種類の値評価処理を提供する必要がある
データの反復や分割を支えるもの
データ要素のイテレーションや、データの流れを分割処理を行う際に、 java.util.Spliterator
を使用しています。
Spliterator
はストリームの並列処理とは切っても切れない深い関係なので、しっかり抑えておきましょう。
- GitHub 上の JDK 17 のソースコード:Spliterator
- ざっくり言うと分割もできるイテレータ
- 名前の "spliterator" は分割 "split" とイテレータ "iterator" をくっつけた造語
-
trySplit()
で分割を試みる- 分割できる場合は新しい
Spliterator
のインスタンスを返す、分割できない場合はnull
- 分割できる場合は新しい
-
tryAdvance()
でデータ1個分を進んで処理を試みる- 処理できる場合は
true
、データがなくて処理できない場合はfalse
- 処理できる場合は
パーツ揃ったので次へ…
ストリームを構成するパーツはこれで一通り触れたので、次は実際の Stream
のコード例を元にその裏にあるものを見ていきましょう。
Stream の舞台裏
Stream の Javadoc にあるコード例を見ていきましょう。
int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();
一行ずつ裏の動きを覗いていく
.stream()
最初の Collection.stream()
はデフォルトメソッドになっていて、その中で StreamSupport.stream()
を呼び出しています。
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
StreamSupport
に spliterator()
を渡していますね。そして StreamSupport
はそれを元に ReferencePipeline.Head
のインスタンスを生成しています。
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator,
StreamOpFlag.fromCharacteristics(spliterator),
parallel);
}
.filter(...)
次の行の Stream.filter()
ですが、この時点の実装クラスは ReferencePipeline.Head
なので、実際呼び出されるのは ReferencePipeline.filter
になります。
@Override
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, ...) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
...
};
}
};
}
ReferencePipeline.StatelessOp
が登場しましたね。
ここの opWrapSink()
は AbstractPipeline
で定義されたメソッドで、この処理自身(ここでは filter
が該当する)と引数として渡された Sink
を内包する新しい Sink
インスタンスを返すようになっています。
新しい Sink
は参照型を対象とするので Sink.ChainedReference
を使用しています。
.mapToInt(...)
次の行の Stream.mapToInt()
ですが、この時点の実装クラスは ReferencePipeline.StatelessOp
なので、実際呼び出されるのは ReferencePipeline.mapToInt
になります。
@Override
public final IntStream mapToInt(ToIntFunction<? super P_OUT> mapper) {
Objects.requireNonNull(mapper);
return new IntPipeline.StatelessOp<P_OUT>(this, ...) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<Integer> sink) {
return new Sink.ChainedReference<P_OUT, Integer>(sink) {
@Override
public void accept(P_OUT u) {
downstream.accept(mapper.applyAsInt(u));
}
};
}
};
}
このように、戻り値は IntStream
になるので、IntPipeline.StatelessOp
を元に生成したインスタンスを返しています。
そしてopWrapSink
内にも参照型を int
として処理するためのコード( mapper.applyAsInt(u)
あたり)が含まれていることが分かりますね。
.sum()
最後の行の IntStream.sum()
ですが、実装クラスは IntPipeline.StatelessOp
なので、実際呼び出されるのは IntPipeline.sum()
になります。
@Override
public final int sum() {
return reduce(0, Integer::sum);
}
IntPipeline.sum()
は IntPipeline.reduce()
にを利用しています。reduce()
の引数として、Integer
同士の加法である Integer::sum
と、加法の単位元 0
を指定しています。
@Override
public final int reduce(int identity, IntBinaryOperator op) {
return evaluate(ReduceOps.makeInt(identity, op));
}
ReduceOps.makeInt(identity, op)
の部分は演算とその演算の単位元を元に TerminalOp
を生成しています。メソッド内のクラス ReducingSink
は演算と演算の単位元を内包しています。
public static TerminalOp<Integer, Integer>
makeInt(int identity, IntBinaryOperator operator) {
Objects.requireNonNull(operator);
class ReducingSink
implements AccumulatingSink<Integer, Integer, ReducingSink>, Sink.OfInt {
private int state;
@Override
public void begin(long size) {
state = identity;
}
@Override
public void accept(int t) {
state = operator.applyAsInt(state, t);
}
@Override
public Integer get() {
return state;
}
@Override
public void combine(ReducingSink other) {
accept(other.state);
}
}
return new ReduceOp<Integer, Integer, ReducingSink>(StreamShape.INT_VALUE) {
@Override
public ReducingSink makeSink() {
return new ReducingSink();
}
};
}
AbstractPipeline.evaluate()
ではストリームがパラレルかどうかを isParallel()
で確認して、 その結果に応じて terminalOp
の値評価処理を切り替えます。
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
...
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
お気づきでしょうか。ここまですっとパイプラインを組み立てきたけど、最後の最後にやっと実際のデータを扱う処理が現れましたね。
元の例は parallel()
使用していないのでシーケンシャルで評価されるが、せっかくなので両方見ていきましょう。
値の評価
シーケンシャルの場合
AbstractPipeline.sourceSpliterator()
の中に isParallel()
のチェックが含まれているが、シーケンシャルの場合は条件式成立しないので、初期の Spliterator
をそのまま使用します。
private Spliterator<?> sourceSpliterator(int terminalFlags) {
Spliterator<?> spliterator = null;
... // 初期の spliterator を取得
if (isParallel() && sourceStage.sourceAnyStateful) {
...
}
...
return spliterator;
}
ReduceOp.evaluateSequential()
は ReduceOp.makeSink()
で前述の ReduceOps.makeInt()
内の定義した ReducingSink
クラスのインスタンスを生成して、Spliterator
と一緒に PipelineHelper.wrapAndCopyInto()
に渡して評価してもらいます。
private abstract static class ReduceOp ... {
...
@Override
public <P_IN> R evaluateSequential(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return helper.wrapAndCopyInto(makeSink(), spliterator).get();
}
...
}
AbstractPipeline.wrapSink()
で今まで見てきた opWrapSink()
の戻り値と RecucingSink
インスタンスを元に、合成関数の Sink
を作ります。
この合成関数に対して、Spliterator.forEachRemaining
で順次評価を行います。
@Override
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
@Override
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
...
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
...
}
...
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for (AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
パラレルの場合
もし元の例の .stream()
の直後に .parallel()
があったら、最後の値評価は並列で行うことになります。
AbstractPipeline.sourceSpliterator()
の中に isParallel()
のチェックで true
になるが、パイプラインにステートフルな処理が含まれていないので sourceStage.sourceAnyStateful
は false
になります。結果としてはシーケンシャルの場合と同じ、初期の Spliterator
をそのまま使用します。
private Spliterator<?> sourceSpliterator(int terminalFlags) {
Spliterator<?> spliterator = null;
... // 初期の spliterator を取得
if (isParallel() && sourceStage.sourceAnyStateful) {
...
}
...
return spliterator;
}
パラレルで評価する場合、PipelineHelper
と Spliterator
を元に ReduceTask
を生成して実行したあとに、実行結果を取得します。
private abstract static class ReduceOp ... {
...
@Override
public <P_IN> R evaluateParallel(PipelineHelper<T> helper,
Spliterator<P_IN> spliterator) {
return new ReduceTask<>(this, helper, spliterator).invoke().get();
}
private static final class ReduceTask<P_IN, P_OUT, R,
S extends AccumulatingSink<P_OUT, R, S>>
extends AbstractTask<P_IN, P_OUT, S, ReduceTask<P_IN, P_OUT, R, S>> {...}
}
ReduceTask
の親クラス AbstractTask
は CountedCompleter
を継承しているので、ReduceTask.invoke()
は ForkJoinPool
によって実行されることになります。
abstract class AbstractTask<P_IN, P_OUT, R, K extends AbstractTask<P_IN, P_OUT, R, K>>
extends CountedCompleter<R> {
...
}
おわりに
駆け足で Stream API の中身を見てきましたが、少しでもその雰囲気を感じ取れたのでしょうか。
int sum = widgets.stream()
.filter(w -> w.getColor() == RED)
.mapToInt(w -> w.getWeight())
.sum();
このように数行のコードで書かれたストリームの処理の裏では、たくさんのクラスが絡んでいて、それらの実装にはまたたくさんの知識が詰まっています。
関数型プログラミング、遅延評価、並列処理など、どれ一つとっても興味深いものですが、これらのものが混ざったときに現れる課題をどのように克服しているかもまた面白いです。(あくまでも個人の感想)
もし興味がありましたら、ぜひ JDK のソースコードを読み解いてみてください。