2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

[解決] Juliaの分散並列パッケージDistributedArraysが遅い

Last updated at Posted at 2021-06-13

Juliaはfor文を並列化するなど簡単な並列化は非常に優秀です。
一方、クラスターマシンやスパコンなどでプロセス並列を実行するときに細かな制御をしたいときには、どうすればいいのでしょうか。まず思いつく方法はDistributedArraysを使う方法です。DistributedArraysは配列を各プロセスが分割して持つために、例えば配列が大きすぎて1ノードのメモリに収まりきらないような計算をしたいときに、計算が可能となります。他の言語、例えばFortranやcでは、このような並列計算はMPIで実行し、それぞれのMPIのプロセスが配列の別の領域を持つことで問題を解いていました。
DistributedArraysは配列がどこにあるかということをほとんど意識せずに巨大な配列を扱えるという意味で素晴らしいパッケージです。
しかしながら、いくつか問題があります。

  1. 作った配列をそのまま書き換えられない
  2. 1コアで普通に配列を操作するよりも非常に遅い

1.に関しましては、

A[1,2] = 3

のような配列のindexを与えて値を代入する方法をDistributedArraysでは実装していないからです。なぜ実装していないかと言いますと、多分ですが、与えたindexに対応する配列の値がどのプロセスにあるかを調べる必要があるからだと思います。これを解決する方法は(ドキュメントには全然記載がありませんが)、

localpart(A)[1,2] = 3

A.localpart[1,2] = 3

とするかです。このlocalpartというものはプロセスが実際に配列として持っている中身です。インデックスは元の配列のインデックスではなく、localpartという配列のインデックスになっていることに気をつけてください。つまり、元の配列のインデックスからどのプロセスが持っているかを調べてそのプロセスが持つlocalpartにアクセスする、というコードを「自前で」書く必要があります。これは面倒なので、別のライブラリを作ることにしました。

2.の遅いについては、このコードを見てください。


function test(A)
    N1,N2,N3 = size(A)
    v = 0
    for i3 = 1:N3
        for i2 = 1:N2
            for i1 =1:N1
                v += cos(A[i1,i2,i3]^2)+i1^2+i2^2+i3^2
            end
        end
    end
    return v
end

using Distributed
using DistributedArrays
using BenchmarkTools

function test3()
    NC = 3
    NV = 256
    A_normal = zeros(ComplexF64,NC,NC,NV)
    A_dist = dzeros(ComplexF64,(NC,NC,NV),workers(),[1,1,1])

    println("Normal Array")
    @btime v = test($A_normal)
    v = test(A_normal)
    println(v)
    println("Distributed Array")
    @btime v = test($A_dist)
    v = test(A_dist)
    println(v)
end
test3()

このコードでは、testという関数で、配列の中身を使って何かを計算して和を取っています。このコードを1並列で(つまり並列化なしで)実行すると、

Normal Array
  17.403 μs (0 allocations: 0 bytes)
5.0650752e7 - 0.0im
Distributed Array
  3.814 ms (23040 allocations: 828.00 KiB)
5.0650752e7 - 0.0im

となります。普通の配列の場合とDistributedArraysの配列の速度の差がひどいことになっています。DistributedArraysのソースコードを見ますと、あるプロセスが自分自身が持っている配列にアクセスするときですらremotecallを使っており、これがメモリアロケーションの増大と計算時間増大を引き起こしているのでしょう。

新しいパッケージ

そこで、新しいパッケージを作ってみました。

module FastDistributedArrays
    using Distributed
    using DistributedArrays
    using LinearAlgebra
    export FastDArray

    struct FastDArray{T,N} <: AbstractArray{T,N}
        source::DArray{T,N}
        _workers::Array{Int64,1}

        function FastDArray{T}(parallel,i...) where T <: Number
            @assert nworkers() == prod(parallel) "There are $(workers()) but $parallel"
            _workers = workers()
            indices = i
            N = length(indices)
            g = dzeros(T,indices,_workers,parallel)
            return new{T,N}(g,_workers)

        end
    end

    Base.display(A::FastDArray) = display(A.source)
    Base.size(A::FastDArray) = size(A.source)

    function Base.getindex(A::FastDArray,i1,i2,i3)
        indices = localindices(A.source)
        lenind = 3
        isinside = true
        isinside *= i1 in indices[1]
        isinside *= i2 in indices[2]
        isinside *= i3 in indices[3]

        if isinside
            index = (i1-indices[1][1]+1,i2-indices[2][1]+1,i3-indices[3][1]+1)
            v = localpart(A.source)[index...]
            return v
        else
            return A.source[i...]
        end

    end
    
end

です。これは、インデックスが自分自身が持っているlocalpartの配列の中にある場合にはremotecallをせずに値を取り出す、というFastDArrayという型です。速度を見てみますと、

using .FastDistributedArrays
function test2()
    NC = 3
    NV = 256
    fg = FastDArray{ComplexF64}([1,1,1],NC,NC,NV)
    fg2 = zeros(ComplexF64,NC,NC,NV)
    g = dzeros(ComplexF64,(NC,NC,NV),workers(),[1,1,1])
    println(fg[1,1,1])


    println("time")
    println("Fast Distributed Array")
    @btime v = test($fg)
    v = test(fg)
    println(v)
    println("Normal Array")
    @btime v = test($fg2)
    v = test(fg2)
    println(v)
    println("Distributed Array")
    @btime v = test($g)
    v = test(g)
    println(v)

    #println(v)
end

を実行すると、

0.0 + 0.0im
time
Fast Distributed Array
  452.113 μs (18235 allocations: 536.94 KiB)
5.0650752e7 - 0.0im
Normal Array
  18.995 μs (0 allocations: 0 bytes)
5.0650752e7 - 0.0im
Distributed Array
  3.798 ms (23040 allocations: 828.00 KiB)
5.0650752e7 - 0.0im

3.798 msから452.113 μsになりましたので、9倍くらい速くなりました。一方、普通の配列は18.995 μsですから、まだ20倍くらい遅いです。遅い原因として考えられるのは、18235 allocationsでしょう。このメモリーアロケーションを減らせば速くなりそうです。

問題

しかし、

function Base.getindex(A::FastDArray,i1,i2,i3)
        indices = localindices(A.source)
        lenind = 3
        isinside = true
        isinside *= i1 in indices[1]
        isinside *= i2 in indices[2]
        isinside *= i3 in indices[3]

        if isinside
            index = (i1-indices[1][1]+1,i2-indices[2][1]+1,i3-indices[3][1]+1)
            v = localpart(A.source)[index...]
            return v
        else
            return A.source[i...]
        end
    end

をみてもメモリアロケーションしてそうな場所が無いように見えます。実は、このコードのv = localpart(A.source)[index...]がメモリアロケーションを引き起こしています。
例えば、

function Base.getindex(A::FastDArray,i1,i2,i3)
        indices = localindices(A.source)
        lenind = 3
        isinside = true
        isinside *= i1 in indices[1]
        isinside *= i2 in indices[2]
        isinside *= i3 in indices[3]

        if isinside
            index = (i1-indices[1][1]+1,i2-indices[2][1]+1,i3-indices[3][1]+1)
            @time v = localpart(A.source)[index...]
            return v
        else
            return A.source[i...]
        end
    end

として実行すると、 0.000000 seconds (1 allocation: 32 bytes)が繰り返されます。こんなに複雑なことをやっているから問題なんだ、と考えるかたもいるかもしれませんので、これを

function Base.getindex(A::FastDArray,i1,i2,i3)
        v = localpart(A.source)[i1,i2,i3]
        return v
end

と単純化してみます。これは並列化されている場合はちゃんと動きませんが、プロセスが1つなら同じ結果になります。しかし、実行すると、

Fast Distributed Array
  383.884 μs (18235 allocations: 536.94 KiB)
5.0650752e7 - 0.0im
Normal Array
  17.700 μs (0 allocations: 0 bytes)
5.0650752e7 - 0.0im
Distributed Array
  3.794 ms (23040 allocations: 828.00 KiB)
5.0650752e7 - 0.0im

となり、メモリアロケーションを大量にしているのです。もちろん、

function Base.getindex(A::FastDArray,i1,i2,i3)
        @time v = localpart(A.source)[i1,i2,i3]
        return v
end

のように@timeをしてみると、この場所にメモリーアロケーションが生じているのが見えます。

この原因が全くわかりません。

追記

以下のコードのようにlocalpartを直接取り出す形で書きますと、

function test_local(A)
    N1,N2,N3 = size(A)
    v = 0
    for i3 = 1:N3
        for i2 = 1:N2
            for i1 =1:N1
                v += cos(A.localpart[i1,i2,i3]^2)+i1^2+i2^2+i3^2
            end
        end
    end
    return v
end
  22.693 μs (0 allocations: 0 bytes)
5.0650752e7 - 0.0im

となり、メモリアロケーションが消えました。structの問題なのでしょうか...

追記2

次に、

function test_local2(A)
    N1,N2,N3 = size(A)
    v = 0
    for i3 = 1:N3
        for i2 = 1:N2
            for i1 =1:N1
                v += cos(A.source.localpart[i1,i2,i3]^2)+i1^2+i2^2+i3^2
            end
        end
    end
    return v
end

というコードを書き、struct FastDArrayの要素に直接アクセスするようにしてみました。
しかしこれは

Fast Distributed Array2
  352.043 μs (18235 allocations: 536.94 KiB)
5.0650752e7 - 0.0im

となり、同じになります。つまり、structに入れているか入れていないかでメモリーアロケーションの有無が発生しています。どういうことでしょう。

解決策

Twitterでgenkurokiさんに解決策を教えてもらいました。

    struct FastDArray{T,N} <: AbstractArray{T,N}
        source::DArray{T,N}
        _workers::Array{Int64,1}

        function FastDArray{T}(parallel,i...) where T <: Number
            @assert nworkers() == prod(parallel) "There are $(workers()) but $parallel"
            _workers = workers()
            indices = i
            N = length(indices)
            g = dzeros(T,indices,_workers,parallel)
            return new{T,N}(g,_workers)

        end
    end

    struct FastDArray{T,N} <: AbstractArray{T,N}
        source::DArray{T,N,Array{T,N}}
        _workers::Array{Int64,1}

        function FastDArray{T}(parallel,i...) where T <: Number
            @assert nworkers() == prod(parallel) "There are $(workers()) but $parallel"
            _workers = workers()
            indices = i
            N = length(indices)
            g = dzeros(T,indices,_workers,parallel)
            return new{T,N}(g,_workers)

        end
    end

とすれば解決するようです。どうやらDArrayDArray{T,N,A}と三つを指定して初めて型安定になるようで、structに何かを定義するときは型を安定させる、ということが重要のようです。Aはそれぞれのプロセスが持っているlocalな配列の型のようです。
しかしDistributedArraysのソースコードを見ないとわからないレベル、というのは少し不親切ですね。というかT,Nと決めたらAは常にArray{T,N}に設定してしまって良いと思うのですが、なぜわざわざAを指定するようになっているのでしょう...

2
2
2

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
2

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?