はじめに
一寸した業務要件で2つのソートされたファイルをマージして出力することになりました。
以下のような例です
- インプットファイルは2ファイルで第一キーでソートされている。
- 出力ファイルは2ファイルの全データからソートして出力する。
インプットファイル1
1,AAA
4,BBB
9,CCC
インプットファイル2
2,DDD
5,EEE
6,FFF
出力ファイル
1,AAA
2,DDD
4,BBB
5,EEE
6,FFF
9,CCC
ってことで、
ReadをStremAPIで行い、マージソートできるようにしてみようと思いました。
うわぁ、
またピンポイントすぎて需要が少ない題材ですね。
(´・ω・)
実装したいこと
こんな感じで実装しようと思いました。
- 複数のStreamをパラメータとして、単一のStreamを作る。
- Comparatorを渡してソート順を定義する。
- 前提として、各Streamはソート済みとする。
- ステートレスなのは大前提。
実装
StreamをマージするStreamUtilsを作成。
public class StreamUtils {
private StreamUtils() {
}
/**
* 引数で指定されたストリームをマージソートする
*
* @param streamArray ソート済みのストリーム
* @return
*/
@SafeVarargs
public static final <T> Stream<T> merge(final Stream<T>... streamArray) {
return merge(null, streamArray);
}
/**
* ストリームをマージする
* 独自のSpliteratorで取得要素を制御する。
*
* @param comp ソート条件
* @param streamArray ソート済みのストリーム
* @return
*/
@SafeVarargs
public static final <T> Stream<T> merge(final Comparator<T> comp, final Stream<T>... streamArray) {
try {
MergedIterator<T> iterator = new MergedIterator<>(streamArray);
iterator.setComparetor(comp);
Spliterator<T> spliterator = new SpliteratorAdapter<>(iterator);
return StreamSupport.stream(spliterator, false).onClose(composedClose(streamArray));
} catch (Exception exception) {
for (Stream<?> stream : streamArray) {
try {
stream.close();
} catch (RuntimeException e) {
try {
exception.addSuppressed(e);
} catch (RuntimeException ignore) {
}
}
}
throw exception;
}
}
@SafeVarargs
static <T> Runnable composedClose(final Stream<T>... streamArray) {
return new Runnable() {
@Override
public void run() {
RuntimeException exception = null;
for (Stream<?> stream : streamArray) {
try {
stream.close();
} catch (RuntimeException e) {
try {
if (exception == null) {
exception = e;
} else {
exception.addSuppressed(e);
}
} catch (RuntimeException ignore) {
}
}
}
if (exception != null) {
throw exception;
}
}
};
}
}
Streamを複数保持し、取得する内容を選択するCompareクラス
public class MergedIterator<T> implements Iterator<T> {
/**
* 保持しているStremaインスタンスの次の要素のMap
*/
private Map<Iterator<T>, T> nextMap;
/**
* ソート条件
*/
private Comparator<T> comp = null;
/**
*
* @param streamArray マージするStreamの配列
*/
@SafeVarargs
public MergedIterator(final Stream<T>... streamArray) {
this(Arrays.asList(streamArray).stream().map(stream -> stream.iterator()).collect(Collectors.toList()));
}
/**
*
* @param itrArray マージするIteratorの配列
*/
@SafeVarargs
public MergedIterator(final Iterator<T>... itrArray) {
this(Arrays.asList(itrArray));
}
/**
*
* @param itrList マージするIteratorのList
*/
public MergedIterator(final List<Iterator<T>> itrList) {
this.nextMap = new HashMap<>();
for (Iterator<T> itr : itrList) {
this.nextMap.put(itr, itr.hasNext() ? itr.next() : null);
}
}
/**
* 比較用のインスタンス
* @param comp
*/
public void setComparetor(final Comparator<T> comp) {
this.comp = comp;
}
@Override
public boolean hasNext() {
return this.nextMap.entrySet().stream().filter(entry -> entry.getValue() != null).count() > 0L;
}
@Override
public void remove() {
throw new UnsupportedOperationException("Not supported.");
}
@Override
public T next() {
if (!hasNext()) {
return null;
}
Entry<Iterator<T>, T> nextEntry = this.nextMap.entrySet().stream().filter(entry -> entry.getValue() != null)
.min(new Comparator<Entry<Iterator<T>, T>>() {
@Override
public int compare(final Entry<Iterator<T>, T> o1, final Entry<Iterator<T>, T> o2) {
return MergedIterator.this.comp != null
? MergedIterator.this.comp.compare(o1.getValue(), o2.getValue())
: 0;
}
}).orElse(null);
T returnObject = nextEntry.getValue();
nextEntry.setValue(nextEntry.getKey().hasNext() ? nextEntry.getKey().next() : null);
return returnObject;
}
}
Spliterator(Stream内部で使用する要素走査)クラス
public class SpliteratorAdapter<T> extends Spliterators.AbstractSpliterator<T> {
private final Iterator<T> iterator;
/**
*
* @param iter
*/
public SpliteratorAdapter(final Iterator<T> iter) {
super(Long.MAX_VALUE, 0);
this.iterator = iter;
}
@Override
public synchronized boolean tryAdvance(final Consumer<? super T> action) {
if (this.iterator.hasNext()) {
action.accept(this.iterator.next());
return true;
}
return false;
}
}
実行コード
以下のように呼び出す。
Streamのリソースクローズについてはマージ後はやってくれるけど生成時(streamArray)は個別にする必要がある。
スマートにできないかなと考えているのだけど。。。。
そこも含めてStreamUtilsに入れてしまえば楽かもしれない。
// streamArray Stremaの配列
// comp Comparatorの実装
try (Stream<List<String>> mergeStream = StreamUtils
.merge(comp, streamArray)
.onClose(() -> LOG.debug("全Stream処理完了"))) {
/* ここで処理実施この例では標準出力して終わり */
mergeStream.forEach(line -> System.out.println(line));
} catch (final IOException e) {
// 例外処理
}
おわりに
複数のStreamに跨る処理って、直列にくっ付ける以外に結構あるのかなと思います。
DIFFとかも同じようにステートレスでできるので、
その辺りを作りこむのも面白いかと思います。
今回の例はparallelオプションは効かないですが、
効くような処理も何かしらあると思います。
そのあたり一度整理してみたいですね。