LoginSignup
14
15

More than 5 years have passed since last update.

Java8でListをある程度まとめて並列処理

Last updated at Posted at 2014-05-19

すげー簡単に書けて感動したので。

普通にListで並列処理を書く

List<Map<String, Object>> rows = new ArrayList<>(); // データ入れる

rows.parallelStream().forEach(row -> {
    LOG.info("something to do.");
    LOG.info(row);
});

すげー簡単に書けるんですが、これだと1つずつスレッド作られる感じ。
DB処理でトランザクションである程度まとめたいとか、そういうのをやりたい。

ある程度まとめて並行処理する

List<Map<String, Object>> rows = new ArrayList<>(); // データ入れる

for (int start = 0; start < rows.size();) {
    int end = start + 100;
    if (end > oldRows.size()) {
        end = oldRows.size();
    }

    List<Map<String, Object>> subList = rows.subList(start, end);

    ForkJoinPool.commonPool().execute(() -> {
        LOG.info("something to doooooooooooooo.");
        subList.forEach(row -> {
            LOG.info(row);
        });
    });

    start = end;
}
});

// 終了待機
while (!ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.HOURS));

アー……Java8遥かに良いです。
野暮ったくはなりましたが、同じ物をJava7以前で書いたかと思うとすげー楽になったはず。
subListが実質finalで内側からアクセスできるのがポイントだと思ってる。

以上

デッドロックに気をつけてね!(やらかした奴)
あまり調べず書いたので、collectとかでもうちょっといい感じにできるとかあるかも
教えていただけたのなら、それはとってもうれしいなって、思ってしまうのでした。

2016-01-21 再帰でやる

import org.apache.commons.lang3.math.NumberUtils;

void doSomething(List<String> list){
    if(list.isEmpty()){
        return;
    }

    List<String> subList = list.subList(0,NumberUtils.min(100,list.size()));

    ForkJoinPool.commonPool().execute(() -> {
        LOG.info("something to doooooooooooooo.");
        subList.forEach(row -> {
            LOG.info(row);
        });
    });

    this.doSomething(list.subList(NumberUtils.min(100,list.size()));
}

// 要終了待機

startとかendとか、for文とかダルいよね……ということで最近はこんな感じで書くことが多いです。
実際に動かしたコードじゃないので何かしくってたらすみません。

2016-01-21 つーかgroupingByでできんじゃね?

org.apache.commons.lang3.mutable.MutableInt;

void doSomething(List<String> list) {
    MutableInt counter = new MutableInt(-1);

    Map<Integer, List<String>> splitMap = list.stream()
            .peek(s -> counter.increment())
            .collect(Collectors.groupingBy(s -> counter.intValue() / 100));

    splitMap.values().forEach(
            subList -> ForkJoinPool.commonPool().execute(() -> {
                LOG.info("something to doooooooooooooo.");
                    subList.forEach(row -> {
                        LOG.info(row);
                    });
                }));

    // 終了待機
    while (!ForkJoinPool.commonPool().awaitQuiescence(1, TimeUnit.HOURS));
}

ラムダ式内はfinalしか使えないので数値とかめんどくさかったりするんですが、
MutableIntという邪道っぽいクラスがあるのでそれを使って要素数を取ります。

streamはシリアルの場合、1要素ごと実行していくので、
counter.increment() と counter.intValue() は要素ごとになる……はず。
(なのでanyMatchみたいなのがあれば、評価が決まった段階で後の要素では評価しない)
すっごいパズル感と関数型ガン無視で副作用使いまくり感があり、アカン気がしますが……。

この辺はオライリーの本がわかりやすかったです。
http://www.oreilly.co.jp/books/9784873117041/

14
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
14
15