自分の為に https://docs.julialang.org/en/v1/manual/parallel-computing/ を訳してみる。続き。
リモート参照とAbstractChannel
リモート参照は常にAbstractChannel
の何らかの実装を参照している。
AbstractChannel
の具象実装(Channel
など)は、put!
、take!
、fetch
、isready
、wait
を実装しなければならない。Future
で参照されるリモートオブジェクトはChannel{Any}(1)
、すなわちサイズ1で任意の型のオブジェクトを保持できるチャンネルに保持される。
書き換え可能なRemoteChannel
は、任意の型とサイズのチャンネルもしくは任意のAbstractChannel
の実装を指すことができる。
コンストラクタRemoteChannel(f::Function, pid)()
を実行すると、特定の型の一つ以上の値を保持できるチャンネルへの参照を作成する。f
はpid
上で実行される関数で、AbstractChannel
を返すものでなければならない。
例えば、RemoteChannel(()->Channel{Int}(10), pid)はInt
型でサイズ10のチャンネルへの参照を返す。
そのチャンネルはワーカpid
上に存在する。
RemoteChannel
はプロキシになっており、これに対するput!
、take!
、fetch
、isready
、wait
の呼び出しは、リモートプロセス上の実際のチャンネルにフォワードされる。
したがって、RemoteChannel
を、ユーザが実装したAbstractChannel
オブジェクトに対する参照として使うこともできる。簡単な例がExamplesレポジトリのdictchannel.jl
にある。これはリモートストアとしてディクショナリを用いるものが。
ChannelとRemoteChannel
-
Channel
はプロセスローカルなものだ。ワーカ2はワーカ3上のChannel
を直接参照することはできないし、逆も同じだ。一方、RemoteChannel
はワーカをまたがって値のやりとりができる。 -
RemoteChannel
はChannel
に対するハンドルだと考えてよい。 -
RemoteChannel
に関連付けられたプロセスIDpid
は、そのRemoteChannel
の本体であるChannel
があるプロセスを指す。 -
RemoteChannel
への参照を持っていれば、任意のプロセスからそのチャンネルへ書き込み、読み出すことができる。データは自動的にそのRemoteChannel
が関連付けられたプロセスへと送信(もしくはそのプロセスから受信)される。 -
Channel
をシリアライズするとチャンネル内に存在するデータもシリアライズされ、それをデシリアライズすると元のオブジェクトのコピーができる。 - これに対して
RemoteChannel
をシリアライズしても、それが参照しているチャンネルを特定するための場所と識別子しかシリアライズされない。これを任意のプロセスでデシリアライズすると元のRemoteChannel
と同じチャンネルを参照したRemoteChannel
が出来上がる。
これまでに示したチャンネルのサンプルをプロセス間通信するように書き直す事ができる。
4つのワーカが一つのリモートチャンネルjobs
からのジョブを処理するようにする。ジョブはジョブID(job_id
)で識別され、チャンネルに書き込まれる。リモートで実行されるタスクは、job_id
を読み出しランダムな時間だけ待ち、job_id
と実行時間とプロセスのpid
をresults
チャンネルに書き込む。最後に、マスタプロセスですべての結果を表示する。
julia> addprocs(4); # add worker processes
julia> const jobs = RemoteChannel(()->Channel{Int}(32));
julia> const results = RemoteChannel(()->Channel{Tuple}(32));
julia> @everywhere function do_work(jobs, results) # define work function everywhere
while true
job_id = take!(jobs)
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
put!(results, (job_id, exec_time, myid()))
end
end
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> for p in workers() # start tasks on the workers to process requests in parallel
remote_do(do_work, p, jobs, results)
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time, where = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
global n = n - 1
end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741
リモート参照と分散ガベージコレクション
リモート参照で参照されているオブジェクトは、すべての参照がクラスタから削除されるまで、解放することができない。
値を保持しているノードは、どのワーカがその値に対する参照を持っているかを管理している。RemoteChannel
や(まだfetch
されていない) Future
が他のワーカにシリアライズされて転送された場合には、その参照が指している先のノードにノーティフィケーションが送信される。また、RemoteChannel
や(まだfetch
されていない) Future
がローカルにGCされた場合にも、値を保持しているノードにノーティフィケーションが送られる。
この挙動はクラスタ環境対応のシリアライザで実現されている。リモート参照はクラスタ実行している場合にだけ意味を持つ。通常のI/Oオブジェクトに/から、参照をシリアライズ/デシリアライズすることはできない。
ノーティフィケーションは、2つの管理メッセージの送信で行われる。リモート参照がシリアライズされて他のプロセスに送られた場合には「参照追加」メッセージが、リモート参照がローカルにGCされた場合には「参照削除」メッセージが送信される。
Future
は一度だけし書き込めないしローカルにキャッシュされるので、Future
に対してfetch
が行われると、値の保有しているノード上の参照数管理情報も更新される。
ある値を保有しているノードは、その値を指しているすべての参照がなくなるとその値を開放する。
すでに値をfetch
したFuture
をシリアライズして別のノードに転送した場合には、キャッシュされている値そのものも転送される。この時点でもとの参照先の値がすでに回収されているかもしれないからだ。
オブジェクトがローカルなGCで回収されるタイミングは、オブジェクトのサイズとその時点でのメモリの逼迫状況に依存することに注意しよう。
リモート参照の場合、ローカルに保持される参照オブジェクトのサイズは非常に小さいが、リモートに保持される値は非常に大きい場合がある。ローカルオブジェクトはすぐには回収されないので、ローカルのRemoteChannel
やfetch
していないFuture
オブジェクトに対して、明示的にfinalize
を呼び出してやるとよい。fetch
するとリモートノードに対する参照が削除されるので、fetch
済みのFuture
に関してはfinalize
を行う意味はない。
明示的にfinalize
を行うと即座にリモートノードに対してメッセージが送信され、その値に対する参照が削除される。
finalize
済みの参照は無効となり、以後使うことはできない。
ローカルノードでの実行
リモートノードで計算を実行するにはデータをコピーしなければならない。これは、リモート呼び出しの場合でもデータが他のノードのRemoteChannel
またはFuture
に保持されている場合でも同じだ。当然だが、この要な場合にはオブジェクトをシリアライズしてコピーすることになる。しかし、実行ノードがローカルノードだった場合、つまり呼び出しプロセスIDとリモートのノードIDが同じだった場合には、そのリモート呼び出しはローカル呼び出しになる。実行は通常別のタスクとして実行される(そうでない場合もある)が、シリアライズは行われない。したがって、呼び出しには元のオブジェクトと同じものが用いられ、コピーは作られない。次のコードでこの挙動を見てみよう。
julia> using Distributed;
julia> rc = RemoteChannel(()->Channel(3)); # RemoteChannel をローカルに作成
julia> v = [0];
julia> for i in 1:3
v[1] = i # `v`を再利用
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[3], [3], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1
julia> addprocs(1);
julia> rc = RemoteChannel(()->Channel(3), workers()[1]); # RemoteChannelをリモートに作成
julia> v = [0];
julia> for i in 1:3
v[1] = i
put!(rc, v)
end;
julia> result = [take!(rc) for _ in 1:3];
julia> println(result);
Array{Int64,1}[[1], [2], [3]]
julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3
ローカルに保持されているRemoteChannel
に、書き換えながら同じオブジェクトv
を3回put!
している。その結果、同じオブジェクトのインスタンスが保持されている事がわかる。一方、rc
が別のノードに作られた場合には、v
のコピーが作られている。
これは一般には問題にならないことに注意しよう。これが問題になるのは、オブジェクトをローカルに保持していて、変更してリモート呼び出しする場合だけだ。このような場合には、オブジェクトをdeepcopy
しておけばよい。
同じ問題が、リモート呼び出しの場合にも起きる。
julia> using Distributed; addprocs(1);
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v); # ローカルノードで実行
julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true
julia> v = [0];
julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # リモートノードで実行
julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false
ローカルノードに対するリモート呼び出しは、直接実行するのと同じ挙動になる。
呼び出された側で引数のオブジェクトを変更すると、呼び出しに用いたオブジェクトが更新されていることがわかる。
リモートノードに対して呼び出した場合には、引数のコピーが作られる。
もう一度いうが、これは一般には問題にならない。ローカルノードが計算ノードとしても使用され、引数を呼び出しの後も利用する場合には、この挙動を考慮にいれ、ローカルノードを呼び出す際には引数のディープコピーを用いればよい。リモートノードに対する呼び出し時には、引数は必ずコピーされる。
共有配列(SharedArray)
共有配列は、システムの提供する共有メモリを用いて同じ配列を複数のプロセスで共有する。SharedArray
はDArray
と似ているが、挙動は全く異なる。DArray
では各プロセスはデータの一部分にしかアクセスできないし、同じデータ領域を複数のプロセスが共有することはない。これに対してSharedArray
では、参加しているすべてのプロセスが配列全体にアクセスできる。SharedArray
は、大量のデータに同じ計算機上の複数のプロセスからアクセスしたい場合に有用となる。
共有配列はSharedArrays
モジュールで提供される。このモジュールを利用するすべてのワーカで明示的にロードする必要がある。
SharedArray
のインデックスによる書き込みと読み出しは通常の配列と同じように書け、背後にあるメモリ領域もローカルプロセスから直接アクセスできるためアクセスの効率も良い。
したがって、ほとんどのアルゴリズムは、SharedArray
でも同じように動作する。シングルプロセスモードでも動作する。アルゴリズム関数が入力をArray
に限定している場合には、SharedArray
の背後にある配列をsdata
関数で取り出してやれば良い。SharedArray
以外のAbstractArray
型では、sdata
関数はそのオブジェクトそのものを返すので、任意の配列っぽいオブジェクトに対してsdata
を呼び出しても害はない。
共有配列のコンストラクタは次のようになっている。
SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])
こうすると、pids
で指定されたプロセスから共有された、N次元T型サイズTの共有配列が作成される。分散配列と異なり、共有配列は引数pids
で指定されたワーカからしかアクセスできない(生成したプロセスが同じホストにあれば、このプロセスからもアクセスできる)。
initfn(S::SharedArray)
というシグネチャを持った初期化関数がinit
引数で与えられていれば、
この初期化関数すべてのワーカ上で実行される。個々のワーカが配列の1部にだけ初期化関数を実行するようにすれば初期化を並列化することができる。
簡単な例を示す。
julia> using Distributed
julia> addprocs(3)
3-element Array{Int64,1}:
2
3
4
julia> @everywhere using SharedArrays
julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 3 4 4
julia> S[3,2] = 7
7
julia> S
3×4 SharedArray{Int64,2}:
2 2 3 4
2 3 3 4
2 7 4 4
SharedArrays.localindices
は、重なりのない1次元インデックスの集合を作る。これを用いてタスクを分割してプロセスに割り当てると便利だ。もちろん、どんな方法でタスクを分割しても良い。
julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
2 2 2 2
3 3 3 3
4 4 4 4
すべてのプロセスが背後にあるデータにアクセスできるので、衝突の起きないように注意する必要がある。例を見てみよう。
@sync begin
for p in procs(S)
@async begin
remotecall_wait(fill!, p, S, p)
end
end
end
このようにすると、結果は未定義になる。それぞれのプロセスが配列全体を自分のpid
で埋めようとするので、あるS
の要素に対して最後に実行したプロセスのpid
が残ることになる。
より複雑な例として次の計算カーネルを並列に実行することを考えよう。
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
この場合、1次元目のインデックスで分割するとまずいことになる。q[i,j,t]
があるワーカに割り当てられた領域の最後のあたりにあり、q[i,j,t+1]
が別のワーカに割り当てられた領域の先頭のあたりにあると、q[i,j,t+1]
を計算しようとしたときに、q[i,j,t]
がまだ準備できていないことになる。この場合は、2次元目で分割するとよい。まず、ワーカに割り当てられるインデックス(irange, jrange)
を返す関数を定義する。
julia> @everywhere function myrange(q::SharedArray)
idx = indexpids(q)
if idx == 0 # このワーカには割り当てない
return 1:0, 1:0
end
nchunks = length(procs(q))
splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
1:size(q,1), splits[idx]+1:splits[idx+1]
end
さて、カーネルを定義しよう。(訳注: advection - 伝導)
julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
@show (irange, jrange, trange) # 動作を理解するために表示する
for t in trange, j in jrange, i in irange
q[i,j,t+1] = q[i,j,t] + u[i,j,t]
end
q
end
ラッパ関数を定義しておこう。。
julia> @everywhere advection_shared_chunk!(q, u) =
advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)
3つのバージョンを比較してみよう。まずは、シングルプロセスで動くバージョン。
julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);
次に@distributed
を使うもの。
julia> function advection_parallel!(q, u)
for t = 1:size(q,3)-1
@sync @distributed for j = 1:size(q,2)
for i = 1:size(q,1)
q[i,j,t+1]= q[i,j,t] + u[i,j,t]
end
end
end
q
end;
そして、チャンクごとに実行を委譲するバージョンだ。
julia> function advection_shared!(q, u)
@sync begin
for p in procs(q)
@async remotecall_wait(advection_shared_chunk!, p, q, u)
end
end
q
end;
SharedArray
を作成してこれらの関数の実行時間を計測してみよう。結果は次のようになる(julia -p 4
として実行):
julia> q = SharedArray{Float64,3}((500,500,500));
julia> u = SharedArray{Float64,3}((500,500,500));
JITコンパイルのために一度実行してから、@time
で実行時間を測定してみよう。
julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
830.220 milliseconds (216 allocations: 13820 bytes)
julia> @time advection_parallel!(q, u);
2.495 seconds (3999 k allocations: 289 MB, 2.09% gc time)
julia> @time advection_shared!(q,u);
From worker 2: (irange,jrange,trange) = (1:500,1:125,1:499)
From worker 4: (irange,jrange,trange) = (1:500,251:375,1:499)
From worker 3: (irange,jrange,trange) = (1:500,126:250,1:499)
From worker 5: (irange,jrange,trange) = (1:500,376:500,1:499)
238.119 milliseconds (2264 allocations: 169 KB)
advection_shared!
の最大の利点はワーカ間の通信を最小化することで、割り当てられたデータを処理するための時間を長くすることだ。
共有配列と分散ガベージコレクション
リモート参照と同様に、共有配列もそれが作られたノード上のガベージコレクションに依存して、すべての関わったワーカから参照を取り除いている。たくさんの短命な共有アレイオブジェクトを作る場合には、可能な限り早く明示的にこれらのオブジェクトを終了するとよい。こうするとメモリだけでなく共有セグメントのマッピングに用いられるファイルハンドルを早く開放できる。