LoginSignup
2
0

More than 5 years have passed since last update.

StreamAPIのソート機能を含むマージ処理実装について

Posted at

はじめに

一寸した業務要件で2つのソートされたファイルをマージして出力することになりました。

以下のような例です
1. インプットファイルは2ファイルで第一キーでソートされている。
2. 出力ファイルは2ファイルの全データからソートして出力する。

インプットファイル1

1,AAA
4,BBB
9,CCC

インプットファイル2

2,DDD
5,EEE
6,FFF

出力ファイル

1,AAA
2,DDD
4,BBB
5,EEE
6,FFF
9,CCC

ってことで、
ReadをStremAPIで行い、マージソートできるようにしてみようと思いました。

うわぁ、
またピンポイントすぎて需要が少ない題材ですね。
(´・ω・)

実装したいこと

こんな感じで実装しようと思いました。
1. 複数のStreamをパラメータとして、単一のStreamを作る。
2. Comparatorを渡してソート順を定義する。
3. 前提として、各Streamはソート済みとする。
4. ステートレスなのは大前提。

実装

StreamをマージするStreamUtilsを作成。

public class StreamUtils {

    private StreamUtils() {
    }

    /**
     * 引数で指定されたストリームをマージソートする
     *
     * @param streamArray ソート済みのストリーム
     * @return
     */
    @SafeVarargs
    public static final <T> Stream<T> merge(final Stream<T>... streamArray) {
        return merge(null, streamArray);
    }

    /**
     * ストリームをマージする
     * 独自のSpliteratorで取得要素を制御する。
     *
     * @param comp ソート条件
     * @param streamArray ソート済みのストリーム
     * @return
     */
    @SafeVarargs
    public static final <T> Stream<T> merge(final Comparator<T> comp, final Stream<T>... streamArray) {
        try {
            MergedIterator<T> iterator = new MergedIterator<>(streamArray);
            iterator.setComparetor(comp);
            Spliterator<T> spliterator = new SpliteratorAdapter<>(iterator);
            return StreamSupport.stream(spliterator, false).onClose(composedClose(streamArray));
        } catch (Exception exception) {
            for (Stream<?> stream : streamArray) {
                try {
                    stream.close();
                } catch (RuntimeException e) {
                    try {
                        exception.addSuppressed(e);
                    } catch (RuntimeException ignore) {
                    }
                }
            }
            throw exception;
        }
    }

    @SafeVarargs
    static <T> Runnable composedClose(final Stream<T>... streamArray) {
        return new Runnable() {
            @Override
            public void run() {
                RuntimeException exception = null;
                for (Stream<?> stream : streamArray) {
                    try {
                        stream.close();
                    } catch (RuntimeException e) {
                        try {
                            if (exception == null) {
                                exception = e;
                            } else {
                                exception.addSuppressed(e);
                            }
                        } catch (RuntimeException ignore) {
                        }
                    }
                }
                if (exception != null) {
                    throw exception;
                }
            }
        };
    }

}

Streamを複数保持し、取得する内容を選択するCompareクラス

public class MergedIterator<T> implements Iterator<T> {

    /**
     * 保持しているStremaインスタンスの次の要素のMap
     */
    private Map<Iterator<T>, T> nextMap;

    /**
     * ソート条件
     */
    private Comparator<T> comp = null;

    /**
     * 
     * @param streamArray マージするStreamの配列
     */
    @SafeVarargs
    public MergedIterator(final Stream<T>... streamArray) {
        this(Arrays.asList(streamArray).stream().map(stream -> stream.iterator()).collect(Collectors.toList()));
    }

    /**
     * 
     * @param itrArray マージするIteratorの配列
     */
    @SafeVarargs
    public MergedIterator(final Iterator<T>... itrArray) {
        this(Arrays.asList(itrArray));
    }

    /**
     *
     * @param itrList マージするIteratorのList
     */
    public MergedIterator(final List<Iterator<T>> itrList) {
        this.nextMap = new HashMap<>();
        for (Iterator<T> itr : itrList) {
            this.nextMap.put(itr, itr.hasNext() ? itr.next() : null);
        }
    }

    /**
     * 比較用のインスタンス
     * @param comp
     */
    public void setComparetor(final Comparator<T> comp) {
        this.comp = comp;
    }

    @Override
    public boolean hasNext() {
        return this.nextMap.entrySet().stream().filter(entry -> entry.getValue() != null).count() > 0L;
    }

    @Override
    public void remove() {
        throw new UnsupportedOperationException("Not supported.");
    }

    @Override
    public T next() {
        if (!hasNext()) {
            return null;
        }

        Entry<Iterator<T>, T> nextEntry = this.nextMap.entrySet().stream().filter(entry -> entry.getValue() != null)
                .min(new Comparator<Entry<Iterator<T>, T>>() {

                    @Override
                    public int compare(final Entry<Iterator<T>, T> o1, final Entry<Iterator<T>, T> o2) {
                        return MergedIterator.this.comp != null
                                ? MergedIterator.this.comp.compare(o1.getValue(), o2.getValue())
                                : 0;
                    }
                }).orElse(null);

        T returnObject = nextEntry.getValue();
        nextEntry.setValue(nextEntry.getKey().hasNext() ? nextEntry.getKey().next() : null);

        return returnObject;

    }

}

Spliterator(Stream内部で使用する要素走査)クラス

public class SpliteratorAdapter<T> extends Spliterators.AbstractSpliterator<T> {

    private final Iterator<T> iterator;

    /**
     *
     * @param iter
     */
    public SpliteratorAdapter(final Iterator<T> iter) {
        super(Long.MAX_VALUE, 0);
        this.iterator = iter;
    }

    @Override
    public synchronized boolean tryAdvance(final Consumer<? super T> action) {
        if (this.iterator.hasNext()) {
            action.accept(this.iterator.next());
            return true;
        }
        return false;
    }
}

実行コード

以下のように呼び出す。
Streamのリソースクローズについてはマージ後はやってくれるけど生成時(streamArray)は個別にする必要がある。
スマートにできないかなと考えているのだけど。。。。

そこも含めてStreamUtilsに入れてしまえば楽かもしれない。

    // streamArray Stremaの配列
    // comp Comparatorの実装
    try (Stream<List<String>> mergeStream = StreamUtils
                .merge(comp, streamArray)
                .onClose(() -> LOG.debug("全Stream処理完了"))) {
        /* ここで処理実施この例では標準出力して終わり */
        mergeStream.forEach(line -> System.out.println(line));

    } catch (final IOException e) {
        // 例外処理
    }

おわりに

複数のStreamに跨る処理って、直列にくっ付ける以外に結構あるのかなと思います。
DIFFとかも同じようにステートレスでできるので、
その辺りを作りこむのも面白いかと思います。

今回の例はparallelオプションは効かないですが、
効くような処理も何かしらあると思います。

そのあたり一度整理してみたいですね。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0