0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 1 year has passed since last update.

Stream API を自前に実装するときにおさえておきたいこと

Last updated at Posted at 2023-02-08

Stream API をちゃんと使えてますか?

Java 開発者のみなさん、Stream API を使っていますか?Stream API の使い方、ちゃんとおさえていますか?

Stream API をうまく使うと表現力の豊かなソースコードを書けるから、Java 開発者なら、使い方をおさえておいて損はないと思います。

この記事は自作のコレクションで、Stream API 対応のために機能を実装する際に、おさえておきたいポイントを、サンプルコードを交えて説明したいと思います。

なお、この記事は 2023/01 現在最新の LTS である Java 17 をベースに説明し、実際使用する JDK ディストリビューションは Temurin 17.0.6 となります。

また、この記事では、Java クラスのみ指す場合に Class のような表記を使っていますが、一般的な用語として捉えても問題ない箇所は平文のまま(日本語または英語)で記載します。

Stream API おさらいのお供

以前この Advent Calendar の記事で Stream API の裏を覗いて、その構成と仕組みについてざっくり解説しました。

もし Stream API の習熟度に自信がなければ、参考として事前に読んでいただけると、この先に出てくる説明の理解に役立つと思います。

Stream API を実装したいと思ったら

Stream API を実装するのポイントを紹介する記事ですが、最初におさえてほしいポイントは「できるだけ Stream API の実装が不要なやり方を探る」ことです。

普通に処理を Stream API で表現するだけなら、JDK で提供している機能だけで事足りるはずだと思います。

では、どのようなシーンで Stream API 対応のための機能実装が必要になるのでしょうか?

「Stream API の自前実装を」と思うシーン

あくまでも想像ですが、以下のこのようなシーンに遭遇したら「Stream API の自前実装を…」という考えが出てくるのでは、と思います。

  1. 扱うデータの構造が特殊で、既存のライブラリでは上手に扱えない事情ある(例えば要件を満たすだけの性能が出ない、データそのものの特性を適切に表せないなど)⇒ Stream をサポートする Collection (またはそれに準ずるもの)を作りたい

  2. 扱うデータの構造は既存のライブラリで問題なく扱えるが、Stream API の提供していない演算が複数必要で、かつそれらを複数の具象クラスで利用する予定がある ⇒ Stream インタフェースの拡張を作りたい

正直、通常のアプリケーション・サービス開発ではどちらのシーンもあまりないものです。

仮にこういったシーンに遭遇したとしても、複雑さや可読性の観点を考慮すると、Stream API の自前実装ではなく、実際必要な具体的な処理をそのまま書いたほうが適切かもしれません。

極論を言うとソースコードさえ書ければ最終的には動くものが出来上がるから、その自前実装が本質的にやりたいことからフォーカスを外していないかを意識する必要があります。

目的に対して手段は適切か?

では、前述の2つケースについて、それぞれ目的に対して手段は適切かを確認していましょう。

  1. 扱うデータの構造が特殊で、既存のライブラリでは上手に扱えない事情ある(例えば要件を満たすだけの性能が出ない、データそのものの特性を適切に表せないなど)⇒ Stream をサポートする Collection (またはそれに準ずるもの)を作りたい

ケース1では、扱う特殊な性質を持つデータを自前のコレクションに通して、Stream API の枠組みに当てはめようとしています。

データの扱い方を自前のコレクション内に集約して、コレクションとしての振る舞いを保証することで、外側で Stream API の表現力と機能を活用できる形になるはずです。

記事の後半では掘り下げて、こういった Collection を実装する際のポイントを解説していきたいと思います。

  1. 扱うデータの構造は既存のライブラリで問題なく扱えるが、Stream API の提供していない演算が複数必要で、かつそれらを複数の具象クラスで利用する予定がある ⇒ Stream インタフェースの拡張を作りたい

ケース 2 では、新しい演算のために Stream API の枠組みそのものを拡張しようとしています。

仮にできたとしたら、最終的には演算の実装そのものだけでなく、拡張した枠組みのメンテナンスも必要となるので、結果的にやりたいことからフォーカスが外れてしまうことになります。

実際、Stream API は汎用的なメソッドを利用して、必要な演算を関数インタフェースの実装クラスとして提供すること1で、やりたいことをだいたい実現できるので、Stream インタフェースを拡張する必要性はないでしょう。

「作らない」という選択肢も

これまでの話を雑にまとめると、Stream API をなんらかの形で実装して提供する必要性が出てくるのは、基本的にはライブラリやフレームワークを作っている場合(作ったあとのメンテナンスも考慮に入れた上での決定)だけです。

それ以外の場合で Stream API を実装し始めてしまったら、何かが間違っているかまたは何かを勘違いしているかの可能性が大いので、一回振り返るようにしましょう。

Stream API に限らず、標準ライブラリの機能を自前で実装しようという気持ちが出てきたとき、目的を振り返って「作らない」選択肢があるかを冷静に考えるといいでしょう。

Stream をサポートする Collection 実装の作り方

続いて Stream をサポートする Collection 実装の作り方を見ていきましょう。

Collection のデフォルトメソッドたち

Collection インターフェースのコードを読むと、入れ物として機能を提供するメソッドやオブジェクトとしての基本的なメソッド以外に、いくつかのデフォルトメソッドが存在するのに気づくでしょう。

Collection.java
default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

default Stream<E> parallelStream() {
    return StreamSupport.stream(spliterator(), true);
}

Stream<E> を返すこの二つメソッド stream()parallelStream() こそが、 Collection API と Stream API の架け橋となっています。

どちらのメソッドもデフォルト実装として spliterator() を利用しているので、spliterator() さえ実装されていれば最低限の機能は提供できる作りになってます。

そして件の spliterator() の中を見ると、そこにもデフォルト実装があるから、結果的に何もしなくても各メソッドのデフォルト実装が最低限の機能を提供してくれる作りになっています。

Collection.java
@Override
default Spliterator<E> spliterator() {
    return Spliterators.spliterator(this, 0);
}

しかし、何もしなくても動くのであれば、なぜわざわざ手間をかけて実装する必要あるのでしょうか?

それはデフォルト実装の振る舞いが望ましくない場合があるからです。

上記の Spliterators.spliterator(this, 0) ですが、それの返しているスプリッテレータはイテレータを利用した汎用的な実装で、そのクラスの trySplit() メソッドは以下のような作りになっています。

Spliterators.java
@Override
public Spliterator<T> trySplit() {
    Iterator<? extends T> i; // 内部のイテレータに指すローカル変数
    long s; // サイズ
    ... // 省略:イテレータとサイズの取得
    if (s > 1 && i.hasNext()) {
        int n = batch + BATCH_UNIT; // 仮のバッチサイズ
        ... // 省略: バッチサイズ調整
        Object[] a = new Object[n]; // 取得した要素の入れ物
        int j = 0; // 取得した要素数
        ... // 省略: バッチサイズ分の要素を取得、サイズ概算値の調整
        return new ArraySpliterator<>(a, 0, j, characteristics); // 配列ベースの新しい Spliterator
    }
    return null;
}

上記のソースコードで示したように、 trySplit() する際に常にバッチサイズ分の要素を取得しています。

普通にメモリ上でデータを保持するコレクションであればこれでも特に問題ありませんが、もし要素の取得に何らかコストが掛かる場合、終端処理で消費されない分の要素まで取ってしまうとそれだけのコストがムダになります。

実際 Collection.spliterator()ドキュメントにも以下の記載があります。

デフォルトの実装は、より効率的なスプリッテレータを返すことができるサブクラスによってオーバーライドされるべきです。

The default implementation should be overridden by subclasses that can return a more efficient spliterator.

効率的なスプリッテレータを提供することによって、 stream()parallelStream() のデフォルト実装でもムダなく機能するようになります。

一応 stream()parallelStream()ドキュメントにある以下記載のように、スプリッテレータを提供しないで代わりに stream()parallelStream() をオーバーライトするやり方もありますが、スプリッテレータなしで Stream<E> を実装するのはかなり苦労するので、余程のことがない限り spliterator() の実装をおすすめします。

このメソッドは、IMMUTABLECONCURRENT、または遅延バインディングであるスプリッテレータを spliterator() メソッドが返せない場合にオーバーライドされるべきです。

This method should be overridden when the spliterator() method cannot return a spliterator that is IMMUTABLE, CONCURRENT, or late-binding.

これまでの話で spliterator() を実装するだけでだいたい事足ることが分かったので、引き続きそのやり方はもう少し詳しく説明してきたいと思います。

スプリッテレータの特性

スプリッテレータのドキュメントの冒頭で、スプリッテレータがどんなものかを以下のようにまとめてあります。

ソースの要素をトラバースおよびパーティション化するためのオブジェクトです。 スプリッテレータが適用される要素のソースは、配列、Collection、IOチャネル、ジェネレータ関数などです。

ソースの種類が異なると、スプリッテレータの性質がそれに従って変わります。

いくつか直感的な例をあげてみます:

  • ローカルマシンのメモリ上で全ての要素を保持するコレクションなら、要素数は既知(測定可能)
    • コレクションが List なら、要素間の順序性があって、要素の重複は許容する
    • コレクションが Set なら、要素間の順序性がなく、要素の重複は許容しない
  • 動的に要素を生成するジェネレーター関数なら、終端処理の結果が出るまで要素数は未知(測定不可)

このように、要素数が測定可能かどうか、順序性あるかどうか、重複許容するかどうか、などの特性がソースによって変わることが分かります。

これらの特性は Stream API の処理に影響するので、扱うデータの性質を正しく表現する必要があります。

スプリッテレータの特性を示す定数は、以下のように定義されています。

名前 値(十進数) 値(十六進数)
DISTINCT 1 0x00000001
SORTED 4 0x00000004
ORDERED 16 0x00000010
SIZED 64 0x00000040
NONNULL 256 0x00000100
IMMUTABLE 1024 0x00000400
CONCURRENT 4096 0x00001000
SUBSIZED 16384 0x00004000

それぞれの特性の意味合い(太字表記の部分)と、その特性に対応する大まかな振る舞いについて説明します。

  • DISTINCT
    • 要素が重複していないこと
    • 任意の二つの要素 x と y に対して常に x.equals(y)false になります。
  • SORTED
    • 要素が定義済み基準でソートされていること
    • スプリッテレータは以下のとちらかの条件を満しています。
      1. Spliterator.getCompartor() が有効な比較演算子を返す
      2. 要素のデータ型が Comparable でかつ要素が自然順になっていれば、 Spliterator.getCompartor()null を返す
  • ORDERED
    • 要素が所定の順序で出現すること
    • スプリッテレータは以下の動作をすべて保証します。
      • trySplit() は先頭にある要素から分割する
      • tryAdvance(Consumer<? super T>) は最先頭の要素を対象に実行する
      • forEachRemaining(Consumer<? super T>) は要素の出現順通りに処理する
  • SIZED
    • 要素数が分かること
    • スプリッテレータは estimateSize() の動作を以下のように保証します。
      • トラバース、または分割する前に、有限サイズの要素数を正確に取得できる
  • NONNULL
    • 要素は必ず null でないこと
    • ソースが出現する要素が null でないことを保証します。
  • IMMUTABLE
    • ソースは構造的に変更できないこと
    • トラバース中に、要素の追加・削除・置換といった変更は発生しません。
  • CONCURRENT
    • 外部同期なしでソースを並列的に安全に変更できること
    • 最上位2のスプリッテレータはそれを使った計算について何も保証できません。
    • ソース側の変更が分割で下位スプリッテレータに反映されていない場合、下位3スプリッテレータは IMMUTABLESIZED などの別の特性 を持つことがあります。
  • SUBSIZED
    • 下位のスプリッテレータがすべて SIZED かつ SUBSIZED であること
    • trySplit() での分割で下位スプリッテレータはすべて SIZED です。

これらの特性のうち、CONCURRENT の意味合いは IMMUTABLESIZED と相互排他的になっているので、併用することはできません4

実際コードを実装する際に、ソースの特徴をどうやってスプリッテレータの特性に落とし込むかを悩むと思いますが、ありのままの特徴をすべてそのまま落とし込むよりも、利用制限(例えばスプリッテレータ生成後、ソースの変更をスプリッテレータに反映しないといった制限)を設けて不変性(IMMUTABLE 特性)を持たせたほうがシンプルになります。

Spliterator のドキュメントにも詳細な説明があるので不明点があればそちらに参照していただければと思います。

スプリッテレータのメソッド

スプリッテレータのインタフェースには以下のようなメソッドが定義されています。

  • トラバース・分割
    • boolean tryAdvance(Consumer<? super T>)
    • void forEachRemaining(Consumer<? super T>)
    • Spliterator<T> trySplit()
  • 要素数
    • long estimateSize()
    • long getExactSizeIfKnown()
  • 特性値
    • int characteristics()
    • boolean hasCharacteristics(int)
  • 比較演算子
    • Comparator<? super T> getComparator()

トラバース・分割関連のメソッドはスプリッテレータの内部状態を変更するのに対し、他のメソッド(要素数、特性値、比較演算子関連)はスプリッテレータを内部状態に影響しません。

前述の特性でも少し触れましたが、一部メソッドの振る舞いはスプリッテレータ特性によって決まってしまうので、実装する際にその決め事を守る必要があります。

それを踏まえて各メソッドの概要と、実装するポイントを説明します。

トラバース・分割

boolean tryAdvance(Consumer<? super T>)

  • 次の要素が存在する場合、その要素に対して操作を行い、true を返すが、それ以外の場合は false を返す
  • ORDERED 特性を持つ場合、操作は要素の出現順に行う

一要素分だけ操作するので、次の要素の有無に応じ処理する直感的な実装で問題ないと思います。

void forEachRemaining(Consumer<? super T>)

  • 現在のスレッド内で残りの要素を全て消費し切るまで、各要素に対して指定された操作を行う
  • ORDERED 特性を持つ場合、操作は要素の出現順に行う

デフォルト実装は tryAdvance(Consumer<? super T>)false を返すまでに繰り返して呼び出す形になっていて、一要素ずつ存在確認が挟むことになるので、可能であればより適切な実装を提供しましょう。

Spliterator<T> trySplit()

  • スプリッテレータが分割可能な場合、分割を行い、一部の要素をカバーする新しいスプリッテレータを返す
    • 新しいスプリッテレータにカバーされる要素は分割元のスプリッテレータのカバー対象外となる
  • ORDERED 特性を持つ場合、新しいスプリッテレータは先頭にある要素を出現順を維持したままでカバーする
  • 要素が無限でなければ、呼び出しの結果は最終的に null をなる
  • null でないものを返した際、estimateSize() の振る舞いが以下のようになる
    • 分割前に estimateSize() の報告した値(size(t0) とする)と、分割後にestimateSize() の報告した値(size(t1) とする)は、以上の値でなければならない(即ち size(t0) >= size(t1)
    • SUBSIZED を持つ場合、分割前に上位スプリッテレータ(aとする)の estimateSize() の報告した値(a.size(t0))は、分割後の両方のスプリッテレータ(新しいものを bとする)の estimateSize() の報告した値(a.size(t1)b.size(t1))の和と等しいでなければならない(即ち a.size(t0) = a.size(t1) + b.size(t_1)

実装する際におさえるべきポイントは、分割可能かどうか、要素の出現順維持が必要かどうか、分割前後のサイズ一貫性、の3つになります。

分割可能かどうかの判断は、ソースの分割しやすさはもちろん、残要素が少なくなるほど分割によるオーバーヘッドの割合が大きくなるので、残要素が一定数以下になったら分割しないようにするといいかもしれません。

出現順維持が必要な場合、要素を新しいスプリッテレータに渡す際の処理順と、新しいスプリッテレータでの要素の持ち方をさえ意識すれば、大きなミスを回避しやすくなります。

そしてサイズ一貫性について、要素を新しいスプリッテレータに渡す際に、渡した要素の数分だけサイズの概算から引く形で実装できれば、分かりやすいコードになるのではと思います。

要素数

long estimateSize()

  • forEachRemaining(Consumer<? super T>) を利用する際の要素数を概算する
  • 以下のいずれかの場合は Long.MAX_VALUE を返す
    • 要素数が無限
    • 要素数が未知
    • 算出コストが高すぎる
  • 以下の状態では算出した値は正確な要素数となるが、それ以外の場合は正確な値とは限らない
    • SIZEDを持つ場合、トラバース・分割していない状態
    • SUBSIZED を持つ場合、トラバース状態
  • 報告した値の正確さ問わず、trySplit() 前後での呼び出しで報告された値は変わらないかまたは減少する

実装する際に抑えるべきポイントは、要素数の算出ができるかどうかと、正確な値が必要かどうか、あたりだと思います。

要素数の算出はソースに大きく依存するから、正確な計算が無理な場合はどうしても無理なので潔く SIZEDSUBSIZED 特性を諦めるも一つの手です。

諦めたとしても、概算で出した値は分割処理で判断材料として利用できるので、参考になる値を出せるように、何らかの基準を決めたうえそれを元に計算するといいかもしれません。

long getExactSizeIfKnown()

  • SIZED を持つ場合は estimateSize() の値を返す、そうでない場合は -1 を返す
  • デフォルト実装あり
    default long getExactSizeIfKnown() {
        return (characteristics() & SIZED) == 0 ? -1L : estimateSize();
    }
    

便利なメソッドとしてデフォルト実装が十分なので、自前で実装しなくても問題ありません。

特性値

int characteristics()

  • スプリッテレータの持っている全ての特性のビット論理和を返す
  • trySplit() による分割の前後で異なる特性を持つ可能性があるが、分割されるまでの間は変わらない
    • trySplit() 呼び出す前に、繰り返して呼び出すと常に同じ結果となる
    • 二回の trySplit() 呼び出しの間に、繰り返して呼び出すと常に同じ結果となる

分割によって特性が変わらない場合、特性値を final のフィールドとしてコンストラクタで初期化しておいて後でそれを返す形、もしくは呼び出す時にビット論理和を組み立てる形のような、直感的な実装で十分です。

分割によって特性が変わる場合、分割されたかどうかの状態チェックを入れるよりも、分割する際の状態変化に合わせて内部の特性値も更新する実装が分かりやすいのではと思います。

boolean hasCharacteristics(int)

  • 引数として指定された特性値を持つかどうかを判断する
  • デフォルト実装あり
    default boolean hasCharacteristics(int characteristics) {
        return (characteristics() & characteristics) == characteristics;
    }
    

分かりやすいデフォルト実装が用意されているので、自前で実装しなくても問題ありません。

比較演算子

Comparator<? super T> getComparator()

  • SORTED を持つ場合は以下の動きとなる
    • 要素が比較演算子によってソートされたのあれば、その比較演算子を返す
    • 要素が自然順でソートされたのあれば、null を返す
  • それ以外の場合は IllegalStateException を投げる
  • デフォルト実装あり
    default Comparator<? super T> getComparator() {
        throw new IllegalStateException();
    }
    

デフォルト実装があるので SORTED でない場合はそれで十分です。

SORTED の場合だけ要素の性質に合わせた比較演算子を返す処理が必要になります。

応用例

記事を書き始めたときにソースコードの実装例を出す予定でしたが、書いていてみたら文章の量が想定したより多くて、これ以上だと消化不良になると思うので、「スプリッテレータの実装を使えばこんなこともできるよ」という風にざっくりした応用例をいくつか紹介します。

複数のログファイルをまとめて扱う

何かのアプリケーションの吐き出した複数のログファイルを処理したい場合、すべてのログファイルをまとめて一つのかたまりとして扱うコレクションがあると便利です。

スプリッテレータはこんなイメージになります:

  • 要素は一行分のログ
  • 要素はタイムスタンプによってソート済み
  • スプリッテレータは内部に対象ログファイル一覧のリストを持つ
  • トラバースする際に
    • 開いているログファイルがなければ、リストから次のファイルを開く
    • 開いているログファイルがあれば、そこから一行分のログを取得する
    • 一つログファイルを読み切ったら終わったら閉じる
  • 分割する際に
    • (分割するなら)開いていないログファイルのいくつかを新しいスプリッテレータに任せる
    • (さらに細かく分割するなら)ファイル一覧が空の場合は、先頭の一定数の行を取得して新しいスプリッテレータに任せる

巨大なデータセットのソート

若干トリッキーな例になりますが、メモリに収まらない量のデータを、一つのソート済みのコレクションにして扱いたい場合は、既存のコレクションを流用しつつ新しい処理を書き足すことで実現できます。

データの全量はそのままではメモリに収まらないので、コレクションに入れるときは逐次に読み込みます。

その際のコレクションの動きは以下のイメージになります:

  • 内部にソートできる何らかの既存コレクションを持つ
    • データを一件ずつ挿入したあとに、順番に取り出すとソート済みとなるようなもの(例えば PriorityQueue とか)
    • 一定量のデータが溜まったら、溜まった分をファイルに吐き出して保存する(=ファイルの中身はソート済みの一部のデータ)
    • 未ソートデータの読み込みと、ソート済み分の保存ができるように、別のコレクションにするといいかも
  • 読み終わった時点は、内部コレクションに加え、いくつかのソート済みファイルがある

読み終わった後の、ソート済みコレクションのデータを Stream API で扱うために、以下のようなスプリッテレータを定義します:

  • 内部に前述のソート用コレクションと、ソート済みファイル一覧と、ソート用の比較演算子を持つ
  • トラバースする際に
    • ソート用コレクションとソート済みファイルから、未消費の最先頭要素を候補として取得する
    • 取得した要素候補のうち、ソート順の最も先にある要素を取得して消費する
    • 落選した候補は次回のトラバースで自動的に候補となる
  • 分割する際に
    • (分割するなら)トラバースと同じ取得処理を繰り返して、一定数の要素を取得してから新しいスプリッテレータに任せる

基本的なアイディアは external sort と k-way merge の併用になるので、より詳しいことが知りたい方はそれらのアルゴリズム調べてみるといいと思います。

まとめ

駆け足ですが Stream API を自前に実装するときにおさえておきたいことを簡単に紹介しました。

まとめるとこんな内容になっています。

  • Stream API を実装したいと思ったら
    • まずは Stream API を実装しないで済む方法を探す
    • 達成したい目的に対して手段が適切かを再確認する
  • 実装することを決めたら
    • 扱うデータとそのデータのソースの特性を把握する
    • スプリッテレータの特性として正しく表現する
    • 特性によって決められたメソッドの振る舞いを意識して実装する

説明を読むだけでは理解しきれないところもあるので、機会があればぜひ手を動かして作ってみてください。

  1. 具体的にいうと、演算を中間処理として提供したい場合はStreammap に合わせて演算を Function で実装します。終端処理としての提供したい場合は Streamcollect に合わせて演算を Consumer BiConsumer Collector のいずれかの組み合わせで実装します。

  2. 最上位スプリッテレータ(top-level spliterators)とはソースそのものもしくはイテレータから生成したものです。

  3. 下位スプリッテレータ(sub-splitertors)とは、最上位スプリッテレータが分割する際にに生成したもの、もしくはそこから更に派生したものです。

  4. 一応併用するコードは書けるけど、その場合はおかしいやつと思われる一貫性のない、振る舞いの不適切なスプリッテレータと見なされるので、そのスプリッテレータを使った演算処理について何も保証されません。

0
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?