LoginSignup
2
0

More than 3 years have passed since last update.

Julia Parallel Computing の訳 (4) リモート参照とチャンネル

Posted at

自分の為に https://docs.julialang.org/en/v1/manual/parallel-computing/ を訳してみる。続き。

リモート参照とAbstractChannel

リモート参照は常にAbstractChannelの何らかの実装を参照している。

AbstractChannelの具象実装(Channelなど)は、put!take!fetchisreadywaitを実装しなければならない。Futureで参照されるリモートオブジェクトはChannel{Any}(1)、すなわちサイズ1で任意の型のオブジェクトを保持できるチャンネルに保持される。

書き換え可能なRemoteChannelは、任意の型とサイズのチャンネルもしくは任意のAbstractChannelの実装を指すことができる。

コンストラクタRemoteChannel(f::Function, pid)()を実行すると、特定の型の一つ以上の値を保持できるチャンネルへの参照を作成する。fpid上で実行される関数で、AbstractChannelを返すものでなければならない。

例えば、RemoteChannel(()->Channel{Int}(10), pid)はInt型でサイズ10のチャンネルへの参照を返す。
そのチャンネルはワーカpid上に存在する。

RemoteChannelはプロキシになっており、これに対するput!take!fetchisreadywaitの呼び出しは、リモートプロセス上の実際のチャンネルにフォワードされる。

したがって、RemoteChannelを、ユーザが実装したAbstractChannelオブジェクトに対する参照として使うこともできる。簡単な例がExamplesレポジトリのdictchannel.jlにある。これはリモートストアとしてディクショナリを用いるものが。

ChannelとRemoteChannel

  • Channelはプロセスローカルなものだ。ワーカ2はワーカ3上のChannelを直接参照することはできないし、逆も同じだ。一方、RemoteChannelはワーカをまたがって値のやりとりができる。
  • RemoteChannelChannelに対するハンドルだと考えてよい。
  • RemoteChannelに関連付けられたプロセスIDpidは、そのRemoteChannelの本体であるChannelがあるプロセスを指す。
  • RemoteChannelへの参照を持っていれば、任意のプロセスからそのチャンネルへ書き込み、読み出すことができる。データは自動的にそのRemoteChannelが関連付けられたプロセスへと送信(もしくはそのプロセスから受信)される。
  • Channelをシリアライズするとチャンネル内に存在するデータもシリアライズされ、それをデシリアライズすると元のオブジェクトのコピーができる。
  • これに対してRemoteChannelをシリアライズしても、それが参照しているチャンネルを特定するための場所と識別子しかシリアライズされない。これを任意のプロセスでデシリアライズすると元のRemoteChannelと同じチャンネルを参照したRemoteChannelが出来上がる。

これまでに示したチャンネルのサンプルをプロセス間通信するように書き直す事ができる。

4つのワーカが一つのリモートチャンネルjobsからのジョブを処理するようにする。ジョブはジョブID(job_id)で識別され、チャンネルに書き込まれる。リモートで実行されるタスクは、job_idを読み出しランダムな時間だけ待ち、job_idと実行時間とプロセスのpidresultsチャンネルに書き込む。最後に、マスタプロセスですべての結果を表示する。

julia> addprocs(4); # add worker processes

julia> const jobs = RemoteChannel(()->Channel{Int}(32));

julia> const results = RemoteChannel(()->Channel{Tuple}(32));

julia> @everywhere function do_work(jobs, results) # define work function everywhere
           while true
               job_id = take!(jobs)
               exec_time = rand()
               sleep(exec_time) # simulates elapsed time doing actual work
               put!(results, (job_id, exec_time, myid()))
           end
       end

julia> function make_jobs(n)
           for i in 1:n
               put!(jobs, i)
           end
       end;

julia> n = 12;

julia> @async make_jobs(n); # feed the jobs channel with "n" jobs

julia> for p in workers() # start tasks on the workers to process requests in parallel
           remote_do(do_work, p, jobs, results)
       end

julia> @elapsed while n > 0 # print out results
           job_id, exec_time, where = take!(results)
           println("$job_id finished in $(round(exec_time; digits=2)) seconds on worker $where")
           global n = n - 1
       end
1 finished in 0.18 seconds on worker 4
2 finished in 0.26 seconds on worker 5
6 finished in 0.12 seconds on worker 4
7 finished in 0.18 seconds on worker 4
5 finished in 0.35 seconds on worker 5
4 finished in 0.68 seconds on worker 2
3 finished in 0.73 seconds on worker 3
11 finished in 0.01 seconds on worker 3
12 finished in 0.02 seconds on worker 3
9 finished in 0.26 seconds on worker 5
8 finished in 0.57 seconds on worker 4
10 finished in 0.58 seconds on worker 2
0.055971741

リモート参照と分散ガベージコレクション

リモート参照で参照されているオブジェクトは、すべての参照がクラスタから削除されるまで、解放することができない。

値を保持しているノードは、どのワーカがその値に対する参照を持っているかを管理している。RemoteChannelや(まだfetchされていない) Futureが他のワーカにシリアライズされて転送された場合には、その参照が指している先のノードにノーティフィケーションが送信される。また、RemoteChannelや(まだfetchされていない) FutureがローカルにGCされた場合にも、値を保持しているノードにノーティフィケーションが送られる。

この挙動はクラスタ環境対応のシリアライザで実現されている。リモート参照はクラスタ実行している場合にだけ意味を持つ。通常のI/Oオブジェクトに/から、参照をシリアライズ/デシリアライズすることはできない。

ノーティフィケーションは、2つの管理メッセージの送信で行われる。リモート参照がシリアライズされて他のプロセスに送られた場合には「参照追加」メッセージが、リモート参照がローカルにGCされた場合には「参照削除」メッセージが送信される。

Futureは一度だけし書き込めないしローカルにキャッシュされるので、Futureに対してfetchが行われると、値の保有しているノード上の参照数管理情報も更新される。

ある値を保有しているノードは、その値を指しているすべての参照がなくなるとその値を開放する。

すでに値をfetchしたFutureをシリアライズして別のノードに転送した場合には、キャッシュされている値そのものも転送される。この時点でもとの参照先の値がすでに回収されているかもしれないからだ。

オブジェクトがローカルなGCで回収されるタイミングは、オブジェクトのサイズとその時点でのメモリの逼迫状況に依存することに注意しよう。

リモート参照の場合、ローカルに保持される参照オブジェクトのサイズは非常に小さいが、リモートに保持される値は非常に大きい場合がある。ローカルオブジェクトはすぐには回収されないので、ローカルのRemoteChannelfetchしていないFutureオブジェクトに対して、明示的にfinalizeを呼び出してやるとよい。fetchするとリモートノードに対する参照が削除されるので、fetch済みのFutureに関してはfinalizeを行う意味はない。
明示的にfinalizeを行うと即座にリモートノードに対してメッセージが送信され、その値に対する参照が削除される。

finalize済みの参照は無効となり、以後使うことはできない。

ローカルノードでの実行

リモートノードで計算を実行するにはデータをコピーしなければならない。これは、リモート呼び出しの場合でもデータが他のノードのRemoteChannelまたはFutureに保持されている場合でも同じだ。当然だが、この要な場合にはオブジェクトをシリアライズしてコピーすることになる。しかし、実行ノードがローカルノードだった場合、つまり呼び出しプロセスIDとリモートのノードIDが同じだった場合には、そのリモート呼び出しはローカル呼び出しになる。実行は通常別のタスクとして実行される(そうでない場合もある)が、シリアライズは行われない。したがって、呼び出しには元のオブジェクトと同じものが用いられ、コピーは作られない。次のコードでこの挙動を見てみよう。


julia> using Distributed;

julia> rc = RemoteChannel(()->Channel(3));   # RemoteChannel をローカルに作成

julia> v = [0];

julia> for i in 1:3
           v[1] = i                          # `v`を再利用
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[3], [3], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 1

julia> addprocs(1);

julia> rc = RemoteChannel(()->Channel(3), workers()[1]);   # RemoteChannelをリモートに作成

julia> v = [0];

julia> for i in 1:3
           v[1] = i
           put!(rc, v)
       end;

julia> result = [take!(rc) for _ in 1:3];

julia> println(result);
Array{Int64,1}[[1], [2], [3]]

julia> println("Num Unique objects : ", length(unique(map(objectid, result))));
Num Unique objects : 3

ローカルに保持されているRemoteChannelに、書き換えながら同じオブジェクトvを3回put!している。その結果、同じオブジェクトのインスタンスが保持されている事がわかる。一方、rcが別のノードに作られた場合には、vのコピーが作られている。

これは一般には問題にならないことに注意しよう。これが問題になるのは、オブジェクトをローカルに保持していて、変更してリモート呼び出しする場合だけだ。このような場合には、オブジェクトをdeepcopyしておけばよい。

同じ問題が、リモート呼び出しの場合にも起きる。

julia> using Distributed; addprocs(1);

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), myid(), v);     # ローカルノードで実行

julia> println("v=$v, v2=$v2, ", v === v2);
v=[1], v2=[1], true

julia> v = [0];

julia> v2 = remotecall_fetch(x->(x[1] = 1; x), workers()[1], v); # リモートノードで実行

julia> println("v=$v, v2=$v2, ", v === v2);
v=[0], v2=[1], false

ローカルノードに対するリモート呼び出しは、直接実行するのと同じ挙動になる。
呼び出された側で引数のオブジェクトを変更すると、呼び出しに用いたオブジェクトが更新されていることがわかる。
リモートノードに対して呼び出した場合には、引数のコピーが作られる。

もう一度いうが、これは一般には問題にならない。ローカルノードが計算ノードとしても使用され、引数を呼び出しの後も利用する場合には、この挙動を考慮にいれ、ローカルノードを呼び出す際には引数のディープコピーを用いればよい。リモートノードに対する呼び出し時には、引数は必ずコピーされる。

共有配列(SharedArray)

共有配列は、システムの提供する共有メモリを用いて同じ配列を複数のプロセスで共有する。SharedArrayDArrayと似ているが、挙動は全く異なる。DArrayでは各プロセスはデータの一部分にしかアクセスできないし、同じデータ領域を複数のプロセスが共有することはない。これに対してSharedArrayでは、参加しているすべてのプロセスが配列全体にアクセスできる。SharedArrayは、大量のデータに同じ計算機上の複数のプロセスからアクセスしたい場合に有用となる。

共有配列はSharedArraysモジュールで提供される。このモジュールを利用するすべてのワーカで明示的にロードする必要がある。

SharedArrayのインデックスによる書き込みと読み出しは通常の配列と同じように書け、背後にあるメモリ領域もローカルプロセスから直接アクセスできるためアクセスの効率も良い。
したがって、ほとんどのアルゴリズムは、SharedArrayでも同じように動作する。シングルプロセスモードでも動作する。アルゴリズム関数が入力をArrayに限定している場合には、SharedArrayの背後にある配列をsdata関数で取り出してやれば良い。SharedArray以外のAbstractArray型では、sdata関数はそのオブジェクトそのものを返すので、任意の配列っぽいオブジェクトに対してsdataを呼び出しても害はない。

共有配列のコンストラクタは次のようになっている。

SharedArray{T,N}(dims::NTuple; init=false, pids=Int[])

こうすると、pidsで指定されたプロセスから共有された、N次元T型サイズTの共有配列が作成される。分散配列と異なり、共有配列は引数pidsで指定されたワーカからしかアクセスできない(生成したプロセスが同じホストにあれば、このプロセスからもアクセスできる)。

initfn(S::SharedArray)というシグネチャを持った初期化関数がinit引数で与えられていれば、
この初期化関数すべてのワーカ上で実行される。個々のワーカが配列の1部にだけ初期化関数を実行するようにすれば初期化を並列化することができる。

簡単な例を示す。

julia> using Distributed

julia> addprocs(3)
3-element Array{Int64,1}:
 2
 3
 4

julia> @everywhere using SharedArrays

julia> S = SharedArray{Int,2}((3,4), init = S -> S[localindices(S)] = repeat([myid()], length(localindices(S))))
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  3  4  4

julia> S[3,2] = 7
7

julia> S
3×4 SharedArray{Int64,2}:
 2  2  3  4
 2  3  3  4
 2  7  4  4

SharedArrays.localindicesは、重なりのない1次元インデックスの集合を作る。これを用いてタスクを分割してプロセスに割り当てると便利だ。もちろん、どんな方法でタスクを分割しても良い。

julia> S = SharedArray{Int,2}((3,4), init = S -> S[indexpids(S):length(procs(S)):length(S)] = repeat([myid()], length( indexpids(S):length(procs(S)):length(S))))
3×4 SharedArray{Int64,2}:
 2  2  2  2
 3  3  3  3
 4  4  4  4

すべてのプロセスが背後にあるデータにアクセスできるので、衝突の起きないように注意する必要がある。例を見てみよう。

@sync begin
    for p in procs(S)
        @async begin
            remotecall_wait(fill!, p, S, p)
        end
    end
end

このようにすると、結果は未定義になる。それぞれのプロセスが配列全体を自分のpidで埋めようとするので、あるSの要素に対して最後に実行したプロセスのpidが残ることになる。

より複雑な例として次の計算カーネルを並列に実行することを考えよう。

q[i,j,t+1] = q[i,j,t] + u[i,j,t]

この場合、1次元目のインデックスで分割するとまずいことになる。q[i,j,t]があるワーカに割り当てられた領域の最後のあたりにあり、q[i,j,t+1]が別のワーカに割り当てられた領域の先頭のあたりにあると、q[i,j,t+1]を計算しようとしたときに、q[i,j,t]がまだ準備できていないことになる。この場合は、2次元目で分割するとよい。まず、ワーカに割り当てられるインデックス(irange, jrange)を返す関数を定義する。

julia> @everywhere function myrange(q::SharedArray)
           idx = indexpids(q)
           if idx == 0       # このワーカには割り当てない
               return 1:0, 1:0
           end
           nchunks = length(procs(q))
           splits = [round(Int, s) for s in range(0, stop=size(q,2), length=nchunks+1)]
           1:size(q,1), splits[idx]+1:splits[idx+1]
       end

さて、カーネルを定義しよう。(訳注: advection - 伝導)

julia> @everywhere function advection_chunk!(q, u, irange, jrange, trange)
           @show (irange, jrange, trange)  # 動作を理解するために表示する
           for t in trange, j in jrange, i in irange
               q[i,j,t+1] = q[i,j,t] + u[i,j,t]
           end
           q
       end

ラッパ関数を定義しておこう。。

julia> @everywhere advection_shared_chunk!(q, u) =
           advection_chunk!(q, u, myrange(q)..., 1:size(q,3)-1)

3つのバージョンを比較してみよう。まずは、シングルプロセスで動くバージョン。

julia> advection_serial!(q, u) = advection_chunk!(q, u, 1:size(q,1), 1:size(q,2), 1:size(q,3)-1);

次に@distributedを使うもの。

julia> function advection_parallel!(q, u)
           for t = 1:size(q,3)-1
               @sync @distributed for j = 1:size(q,2)
                   for i = 1:size(q,1)
                       q[i,j,t+1]= q[i,j,t] + u[i,j,t]
                   end
               end
           end
           q
       end;

そして、チャンクごとに実行を委譲するバージョンだ。

julia> function advection_shared!(q, u)
           @sync begin
               for p in procs(q)
                   @async remotecall_wait(advection_shared_chunk!, p, q, u)
               end
           end
           q
       end;

SharedArrayを作成してこれらの関数の実行時間を計測してみよう。結果は次のようになる(julia -p 4として実行):

julia> q = SharedArray{Float64,3}((500,500,500));

julia> u = SharedArray{Float64,3}((500,500,500));

JITコンパイルのために一度実行してから、@timeで実行時間を測定してみよう。

julia> @time advection_serial!(q, u);
(irange,jrange,trange) = (1:500,1:500,1:499)
 830.220 milliseconds (216 allocations: 13820 bytes)

julia> @time advection_parallel!(q, u);
   2.495 seconds      (3999 k allocations: 289 MB, 2.09% gc time)

julia> @time advection_shared!(q,u);
        From worker 2:       (irange,jrange,trange) = (1:500,1:125,1:499)
        From worker 4:       (irange,jrange,trange) = (1:500,251:375,1:499)
        From worker 3:       (irange,jrange,trange) = (1:500,126:250,1:499)
        From worker 5:       (irange,jrange,trange) = (1:500,376:500,1:499)
 238.119 milliseconds (2264 allocations: 169 KB)

advection_shared!の最大の利点はワーカ間の通信を最小化することで、割り当てられたデータを処理するための時間を長くすることだ。

共有配列と分散ガベージコレクション

リモート参照と同様に、共有配列もそれが作られたノード上のガベージコレクションに依存して、すべての関わったワーカから参照を取り除いている。たくさんの短命な共有アレイオブジェクトを作る場合には、可能な限り早く明示的にこれらのオブジェクトを終了するとよい。こうするとメモリだけでなく共有セグメントのマッピングに用いられるファイルハンドルを早く開放できる。

2
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2
0