自分の為に https://docs.julialang.org/en/v1/manual/parallel-computing/ を訳してみる。続き。
マルチコア・分散処理
Juliaでは、標準ライブラリの一つとして提供されているDistributed
モジュールで分散メモリ並列計算を実装している。
現代の計算機には複数のCPUがあるのが普通だ。さらに複数の計算機を束ねてクラスタにすることもできる。
複数のCPUを束ねることで計算をより高速に実行する事ができる。
計算速度には2つの要素が関わる。ひとつはCPUそのものの速度、もう一つはメモリへのアクセス速度だ。
クラスタでは、同じ計算機(ノード)のRAMにアクセスするのが一番速いのは明らかだろう。
実は、同じようなことが典型的なマルチコアラップトップでも生じる。これはメインメモリとキャッシュの速度差によるものだ。
したがって、よいマルチプロセス環境はメモリ領域に対する「所有権」を制御できなければならない。
Juliaのマルチプロセス環境はメッセージパッシングに基づいている。
分離したメモリ領域に対する複数のプロセスでプログラムを実行することができる。
Juliaのメッセージパッシング実装は、MPI 1 などとは異なる。Juliaの通信は一般に「片方向」だ。つまり2つのプロセスが関わる操作であっても、プログラマが明示的に管理するのは一方だけでよい。さらに、通信操作は「メッセージ送信」「メッセージ受信」のようなものではなく、ユーザ関数の呼び出しのような高レベルの操作に似せてある。
Juliaの分散プログラミングは2つのプリミティブで構成されている。リモート参照とリモート呼び出しだ。リモート参照は特定のプロセスに格納されたオブジェクトを任意のプロセスから利用できるようにするものだ。リモート呼び出しはあるプロセスから、特定の関数を特定の引数で他のプロセス上で(同じプロセスでもよい)よびだすことだ。
リモート参照はに2種類ある。Future
とRemoteChannel
だ。
リモート呼び出しはFuture
を返り値として返す。リモート呼び出しは即座にリターンする。呼び出したプロセスは、リモート呼び出しがどこかのノードで実行されている間も、処理を続行することができる。Future
に対してwait
を呼び出せば、リモート呼び出しの終了を待つことができる。結果を取り出すにはfetch
を行う。
一方、RemoteChannel
は書き換え可能だ。たとえば複数のプロセスが一つのRemoteChannel
を参照するようにすれば、協調して動作させる事ができる。
それぞれのプロセスにはIDが付与される。Juliaのインタラクティブなプロンプトを提供しているプロセスは常にID 1となる。
デフォルトで並列実行に用いられるプロセスを「ワーカ」と呼ぶ。1つしかプロセスがなければプロセス1がワーカとなる。それ以外の場合にはプロセス1以外のすべてのプロセスがワーカとなる。
試してみよう。julia -p n
とするとローカル計算機にn
個のプロセスを起動する。
一般に、n
はCPUスレッドの数(論理的なコア数)にするのが良いだろう。
引数-p
をつけると暗黙のうちにDistributed
モジュールがロードされる。
$ ./julia -p 2
julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)
julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.18526 1.50912
1.16296 1.60607
remotecall
の最初の引数は呼び出す関数だ。多くの場合Juliaの並列プログラムでは特定のプロセスや利用可能なプロセス数を参照したりしないが、remotecall
は詳細な制御を行うための低レベルなインターフェースなので例外だ。remotecall
の2つ目の引数は、この仕事を実行するべきプロセスのIDだ。残りの引数は、呼び出された関数に引数として渡される。
上のプログラムでは、1行目でプロセス2に2x2のランダム行列の構築をリモートで行い、2行目で行列の要素に1を加えている。2つの計算の結果は2つのFuturer
とs
に収められる。@spawnat
マクロは第2引数の式を第1引数で指定されるプロセスで評価する。
リモートで計算した値を即座に利用したい場合もあるだろう。例えば次のローカルな計算に、リモートオブジェクトから読み出した値を利用したいときなどだ。これには関数remotecall_fetch
を用いる。fetch(remotecall(...))
と書くのと等価だが、こちらのほうが効率がいい。
julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.18526337335308085
getindex(r,1,1)
はr[1,1]
と等価なので、futurer
の最初の要素を取り出していることになる。
簡易化のためにシンボル:any
が用意されている。これを@spawnat
に渡すと、自動的に実行場所を選択してくれる。
julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)
julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)
julia> fetch(s)
2×2 Array{Float64,2}:
1.38854 1.9098
1.20939 1.57158
1 .+ r
ではなく1 .+ fetch(r)
と書いていることに注意しよう。どこでコードが実行されるかわからないので、一般にはfetch
を用いてr
を加算を行うプロセスに動かす必要があるのだ。この場合、@spawnat
は賢くてr
が存在するプロセスで計算を実行するのでfetch
は何も行わない。
(@spawnat
はJulia組み込みの機能ではなくマクロとして実装されていることに注意しよう。このような機構をユーザが独自に実装することも可能だ。)
一度fetch
するとFuture
はfetch
した値をキャッシュすることを覚えておこう。その後fetch
した際にはネットワーク通信は発生しない。リモートに存在する値に対するすべてのFuture
がfetch
されたら、その値は削除される。
@async
は@spawnat
に似ているが、ローカルプロセスで実行をおこなう。この@async
を用いて、「フィーダ」タスクを個々のプロセスに対して作る。個々のタスクは、計算するべきもののリストの次のインデックスを取り出し、そのプロセスが終了するのを待つ。これをリストのインデックスが尽きるまで繰り返し行う。メインタスクが@sync
ブロックの終わりにたどり着くまでは、フィーダタスクの実行が開始されない事に注意しよう。この時点で時点でメインタスクは制御を手放し、他のローカルタスクが終了するのを待つ。それから関数を抜けてリターンする。v0.7以降ではフィーダタスクはnextidx
を通じて互いに状態を共有する。これが可能なのはこれらがすべて同じプロセスで実行されているからだ。タスクは協調してスケジューリングされいるが、非同期I/Oのような場合にはやはりロックが必要となる。つまりコンテキストスイッチは、既知の点でしか発生しない。この場合remotecall_fetch
が呼ばれた時点だけだ。現状のJuliaの実装ではこうなっているが、JuliaはNタスクがMプロセスで動作する M:Nスレッディング を目指している。これが実現されると、複数のプロセスが同時に一つの資源に読み書きするのは安全ではなくなるので、nextidx
に対するロックの取得・解放モデルが必要になる。
コードが利用できる範囲とパッケージのロード
実行されるコードは、実行するプロセスから見えていなければならない。例えば、次のコードをJuliaのプロンプトから入力してみよう。
julia> function rand2(dims...)
return 2*rand(dims...)
end
julia> rand2(2,2)
2×2 Array{Float64,2}:
0.153756 0.368514
1.15119 0.918912
julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]
Process 1は関数rand2
を知っているがProcess 2は知らないのでエラーがでたのだ。
一般にコードはファイルもしくはパッケージからロードされる。どのプロセスでどのコードを読み込むかは柔軟に制御する事ができる。ファイルDummyModule.jl
に以下のコードが格納されているとしよう。
module DummyModule
export MyType, f
mutable struct MyType
a::Int
end
f(x) = x^2+1
println("loaded")
end
すべてのプロセスからMyType
が見えるようにするために、DummyModule.jl
をすべてのプロセスでロードする必要がある。include("DummyModule.jl")
としても一つプロセスでしかロードされない。すべてのプロセスでロードするためには、@everywhere
マクロを用いる(Juliaをjulia -p 2
として起動する)。
julia> @everywhere include("DummyModule.jl")
loaded
From worker 3: loaded
From worker 2: loaded
いつものように、このようにしてもDummyModule
は、どのプロセスのスコープにも入らない。using
もしくはimport
を行う必要があるのだ。さらに、DummyModule
をひとつのプロセスに導入しても、ほかのプロセスには入らない。
julia> using .DummyModule
julia> MyType(7)
MyType(7)
julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: MyType not defined
⋮
julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)
しかし、``DummyModuleをロードしただけでスコープには入っていないプロセスに対して
MyType`オブジェクトを送ることはできる。
julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)
複数のプロセスに対して-L
フラグを用いて起動時に読み込むファイルを指定することができる。また、ドライバスクリプトを計算の管理に用いることもできる。
julia -p <n> -L file1.jl -L file2.jl driver.jl
このようにすると、IDが1のJuliaプロセスがドライバスクリプトを実行する。インタラクティブプロンプトがID 1のプロセスで実行されるので同じだ。
また、DummyModule.jl
が独立したファイルではなくパッケージであった場合には、using DummyModule
とすると、すべてのプロセスでDummyModule.jl
がロードされるが、using
が呼ばれるまではスコープには入らない。
ワーカプロセスの起動と管理
Juliaのベースインストール状態では、2種類のクラスタがサポートされる。
- ローカルクラスタ。上で示したように
-p
オプションで指定する。 - 複数のマシンからなるクラスタ。
--machine-file
オプションで指定する。パスワードが不要なSSHログインを用いてJuliaのワーカプロセスを指定した計算機で(ローカルホストでのカレントディレクトリで)起動する。
関数addprocs
、rmprocs
、workers
を用いるとプログラムからプロセスを追加・削除し、状態を確認する事ができる。
julia> using Distributed
julia> addprocs(2)
2-element Array{Int64,1}:
2
3
マスタプロセスではaddprocs
を呼び出す前に、明示的にDistributedモジュールをロードする必要がある。ワーカプロセスでは自動的にロードされる。ワーカプロセスは起動スクリプト~/.julia/config/startup.jl
を読まないし、グローバルな状態(グローバル変数、新たなメソッド定義、ろーどされたモジュール)を、他の実行中のプロセスと共有することも無いことに注意しよう。addprocs(exeflags="--project")
を用いて指定した環境でワーカを初期化することができる。それから@everywhere using <modulename>
もしくは@everywhere include("file.jl")
として必要なファイルを読み込めばよい。
他の種類のクラスタも、カスタムのClusterManager
を書けばサポートできる。これについてはClusterManager
のセクションで説明する。
データの移動
分散プログラムのオーバヘッドのほとんどはメッセージの送信とデータの移動である。性能とスケーラビリティを達成するためには、メッセージの数をへらし、送信されるデータの量をへらさなければならない。このためには、Juliaの様々な分散プログラム機構によるデータの移動を理解することが重要だ。
fetch
はオブジェクトをローカルマシンに移動するので、明示的なデータ移動操作だ。@spawnat
(や関連した機構)もデータを移動するが、それほど自明ではないので、暗黙のデータ移動操作と呼ぶことができる。ランダム行列を作って2乗する方法を2つ考えてみよう。
メソッド 1:
julia> A = rand(1000,1000);
julia> Bref = @spawnat :any A^2;
[...]
julia> fetch(Bref);
メソッド 2:
julia> Bref = @spawnat :any rand(1000,1000)^2;
[...]
julia> fetch(Bref);
この2つの方法の違いは些細なものに見えるかもしれないが、実はかなり大きい。@spawnat
の挙動のせいだ。メソッド1ではランダム行列はローカルに生成され、別のプロセスに送られてそこで2乗される。メソッド2では、ランダム行列の生成も2乗の計算も、別のプロセスで行われる。したがってメソッド2のほうが送信するデータ量ははるかにすくない。
この簡単な例では、2つの方法を区別することも、そのうちの1方を選ぶのも簡単だ。しかし、実際のプログラムでは、データの移動を設計するにはよく考え、実際に測定して見る必要がある。例えば、最初のプロセスでも行列A
を使うならメソッド1のほうが良いだろう。A
の計算が高価で現在のプロセスにしか無いのなら、A
を他のプロセスに移動することは避けられない。@spawnat
とfetch(Bref)
の間に現在のプロセスでできることがあまりないなら、そもそも並列に実行しないほうがいいのかもしれない。rand(1000,1000)
がもっと複雑な計算だったら、別の@spawnat
を使ってこのステップを並列化したほうがいいかもしれない。
グローバル変数
@spawnat
でリモートで実行される式や、remotecall
でリモートで実行するように指定されたクロージャがグローバル変数を参照している場合がある。Main
モジュールのグローバル束縛は、他のモジュールのグローバル束縛とは扱いが少し異なる。次のコード例を考えてみよう。
A = rand(10,10)
remotecall_fetch(()->sum(A), 2)
この場合、sum
はリモートプロセスで定義されていなければならない。A
がローカルなワークスペースで定義されたグローバル変数だとしよう。ワーカ2のMain
にはA
という変数はない。クロージャ()->sum(A)
をワーカ2に送り出すと、ワーカ2にMain.A
が定義される。Main.A
は、remotecall_fetch
がリターンした後もワーカ2に残り続ける。(Main
モジュールの)グローバルな参照がリモート呼び出しに埋め込まれていると、次のようになる。
-
リモート呼び出しでグローバル変数を参照すると、対象ワーカでもグローバルな束縛が作られる。
-
グローバルな定数はリモートノードでも定数となる。
-
グローバル変数はリモート呼び出しのコンテクストでのみ対象ワーカに再送される。再送されるのは値が変更された場合だけだ。クラスタはグローバル変数の値をノードにまたがって同期したりはしない。例を見てみよう。
A = rand(10,10)
remotecall_fetch(()->sum(A), 2) # worker 2
A = rand(10,10)
remotecall_fetch(()->sum(A), 3) # worker 3
A = nothing
上のコードを実行すると、ワーカ2の Main.A
とワーカ3のMain.A
が別の値になり、さらに、ワーカ1のMain.A
はnothing
になる。
もう気がついたかもしれないが、マスタプロセスのグローバル変数に紐付けられたメモリ領域は、グローバル変数に別の値を与えると解放されるが、他のワーカでは有効なままなので解放されない。clear!
を使って手動で、不要になったリモートノードの特定のグローバル変数にnothing
を代入する事もできる。こうすると、グローバル変数が参照していた値のメモリは通常のガベージコレクションのなかで解放される。
このため、グローバル変数をリモートコールで参照する際には注意が必要だ。可能ならば避けるべきだ。どうしてもグローバル変数を参照する必要があるなら、let
ブロックを使ってグローバル変数をローカル化することを検討してみよう。
例を示す。
julia> A = rand(10,10);
julia> remotecall_fetch(()->A, 2);
julia> B = rand(10,10);
julia> let B = B
remotecall_fetch(()->B, 2)
end;
julia> @fetchfrom 2 InteractiveUtils.varinfo()
name size summary
––––––––– ––––––––– ––––––––––––––––––––––
A 800 bytes 10×10 Array{Float64,2}
Base Module
Core Module
Main Module
これからわかるように、グローバル変数A
はワーカ2でも定義されているが、B
はローカル変数にキャプチャされているので、ワーカ2には存在しない
並列Mapとループ
幸運なことに、データの移動を要さない有用な並列計算がたくさんある。一般的な例の一つがモンテカルロ・シミュレーションで、複数のプロセスが独立したシミュレーション試行を同時に行うものだ。@spawnat
を用いてコイントスを2つのプロセスで同時に行ってみよう。まず次のような関数をcount_heads.jl
に定義する。
function count_heads(n)
c::Int = 0
for i = 1:n
c += rand(Bool)
end
c
end
関数count_heads
は単にn個のランダムビットを加算するものだ。これを2つの計算機で行って結果を集計するには次のようにすればよい。
julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")
julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)
julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)
julia> fetch(a)+fetch(b)
100001564
この例は、強力で多用される並列プログラミングパターンの例となっている。複数のプロセスで多数の繰り返しが独立に行われ、その結果が何らかの関数で集計される。集計プロセスはリダクションと呼ばれる。これは一般にテンソルのランクリダクションに相当するからだ。ベクトルはスカラ値にリダクションされ、行列は列ベクタもしくは行ベクタにリダクションされる。
コードでは、x = f(x,v[i])
のように書く。x
はアキュムレータ、f
はリダクション関数で、v[i]
はリダクションされる要素だ。
f
は結合的であることが望ましい。リダクションの順番が結果に影響しないからだ。
ここでcount_heads
を用いたパターンは一般化できることに注意しよう。ここでは2つの@spawnat
文を明示的に書いたので、並列度は2プロセスに限定された。分散メモリ環境で任意の数のプロセスで実行できるようにするには、並列forループを用いる。Juliaでは次のように@distributed
を用いて書く。
nheads = @distributed (+) for i = 1:200000000
Int(rand(Bool))
end
この書き方は、複数のプロセスに繰り返しを割り当てて、結果を指定したリダクション関数(ここでは+
)で集計する。並列ループ式全体を評価した結果が最終的な答えとなる。
並列forループは逐次のforループに似ているが、その挙動は大きく異る。それぞれの繰り返しが別のプロセスで行われるので、繰り返しは指定した順番では発生しないし、変数や配列への書き込みは、グローバルに観測可能にならない。並列ループで使われる変数はすべてのプロセスにコピーしてブロードキャストされる。
たとえば下のコードは意図したとおりに動かない。
a = zeros(100000)
@distributed for i = 1:100000
a[i] = i
end
このコードはa
すべてを初期化しない。個々のプロセスがそれぞれa
のコピーを持つからだ。
このような並列forループは避けるべきだ。
幸運にも共有配列(SharedArray)を用いれば、この制約を回避できる。
using SharedArrays
a = SharedArray{Float64}(10)
@distributed for i = 1:10
a[i] = i
end
「外部」変数を並列ループで使うことは、その変数が読み出されるだけであればまったく問題ない。
a = randn(1000)
@distributed (+) for i = 1:100000
f(a[rand(1:end)])
end
このコードではすべてのプロセスに共有された配列a
のなかからランダムに選択した要素に対してf
を適用している。
こレまでの例でわかるように、リダクション関数が不要な場合には省略できる。その場合にはループは非同期に実行される。つまり、利用できるすべてのワーカを使って独立したタスクを起動し、計算の終了を待たずにFuture
の配列を返す。呼び出したプロセスは、後からFuture
に対してfetch
して計算の終了を待つか、@sync
を頭につけてループの終了を待つ。つまり@sync @distributed for
のように書く。
リダクション操作が不要である範囲のすべての整数(より一般にはある集合のすべての要素)に対して同じ関数を適用したい場合もある。これは並列マップ(parallel map)と呼ばれる有用な操作で、Juliaではpmap
関数として実装されている。例えば複数の大きなランダム行列の固有値を次のようにして並列に計算する事ができる。
julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];
julia> pmap(svdvals, M);
Juliaのpmap
はそれぞれの関数がかなりの量の作業を行う場合を想定して設計されている。これに対して@distributed for
は個々の計算量が小さい場合、例えば2つの数を足すだけの要な場合でも扱うことができる。pmap
や@distributed for
では並列計算には用いられるのはワーカプロセスだけだ。@distributed for
では最後のリダクションは呼び出したプロセスで行われる。
-
ここで言うMPIはMPI-1標準を指す。MPI-2以降では、RMA(Remote Memory Access)と総称される新たな通信機構が導入されている。RMAをMPIに追加したのは片方向通信パターンを実現するためだ。最新のMPI標準についてはhttps://mpi-forum.org/docs を参照。 ↩