LoginSignup
20
15

More than 5 years have passed since last update.

[Groovy]GParsで並列処理(基本&コレクション編)

Last updated at Posted at 2014-09-23

参考

GPars
ユーザーガイド
APIドキュメント(GroovyDoc)

GParsってなに?

Groovyで並列/並行処理を簡単に実行できるようにするライブラリです。
ライブラリなのでjarです。

  • 並列コレクション
  • CSP(Communicating Sequential Processes)
  • Actors(アクター)
  • Agents(エージェント)
  • Dataflow(データフロー)
  • STM(Software Transactional Memory)

がGroovyで実現できます。
なんか他の言語で見かけた感じがするぞ?と思ったら公式にも書いてあるとおり、いろんな言語のいい部分にインスパイアされてるよ、とのことです。

なお、言葉の使い方として並列と並行を特に意識して使いわけていません。
同時に実行する、というフワっとした感じで受け止めてください。
厳密に言うと異なると思いますが、この記事のレベルとして底まで掘り下げるものではありません。

GParsライブラリの使い方

Groovy2.3には標準でバンドルされているのでimport書くだけでOK!
もしそんなもの無いよ!という場合もGroovyの必殺技grabを以下のように書けばOK
@Grab(group='org.codehaus.gpars', module='gpars', version='1.2.1')

コレクションの並列実行

基本

すべての基本はParallel Arraysにあり。
これは本来jsr-166yのものですが、GParsは2つの方法でより便利にしています。
それがGParsPoolGParsExecutorsPoolだ!
この2つのクラスはGroovy標準のcollect()とかeach、findAllといったコレクション操作用のメソッドを並列実行用に拡張して提供しています。

パット見では普通のGroovyコードと大差ないですね?

def selfPortraits = images.findAllParallel{it.contains me}.collectParallel {it.resize()}

もっと関数型っぽくmap/reduceなんてのも使えます。

def smallestSelfPortrait = images.parallel.filter{it.contains me}.map{it.resize()}.min{it.sizeInMB}

GParsPool

ParallelArraysを操る、2つあるクラスのうちの一つです。
jsr-166yベースです。

基本

もっともベーシックなソースは以下のとおりです。

import groovyx.gpars.*
import jsr166y.ForkJoinPool

GParsPool.withPool { ForkJoinPool pool ->
    (1..5).eachParallel { print it }
}

なんとこれだけです!
コレだけでコレクションの並列実行が実現できます。
実行結果は、1から5の整数が一行に表示されます(毎回実行結果が異なります。)
ちなみに、このソースは以下のように更に短くかけます。

import static groovyx.gpars.GParsPool.*

withPool {
    (1..5).eachParallel { print it }
}

このように、コレクションを並列化して処理したい場合は、withPool句の中でGroovy標準のコレクション操作メソッド名 + Parallelというメソッドを使うだけでOKです。

GParsPool.withPoolで利用できる並列化用メソッドは以下のとおりです。

  • eachParallel()
  • eachWithIndexParallel()
  • collectParallel()
  • collectManyParallel()
  • findAllParallel()
  • findAnyParallel
  • findParallel()
  • everyParallel()
  • anyParallel()
  • grepParallel()
  • groupByParallel()
  • foldParallel()
  • minParallel()
  • maxParallel()
  • sumParallel()
  • splitParallel()
  • countParallel()
  • foldParallel()

以降、型などを可能な限り省略しないように記述していきます。

色々試す

では、サンプルとして、リストの各要素を表示する前に1秒スリープする処理を考えます。
実行すると当然5秒かかります。

println "START"
(1..5).each {
    sleep(1000)
    println it
}
println "END"

これをGParsPoolを使って以下のように書き直します。

import groovyx.gpars.*
import jsr166y.ForkJoinPool

println "START"
GParsPool.withPool { ForkJoinPool pool ->
    (1..5).eachParallel {
        sleep(1000)
        println it
    }
}
println "END"

なんと!コレだけで実行時間が1秒になりました!
これはGParsによってコレクションが並列処理されたためです。
なお、環境によって実行時間が異なります。
というのも、利用されるスレッド数が、デフォルトだとマシンのCPUコア数+1のためです。
私の環境はCPUコア数が4つなので合計4+1の5スレッドなので、体感としては1秒の処理時間となりました。
この一度に利用されるスレッド数はwithPoolの第1引数で指定できます。

// 10スレッド利用する
withPool(10) {
    (1..5).eachParallel { print it }
}

既存のライブラリなどに手を加えずに並列化する方法

そんな夢のようなことが実現できます。
GparsPool.withPoolに渡すクロージャの中で、並列化したいコレクション自体を変換します。
そのために以下の3つのメソッドが用意されています。

メソッド名 機能
makeConcurrent() コレクションを並列実行用に変換
makeSequential 並列実行用コレクションを通常のコレクションに戻す
asConcurrent() 役割はmakeConcurrent()と同じだが、非破壊的。

じゃあ実際にどうやって使うの?というのは以下を参照。

import groovyx.gpars.*
import jsr166y.ForkJoinPool


GParsPool.withPool { ForkJoinPool pool ->

    List nameList = ['Joe', 'Alice', 'Dave', 'Jason']

    // 通常のGroovyのコレクションを並列実行用に変換することが出来る。
    // GParsを意識せずに実装されているメソッド(つまり普通のリストを扱うことを想定している)
    // hogeメソッドに渡されたコレックションは、すでに並列実行される状態になっている。
    // そのため、以下の実行結果は"JASON,ALICE,"か”ALICE,JASON,”のどちらになり、不定。
    // 当然実行時間は、1秒となる。(CPUは数が3個以上の場合)
    nameList.makeConcurrent()
    assert ['ALICE', 'JASON'] == hoge(nameList)

    // 並列用に変換されているコレクションを元に戻す
    // 実行結果は
    // つまり普通のコレクションに戻るので、実行結果は並列化されず、5秒かかる。
    nameList.makeSequential()
    assert ['ALICE', 'JASON'] == hoge(nameList)

    // 破壊的メソッドを使わずに、コレクションを並列実行用に変換することも出来る。
    nameList.asConcurrent {
        assert ['ALICE', 'JASON'] == hoge(nameList)
    }
}

/**
 * 
 * @param names
 * @return List
 * このメソッド自体は普通のGroovy標準のListを受け取ることを想定している。
 */
def hoge(List<String> names) {
    println "execute"
    names.each{
        println it
        sleep(1000)
    }.findAll{
        it.size() > 4
    }.collect{
        print "${it.toUpperCase()},"
        it.toUpperCase()
    }
}

withPoolを飛び出す!

毎回毎回Gpars.withPoolもしくはwithPoolって指定したくない、インデントが深くなるのがイヤというのは当然出てくる不満。
ということで、groovyx.gpars.ParallelEnhance.enhanceInstance()を使うことで、ワザワザwithPoolを使う必要がなくなります。

import groovyx.gpars.*

List list = (1..5)
ParallelEnhancer.enhanceInstance(list)

// withPoolの中じゃないのにxxxParallelメソッドが使える!
list.eachParallel { print it }

// asConncurrentも使えちゃう
list.asConcurrent { list.each{print it} }

// makeConcurrentも使えちゃう
list.makeConcurrent()
list.each {print it}

// ということは当然makeSequentialも使える
list.makeSequential()
list.each {print it}

注意、メモ

並列化された状態で処理されるコレクションは、その処理自体は確かに並列化されて同時に実行されるけど、その実行結果は普通のコレクションと同じように順番が保持されます。
例えば以下のソース

GParsPool.withPool {
    def list = [1,2,3,4,5,6]
    assert list == list.collectParallel {
        println it
        it
    }
}

もちろん並列実行されるので、画面に表示される値は不定です。
ただし、collectParallelが最終的に返す値(言い換えるとコレクションの変換結果)は、元の順番を保持するので、上記のサンプルコードは必ずassertをパスします。

なお、以下のようなコードは絶対に実行してはイケません。

def thumbnails = []
images.eachParallel {thumbnails << it.thumbnail}  //Concurrently accessing a not-thread-safe collection of thumbnails, don't do this!

GParsExecutorsPool

ParallelArraysを操る、2つあるクラスのうちの一つです。
Java Executorsベースです。
そのため、余計なjsr-166y用jarファイルが必要ありません。
でも、パフォーマンス的にはGParsPoolの方が良いです。

公式のサンプルソース自体動作しない上に、GParsPoolに比べて旨味が感じられないので思い切って省略。

まとめ

調べながら書いているので多分間違っている部分が多々有ります。
また、map/redueceとか調べきれていない箇所が山ほどあるので都度別記事として書いていく予定です。

20
15
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
20
15