Juliaでのプロセス並列について調べました。MPI.jlを使えばMPIが使えるので簡単にできるのですが、やはりJuliaの基本機能を使って並列化にチャレンジしてみたいところです。
これまでの記事としては
Juliaでforループをプロセス並列化してみる
があります。
Juliaでの分散並列はMPIとは少し異なっています。詳しくは公式にありますが、慣れていないために私はよくわかっていませんでした。やっとわかってきたので、シェアします。
結論から言うと、艦隊指示書が重要です。
そこに注意することで、配列を分散で持って値を更新することに成功しました。
[追記]分散で持っている配列を引数にして関数を用いて更新することにも成功しました。
MPI
MPIについては詳しくは述べません。
拙い図ですが、こんな感じでしょう。
それぞれのCPUコアを船として考えたときに、船同士が通信します。
ポイントはmyrankがそれぞれのコアの番号だとわかっている時に、
Bob = myrank*2
のような形で書くと、それぞれの船でのBobの値がそれぞれの船に保存されます。それぞれのBobの値が欲しい場合には、gatherやallgetherでコアの番号を指定して取り出してきます。
Julia
Juliaの分散並列はこんな感じです。
違いは、「指示を出す別の船(旗艦)がいる」ということです。MPIでもmyrankが0の部分でデータの受け渡しや出力をやっていたりしていましたから、これはそんなに変わらない、と最初は思っていました。しかし、違っていました。図にあるFuturesというのがポイントです。Juliaの場合、
Bob = @spawnat myrank myrank*2
のような形でそれぞれの船にBobの値をセットすること
for myrank=1:3
Bob = @spawnat myrank myrank*2
end
は「できません」。なぜならば、ここで出てきているBobという名前は「旗艦から見た名前」だからです。このコードだと、ループの最後のmyrank=3の時だけBobが入り、myrank=1とmyrank=2には入っていません。
もし、それぞれの船にBobを定義したければ、
fut = Array{Future}(undef,3)
for myrank=1:3
fut[myrank] = @spawnat myrank myrank*2
end
としなければなりません。
ここで、Futureという型が非常に重要となります。これは旗艦が「どの船に何があるか」を把握するための情報(艦隊指示書)です。ですので、このFutureをうまく扱わないと分散並列をするのが難しいです。
分散並列による行列の分散確保
以上のことをおさえることで、分散並列による行列の値の分散保持と値の更新に成功しました。以下がコードの例です。
using Distributed
addprocs(3) #workerを3個にした。
@everywhere module Parallelmodule
using Distributed
struct Parallelarray{T}
numworkers::Int64
workers::Array{Int64,1}
futures::Array{Future,1}
numelements::Int64
function Parallelarray{T}(n) where T<: Number
numworkers = nworkers() #ワーカーの数を取得
@assert n % numworkers == 0 #配列サイズはワーカー数で割り切れる事
nbun = n ÷ numworkers #一つのワーカーが持つ配列サイズ
workids = workers() #使っているワーカープロセスのIDを取得
futures = Array{Future,1}(undef,numworkers) #艦隊指示書の作成
for (id,myID) in enumerate(workids)
futures[id] = @spawnat myID zeros(T,nbun) #ワーカーmyIDにzero(T,nbun)を作成するぞ、と旗艦指示書に書き込み
end
return new{T}(numworkers,workids,futures,nbun)
end
end
function Base.setindex!(A::Parallelarray,v,id,i) #A[id,i] = vの動作の定義
myid = A.workers[id]
@spawnat myid fetch(A.futures[id])[i] = v #ワーカーmyidでfetchしているので、自分自身が持っている配列をゲットし、i番目にvという値を入れている
return
end
function Base.getindex(A::Parallelarray,id,i) #A[id,i]の動作の定義
myid = A.workers[id]
f = @spawnat myid fetch(A.futures[id])[i] #fetchを直接やってしまうと中のデータがworkerから消されてしまうので、fというFutureを作ってみた。うまくいっているかは不明。
fetch(f)
#fetch(A.futures[id])[i] #ここでは、旗艦に値を転送している
end
function get_numelements(A::Parallelarray)
return A.numelements
end
end
@everywhere using .Parallelmodule
function test()
numworkers = nworkers()
println(numworkers)
println(workers())
n = 12
A = Parallelmodule.Parallelarray{Float64}(n)
@sync for id=1:numworkers
for i=1:A.numelements
A[id,i] = 100*id+i
end
end
@sync for id=1:numworkers
for i=1:A.numelements
println("id = $id,i = $i")
println(A[id,i])
end
end
return
end
test()
説明はコメントに記述しました。このモジュールを使う事で、何も考えずにworkerIDと配列のindexを指定すれば行列を更新することができるようになりました。
追記
fetch(A.futures[id])[i] #ここでは、旗艦に値を転送している
としていたところですが、これだと一度getindexすると値がworkerから消えてしまっているようです。理由は、fetchをしてしまうとworkerから旗艦にデータが移ってしまい、workerから消えてしまうから、のようです。
ということで、少し変更してみました。これでうまくいっているかはまだ不明です。
この書き方だと
function test()
numworkers = nworkers()
println(numworkers)
println(workers())
n = 12
A = Parallelmodule.Parallelarray{Float64}(n)
@sync for id=1:numworkers
for i=1:A.numelements
A[id,i] = 100*id+i
end
end
@sync for id=1:numworkers
for i=1:A.numelements
println("id = $id,i = $i")
println(A[id,i])
end
end
println("done")
@sync for id=1:numworkers
for i=1:A.numelements
A[id,i] = A[id,i]+10000*id+1000*i
end
end
@sync for id=1:numworkers
for i=1:A.numelements
println("id = $id,i = $i")
println(A[id,i])
end
end
end
test()
がちゃんと
3
[2, 3, 4]
id = 1,i = 1
101.0
id = 1,i = 2
102.0
id = 1,i = 3
103.0
id = 1,i = 4
104.0
id = 2,i = 1
201.0
id = 2,i = 2
202.0
id = 2,i = 3
203.0
id = 2,i = 4
204.0
id = 3,i = 1
301.0
id = 3,i = 2
302.0
id = 3,i = 3
303.0
id = 3,i = 4
304.0
done
id = 1,i = 1
11101.0
id = 1,i = 2
12102.0
id = 1,i = 3
13103.0
id = 1,i = 4
14104.0
id = 2,i = 1
21201.0
id = 2,i = 2
22202.0
id = 2,i = 3
23203.0
id = 2,i = 4
24204.0
id = 3,i = 1
31301.0
id = 3,i = 2
32302.0
id = 3,i = 3
33303.0
id = 3,i = 4
34304.0
となっていますので、配列の更新がうまくいっているように見えます。
追記2
それぞれのコアが持っている配列をそれぞれで更新する方法もわかりましたので、追記します。それを行うには、moduleに
function paralleldo(A::Parallelarray,id,func!::Function,indices...)
myid = A.workers[id]
@spawnat myid func!(fetch(A.futures[id]),indices...)
end
を足せばOKです。これで、引数としてfunctionを入れることができ、さらにそのfunctionには好きなだけ変数を入れることができます。
例えば、
@everywhere function test!(data,id,i)
#println(data[i])
data[i] = data[i]+10000*id+1000*i
end
function test()
numworkers = nworkers()
println(numworkers)
println(workers())
n = 12
A = Parallelmodule.Parallelarray{Float64}(n)
@sync for id=1:numworkers
for i=1:A.numelements
A[id,i] = 100*id+i
end
end
@sync for id=1:numworkers
for i=1:A.numelements
println("id = $id,i = $i")
println(A[id,i])
end
end
println("dd")
@sync for id=1:numworkers
for i=1:A.numelements
Parallelmodule.paralleldo(A,id,test!,id,i)
end
end
@sync for id=1:numworkers
for i=1:A.numelements
println("id = $id,i = $i")
println(A[id,i])
end
end
return
end
test()
とすれば、関数test!の中身を好きなようにいじることで、並列計算ができることになります。