Juliaはfor文を並列化するなど簡単な並列化は非常に優秀です。
一方、クラスターマシンやスパコンなどでプロセス並列を実行するときに細かな制御をしたいときには、どうすればいいのでしょうか。まず思いつく方法はDistributedArraysを使う方法です。DistributedArraysは配列を各プロセスが分割して持つために、例えば配列が大きすぎて1ノードのメモリに収まりきらないような計算をしたいときに、計算が可能となります。他の言語、例えばFortranやcでは、このような並列計算はMPIで実行し、それぞれのMPIのプロセスが配列の別の領域を持つことで問題を解いていました。
DistributedArraysは配列がどこにあるかということをほとんど意識せずに巨大な配列を扱えるという意味で素晴らしいパッケージです。
しかしながら、いくつか問題があります。
- 作った配列をそのまま書き換えられない
- 1コアで普通に配列を操作するよりも非常に遅い
1.に関しましては、
A[1,2] = 3
のような配列のindexを与えて値を代入する方法をDistributedArraysでは実装していないからです。なぜ実装していないかと言いますと、多分ですが、与えたindexに対応する配列の値がどのプロセスにあるかを調べる必要があるからだと思います。これを解決する方法は(ドキュメントには全然記載がありませんが)、
localpart(A)[1,2] = 3
か
A.localpart[1,2] = 3
とするかです。このlocalpartというものはプロセスが実際に配列として持っている中身です。インデックスは元の配列のインデックスではなく、localpartという配列のインデックスになっていることに気をつけてください。つまり、元の配列のインデックスからどのプロセスが持っているかを調べてそのプロセスが持つlocalpartにアクセスする、というコードを「自前で」書く必要があります。これは面倒なので、別のライブラリを作ることにしました。
2.の遅いについては、このコードを見てください。
function test(A)
N1,N2,N3 = size(A)
v = 0
for i3 = 1:N3
for i2 = 1:N2
for i1 =1:N1
v += cos(A[i1,i2,i3]^2)+i1^2+i2^2+i3^2
end
end
end
return v
end
using Distributed
using DistributedArrays
using BenchmarkTools
function test3()
NC = 3
NV = 256
A_normal = zeros(ComplexF64,NC,NC,NV)
A_dist = dzeros(ComplexF64,(NC,NC,NV),workers(),[1,1,1])
println("Normal Array")
@btime v = test($A_normal)
v = test(A_normal)
println(v)
println("Distributed Array")
@btime v = test($A_dist)
v = test(A_dist)
println(v)
end
test3()
このコードでは、testという関数で、配列の中身を使って何かを計算して和を取っています。このコードを1並列で(つまり並列化なしで)実行すると、
Normal Array
17.403 μs (0 allocations: 0 bytes)
5.0650752e7 - 0.0im
Distributed Array
3.814 ms (23040 allocations: 828.00 KiB)
5.0650752e7 - 0.0im
となります。普通の配列の場合とDistributedArraysの配列の速度の差がひどいことになっています。DistributedArraysのソースコードを見ますと、あるプロセスが自分自身が持っている配列にアクセスするときですらremotecallを使っており、これがメモリアロケーションの増大と計算時間増大を引き起こしているのでしょう。
新しいパッケージ
そこで、新しいパッケージを作ってみました。
module FastDistributedArrays
using Distributed
using DistributedArrays
using LinearAlgebra
export FastDArray
struct FastDArray{T,N} <: AbstractArray{T,N}
source::DArray{T,N}
_workers::Array{Int64,1}
function FastDArray{T}(parallel,i...) where T <: Number
@assert nworkers() == prod(parallel) "There are $(workers()) but $parallel"
_workers = workers()
indices = i
N = length(indices)
g = dzeros(T,indices,_workers,parallel)
return new{T,N}(g,_workers)
end
end
Base.display(A::FastDArray) = display(A.source)
Base.size(A::FastDArray) = size(A.source)
function Base.getindex(A::FastDArray,i1,i2,i3)
indices = localindices(A.source)
lenind = 3
isinside = true
isinside *= i1 in indices[1]
isinside *= i2 in indices[2]
isinside *= i3 in indices[3]
if isinside
index = (i1-indices[1][1]+1,i2-indices[2][1]+1,i3-indices[3][1]+1)
v = localpart(A.source)[index...]
return v
else
return A.source[i...]
end
end
end
です。これは、インデックスが自分自身が持っているlocalpartの配列の中にある場合にはremotecallをせずに値を取り出す、というFastDArray
という型です。速度を見てみますと、
using .FastDistributedArrays
function test2()
NC = 3
NV = 256
fg = FastDArray{ComplexF64}([1,1,1],NC,NC,NV)
fg2 = zeros(ComplexF64,NC,NC,NV)
g = dzeros(ComplexF64,(NC,NC,NV),workers(),[1,1,1])
println(fg[1,1,1])
println("time")
println("Fast Distributed Array")
@btime v = test($fg)
v = test(fg)
println(v)
println("Normal Array")
@btime v = test($fg2)
v = test(fg2)
println(v)
println("Distributed Array")
@btime v = test($g)
v = test(g)
println(v)
#println(v)
end
を実行すると、
0.0 + 0.0im
time
Fast Distributed Array
452.113 μs (18235 allocations: 536.94 KiB)
5.0650752e7 - 0.0im
Normal Array
18.995 μs (0 allocations: 0 bytes)
5.0650752e7 - 0.0im
Distributed Array
3.798 ms (23040 allocations: 828.00 KiB)
5.0650752e7 - 0.0im
3.798 msから452.113 μsになりましたので、9倍くらい速くなりました。一方、普通の配列は18.995 μsですから、まだ20倍くらい遅いです。遅い原因として考えられるのは、18235 allocationsでしょう。このメモリーアロケーションを減らせば速くなりそうです。
問題
しかし、
function Base.getindex(A::FastDArray,i1,i2,i3)
indices = localindices(A.source)
lenind = 3
isinside = true
isinside *= i1 in indices[1]
isinside *= i2 in indices[2]
isinside *= i3 in indices[3]
if isinside
index = (i1-indices[1][1]+1,i2-indices[2][1]+1,i3-indices[3][1]+1)
v = localpart(A.source)[index...]
return v
else
return A.source[i...]
end
end
をみてもメモリアロケーションしてそうな場所が無いように見えます。実は、このコードのv = localpart(A.source)[index...]
がメモリアロケーションを引き起こしています。
例えば、
function Base.getindex(A::FastDArray,i1,i2,i3)
indices = localindices(A.source)
lenind = 3
isinside = true
isinside *= i1 in indices[1]
isinside *= i2 in indices[2]
isinside *= i3 in indices[3]
if isinside
index = (i1-indices[1][1]+1,i2-indices[2][1]+1,i3-indices[3][1]+1)
@time v = localpart(A.source)[index...]
return v
else
return A.source[i...]
end
end
として実行すると、 0.000000 seconds (1 allocation: 32 bytes)
が繰り返されます。こんなに複雑なことをやっているから問題なんだ、と考えるかたもいるかもしれませんので、これを
function Base.getindex(A::FastDArray,i1,i2,i3)
v = localpart(A.source)[i1,i2,i3]
return v
end
と単純化してみます。これは並列化されている場合はちゃんと動きませんが、プロセスが1つなら同じ結果になります。しかし、実行すると、
Fast Distributed Array
383.884 μs (18235 allocations: 536.94 KiB)
5.0650752e7 - 0.0im
Normal Array
17.700 μs (0 allocations: 0 bytes)
5.0650752e7 - 0.0im
Distributed Array
3.794 ms (23040 allocations: 828.00 KiB)
5.0650752e7 - 0.0im
となり、メモリアロケーションを大量にしているのです。もちろん、
function Base.getindex(A::FastDArray,i1,i2,i3)
@time v = localpart(A.source)[i1,i2,i3]
return v
end
のように@time
をしてみると、この場所にメモリーアロケーションが生じているのが見えます。
この原因が全くわかりません。
追記
以下のコードのようにlocalpartを直接取り出す形で書きますと、
function test_local(A)
N1,N2,N3 = size(A)
v = 0
for i3 = 1:N3
for i2 = 1:N2
for i1 =1:N1
v += cos(A.localpart[i1,i2,i3]^2)+i1^2+i2^2+i3^2
end
end
end
return v
end
22.693 μs (0 allocations: 0 bytes)
5.0650752e7 - 0.0im
となり、メモリアロケーションが消えました。structの問題なのでしょうか...
追記2
次に、
function test_local2(A)
N1,N2,N3 = size(A)
v = 0
for i3 = 1:N3
for i2 = 1:N2
for i1 =1:N1
v += cos(A.source.localpart[i1,i2,i3]^2)+i1^2+i2^2+i3^2
end
end
end
return v
end
というコードを書き、struct FastDArrayの要素に直接アクセスするようにしてみました。
しかしこれは
Fast Distributed Array2
352.043 μs (18235 allocations: 536.94 KiB)
5.0650752e7 - 0.0im
となり、同じになります。つまり、structに入れているか入れていないかでメモリーアロケーションの有無が発生しています。どういうことでしょう。
解決策
Twitterでgenkurokiさんに解決策を教えてもらいました。
struct FastDArray{T,N} <: AbstractArray{T,N}
source::DArray{T,N}
_workers::Array{Int64,1}
function FastDArray{T}(parallel,i...) where T <: Number
@assert nworkers() == prod(parallel) "There are $(workers()) but $parallel"
_workers = workers()
indices = i
N = length(indices)
g = dzeros(T,indices,_workers,parallel)
return new{T,N}(g,_workers)
end
end
を
struct FastDArray{T,N} <: AbstractArray{T,N}
source::DArray{T,N,Array{T,N}}
_workers::Array{Int64,1}
function FastDArray{T}(parallel,i...) where T <: Number
@assert nworkers() == prod(parallel) "There are $(workers()) but $parallel"
_workers = workers()
indices = i
N = length(indices)
g = dzeros(T,indices,_workers,parallel)
return new{T,N}(g,_workers)
end
end
とすれば解決するようです。どうやらDArray
はDArray{T,N,A}
と三つを指定して初めて型安定になるようで、structに何かを定義するときは型を安定させる、ということが重要のようです。Aはそれぞれのプロセスが持っているlocalな配列の型のようです。
しかしDistributedArraysのソースコードを見ないとわからないレベル、というのは少し不親切ですね。というかT,Nと決めたらAは常にArray{T,N}
に設定してしまって良いと思うのですが、なぜわざわざAを指定するようになっているのでしょう...