8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Julia Parallel Computing の訳 (3) - マルチコア・分散処理

Posted at

自分の為に https://docs.julialang.org/en/v1/manual/parallel-computing/ を訳してみる。続き。

マルチコア・分散処理

Juliaでは、標準ライブラリの一つとして提供されているDistributedモジュールで分散メモリ並列計算を実装している。

現代の計算機には複数のCPUがあるのが普通だ。さらに複数の計算機を束ねてクラスタにすることもできる。
複数のCPUを束ねることで計算をより高速に実行する事ができる。
計算速度には2つの要素が関わる。ひとつはCPUそのものの速度、もう一つはメモリへのアクセス速度だ。
クラスタでは、同じ計算機(ノード)のRAMにアクセスするのが一番速いのは明らかだろう。
実は、同じようなことが典型的なマルチコアラップトップでも生じる。これはメインメモリとキャッシュの速度差によるものだ。
したがって、よいマルチプロセス環境はメモリ領域に対する「所有権」を制御できなければならない。
Juliaのマルチプロセス環境はメッセージパッシングに基づいている。
分離したメモリ領域に対する複数のプロセスでプログラムを実行することができる。

Juliaのメッセージパッシング実装は、MPI 1 などとは異なる。Juliaの通信は一般に「片方向」だ。つまり2つのプロセスが関わる操作であっても、プログラマが明示的に管理するのは一方だけでよい。さらに、通信操作は「メッセージ送信」「メッセージ受信」のようなものではなく、ユーザ関数の呼び出しのような高レベルの操作に似せてある。

Juliaの分散プログラミングは2つのプリミティブで構成されている。リモート参照とリモート呼び出しだ。リモート参照は特定のプロセスに格納されたオブジェクトを任意のプロセスから利用できるようにするものだ。リモート呼び出しはあるプロセスから、特定の関数を特定の引数で他のプロセス上で(同じプロセスでもよい)よびだすことだ。

リモート参照はに2種類ある。FutureRemoteChannelだ。

リモート呼び出しはFutureを返り値として返す。リモート呼び出しは即座にリターンする。呼び出したプロセスは、リモート呼び出しがどこかのノードで実行されている間も、処理を続行することができる。Futureに対してwaitを呼び出せば、リモート呼び出しの終了を待つことができる。結果を取り出すにはfetchを行う。

一方、RemoteChannelは書き換え可能だ。たとえば複数のプロセスが一つのRemoteChannelを参照するようにすれば、協調して動作させる事ができる。
それぞれのプロセスにはIDが付与される。Juliaのインタラクティブなプロンプトを提供しているプロセスは常にID 1となる。
デフォルトで並列実行に用いられるプロセスを「ワーカ」と呼ぶ。1つしかプロセスがなければプロセス1がワーカとなる。それ以外の場合にはプロセス1以外のすべてのプロセスがワーカとなる。

試してみよう。julia -p n とするとローカル計算機にn個のプロセスを起動する。
一般に、nはCPUスレッドの数(論理的なコア数)にするのが良いだろう。
引数-pをつけると暗黙のうちにDistributedモジュールがロードされる。

$ ./julia -p 2

julia> r = remotecall(rand, 2, 2, 2)
Future(2, 1, 4, nothing)

julia> s = @spawnat 2 1 .+ fetch(r)
Future(2, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.18526  1.50912
 1.16296  1.60607

remotecallの最初の引数は呼び出す関数だ。多くの場合Juliaの並列プログラムでは特定のプロセスや利用可能なプロセス数を参照したりしないが、remotecallは詳細な制御を行うための低レベルなインターフェースなので例外だ。remotecallの2つ目の引数は、この仕事を実行するべきプロセスのIDだ。残りの引数は、呼び出された関数に引数として渡される。

上のプログラムでは、1行目でプロセス2に2x2のランダム行列の構築をリモートで行い、2行目で行列の要素に1を加えている。2つの計算の結果は2つのFuturersに収められる。@spawnatマクロは第2引数の式を第1引数で指定されるプロセスで評価する。

リモートで計算した値を即座に利用したい場合もあるだろう。例えば次のローカルな計算に、リモートオブジェクトから読み出した値を利用したいときなどだ。これには関数remotecall_fetchを用いる。fetch(remotecall(...))と書くのと等価だが、こちらのほうが効率がいい。

julia> remotecall_fetch(getindex, 2, r, 1, 1)
0.18526337335308085

getindex(r,1,1)r[1,1]と等価なので、futurerの最初の要素を取り出していることになる。

簡易化のためにシンボル:anyが用意されている。これを@spawnatに渡すと、自動的に実行場所を選択してくれる。

julia> r = @spawnat :any rand(2,2)
Future(2, 1, 4, nothing)

julia> s = @spawnat :any 1 .+ fetch(r)
Future(3, 1, 5, nothing)

julia> fetch(s)
2×2 Array{Float64,2}:
 1.38854  1.9098
 1.20939  1.57158

1 .+ rではなく1 .+ fetch(r)と書いていることに注意しよう。どこでコードが実行されるかわからないので、一般にはfetchを用いてrを加算を行うプロセスに動かす必要があるのだ。この場合、@spawnatは賢くてrが存在するプロセスで計算を実行するのでfetchは何も行わない。

(@spawnatはJulia組み込みの機能ではなくマクロとして実装されていることに注意しよう。このような機構をユーザが独自に実装することも可能だ。)

一度fetchするとFuturefetchした値をキャッシュすることを覚えておこう。その後fetchした際にはネットワーク通信は発生しない。リモートに存在する値に対するすべてのFuturefetchされたら、その値は削除される。

@async@spawnatに似ているが、ローカルプロセスで実行をおこなう。この@asyncを用いて、「フィーダ」タスクを個々のプロセスに対して作る。個々のタスクは、計算するべきもののリストの次のインデックスを取り出し、そのプロセスが終了するのを待つ。これをリストのインデックスが尽きるまで繰り返し行う。メインタスクが@syncブロックの終わりにたどり着くまでは、フィーダタスクの実行が開始されない事に注意しよう。この時点で時点でメインタスクは制御を手放し、他のローカルタスクが終了するのを待つ。それから関数を抜けてリターンする。v0.7以降ではフィーダタスクはnextidxを通じて互いに状態を共有する。これが可能なのはこれらがすべて同じプロセスで実行されているからだ。タスクは協調してスケジューリングされいるが、非同期I/Oのような場合にはやはりロックが必要となる。つまりコンテキストスイッチは、既知の点でしか発生しない。この場合remotecall_fetchが呼ばれた時点だけだ。現状のJuliaの実装ではこうなっているが、JuliaはNタスクがMプロセスで動作する M:Nスレッディング を目指している。これが実現されると、複数のプロセスが同時に一つの資源に読み書きするのは安全ではなくなるので、nextidxに対するロックの取得・解放モデルが必要になる。

コードが利用できる範囲とパッケージのロード

実行されるコードは、実行するプロセスから見えていなければならない。例えば、次のコードをJuliaのプロンプトから入力してみよう。

julia> function rand2(dims...)
           return 2*rand(dims...)
       end

julia> rand2(2,2)
2×2 Array{Float64,2}:
 0.153756  0.368514
 1.15119   0.918912

julia> fetch(@spawnat :any rand2(2,2))
ERROR: RemoteException(2, CapturedException(UndefVarError(Symbol("#rand2"))
Stacktrace:
[...]

Process 1は関数rand2を知っているがProcess 2は知らないのでエラーがでたのだ。

一般にコードはファイルもしくはパッケージからロードされる。どのプロセスでどのコードを読み込むかは柔軟に制御する事ができる。ファイルDummyModule.jlに以下のコードが格納されているとしよう。

module DummyModule

export MyType, f

mutable struct MyType
    a::Int
end

f(x) = x^2+1

println("loaded")

end

すべてのプロセスからMyTypeが見えるようにするために、DummyModule.jlをすべてのプロセスでロードする必要がある。include("DummyModule.jl")としても一つプロセスでしかロードされない。すべてのプロセスでロードするためには、@everywhereマクロを用いる(Juliaをjulia -p 2として起動する)。

julia> @everywhere include("DummyModule.jl")
loaded
      From worker 3:    loaded
      From worker 2:    loaded

いつものように、このようにしてもDummyModuleは、どのプロセスのスコープにも入らない。usingもしくはimportを行う必要があるのだ。さらに、DummyModuleをひとつのプロセスに導入しても、ほかのプロセスには入らない。

julia> using .DummyModule

julia> MyType(7)
MyType(7)

julia> fetch(@spawnat 2 MyType(7))
ERROR: On worker 2:
UndefVarError: MyType not defined
⋮

julia> fetch(@spawnat 2 DummyModule.MyType(7))
MyType(7)

しかし、``DummyModuleをロードしただけでスコープには入っていないプロセスに対してMyType`オブジェクトを送ることはできる。

julia> put!(RemoteChannel(2), MyType(7))
RemoteChannel{Channel{Any}}(2, 1, 13)

複数のプロセスに対して-Lフラグを用いて起動時に読み込むファイルを指定することができる。また、ドライバスクリプトを計算の管理に用いることもできる。

julia -p <n> -L file1.jl -L file2.jl driver.jl

このようにすると、IDが1のJuliaプロセスがドライバスクリプトを実行する。インタラクティブプロンプトがID 1のプロセスで実行されるので同じだ。

また、DummyModule.jlが独立したファイルではなくパッケージであった場合には、using DummyModuleとすると、すべてのプロセスでDummyModule.jlがロードされるが、usingが呼ばれるまではスコープには入らない。

ワーカプロセスの起動と管理

Juliaのベースインストール状態では、2種類のクラスタがサポートされる。

  • ローカルクラスタ。上で示したように-pオプションで指定する。
  • 複数のマシンからなるクラスタ。--machine-fileオプションで指定する。パスワードが不要なSSHログインを用いてJuliaのワーカプロセスを指定した計算機で(ローカルホストでのカレントディレクトリで)起動する。

関数addprocsrmprocsworkersを用いるとプログラムからプロセスを追加・削除し、状態を確認する事ができる。

julia> using Distributed

julia> addprocs(2)
2-element Array{Int64,1}:
 2
 3

マスタプロセスではaddprocsを呼び出す前に、明示的にDistributedモジュールをロードする必要がある。ワーカプロセスでは自動的にロードされる。ワーカプロセスは起動スクリプト~/.julia/config/startup.jlを読まないし、グローバルな状態(グローバル変数、新たなメソッド定義、ろーどされたモジュール)を、他の実行中のプロセスと共有することも無いことに注意しよう。addprocs(exeflags="--project")を用いて指定した環境でワーカを初期化することができる。それから@everywhere using <modulename>もしくは@everywhere include("file.jl")として必要なファイルを読み込めばよい。

他の種類のクラスタも、カスタムのClusterManagerを書けばサポートできる。これについてはClusterManagerのセクションで説明する。

データの移動

分散プログラムのオーバヘッドのほとんどはメッセージの送信とデータの移動である。性能とスケーラビリティを達成するためには、メッセージの数をへらし、送信されるデータの量をへらさなければならない。このためには、Juliaの様々な分散プログラム機構によるデータの移動を理解することが重要だ。

fetchはオブジェクトをローカルマシンに移動するので、明示的なデータ移動操作だ。@spawnat(や関連した機構)もデータを移動するが、それほど自明ではないので、暗黙のデータ移動操作と呼ぶことができる。ランダム行列を作って2乗する方法を2つ考えてみよう。

メソッド 1:

julia> A = rand(1000,1000);

julia> Bref = @spawnat :any A^2;

[...]

julia> fetch(Bref);

メソッド 2:

julia> Bref = @spawnat :any rand(1000,1000)^2;

[...]

julia> fetch(Bref);

この2つの方法の違いは些細なものに見えるかもしれないが、実はかなり大きい。@spawnatの挙動のせいだ。メソッド1ではランダム行列はローカルに生成され、別のプロセスに送られてそこで2乗される。メソッド2では、ランダム行列の生成も2乗の計算も、別のプロセスで行われる。したがってメソッド2のほうが送信するデータ量ははるかにすくない。

この簡単な例では、2つの方法を区別することも、そのうちの1方を選ぶのも簡単だ。しかし、実際のプログラムでは、データの移動を設計するにはよく考え、実際に測定して見る必要がある。例えば、最初のプロセスでも行列Aを使うならメソッド1のほうが良いだろう。Aの計算が高価で現在のプロセスにしか無いのなら、Aを他のプロセスに移動することは避けられない。@spawnatfetch(Bref)の間に現在のプロセスでできることがあまりないなら、そもそも並列に実行しないほうがいいのかもしれない。rand(1000,1000)がもっと複雑な計算だったら、別の@spawnatを使ってこのステップを並列化したほうがいいかもしれない。

グローバル変数

@spawnatでリモートで実行される式や、remotecallでリモートで実行するように指定されたクロージャがグローバル変数を参照している場合がある。Mainモジュールのグローバル束縛は、他のモジュールのグローバル束縛とは扱いが少し異なる。次のコード例を考えてみよう。

A = rand(10,10)
remotecall_fetch(()->sum(A), 2)

この場合、sumはリモートプロセスで定義されていなければならない。Aがローカルなワークスペースで定義されたグローバル変数だとしよう。ワーカ2のMainにはAという変数はない。クロージャ()->sum(A)をワーカ2に送り出すと、ワーカ2にMain.Aが定義される。Main.Aは、remotecall_fetchがリターンした後もワーカ2に残り続ける。(Mainモジュールの)グローバルな参照がリモート呼び出しに埋め込まれていると、次のようになる。

  • リモート呼び出しでグローバル変数を参照すると、対象ワーカでもグローバルな束縛が作られる。

  • グローバルな定数はリモートノードでも定数となる。

  • グローバル変数はリモート呼び出しのコンテクストでのみ対象ワーカに再送される。再送されるのは値が変更された場合だけだ。クラスタはグローバル変数の値をノードにまたがって同期したりはしない。例を見てみよう。

A = rand(10,10)
remotecall_fetch(()->sum(A), 2) # worker 2
A = rand(10,10)
remotecall_fetch(()->sum(A), 3) # worker 3
A = nothing

上のコードを実行すると、ワーカ2の Main.Aとワーカ3のMain.Aが別の値になり、さらに、ワーカ1のMain.Anothingになる。

もう気がついたかもしれないが、マスタプロセスのグローバル変数に紐付けられたメモリ領域は、グローバル変数に別の値を与えると解放されるが、他のワーカでは有効なままなので解放されない。clear!を使って手動で、不要になったリモートノードの特定のグローバル変数にnothingを代入する事もできる。こうすると、グローバル変数が参照していた値のメモリは通常のガベージコレクションのなかで解放される。

このため、グローバル変数をリモートコールで参照する際には注意が必要だ。可能ならば避けるべきだ。どうしてもグローバル変数を参照する必要があるなら、letブロックを使ってグローバル変数をローカル化することを検討してみよう。

例を示す。

julia> A = rand(10,10);

julia> remotecall_fetch(()->A, 2);

julia> B = rand(10,10);

julia> let B = B
           remotecall_fetch(()->B, 2)
       end;

julia> @fetchfrom 2 InteractiveUtils.varinfo()
name           size summary
––––––––– ––––––––– ––––––––––––––––––––––
A         800 bytes 10×10 Array{Float64,2}
Base                Module
Core                Module
Main                Module

これからわかるように、グローバル変数Aはワーカ2でも定義されているが、Bはローカル変数にキャプチャされているので、ワーカ2には存在しない

並列Mapとループ

幸運なことに、データの移動を要さない有用な並列計算がたくさんある。一般的な例の一つがモンテカルロ・シミュレーションで、複数のプロセスが独立したシミュレーション試行を同時に行うものだ。@spawnatを用いてコイントスを2つのプロセスで同時に行ってみよう。まず次のような関数をcount_heads.jlに定義する。

function count_heads(n)
    c::Int = 0
    for i = 1:n
        c += rand(Bool)
    end
    c
end

関数count_headsは単にn個のランダムビットを加算するものだ。これを2つの計算機で行って結果を集計するには次のようにすればよい。

julia> @everywhere include_string(Main, $(read("count_heads.jl", String)), "count_heads.jl")

julia> a = @spawnat :any count_heads(100000000)
Future(2, 1, 6, nothing)

julia> b = @spawnat :any count_heads(100000000)
Future(3, 1, 7, nothing)

julia> fetch(a)+fetch(b)
100001564

この例は、強力で多用される並列プログラミングパターンの例となっている。複数のプロセスで多数の繰り返しが独立に行われ、その結果が何らかの関数で集計される。集計プロセスはリダクションと呼ばれる。これは一般にテンソルのランクリダクションに相当するからだ。ベクトルはスカラ値にリダクションされ、行列は列ベクタもしくは行ベクタにリダクションされる。
コードでは、x = f(x,v[i])のように書く。xはアキュムレータ、はリダクション関数で、v[i]はリダクションされる要素だ。
fは結合的であることが望ましい。リダクションの順番が結果に影響しないからだ。

ここでcount_headsを用いたパターンは一般化できることに注意しよう。ここでは2つの@spawnat文を明示的に書いたので、並列度は2プロセスに限定された。分散メモリ環境で任意の数のプロセスで実行できるようにするには、並列forループを用いる。Juliaでは次のように@distributedを用いて書く。

nheads = @distributed (+) for i = 1:200000000
    Int(rand(Bool))
end

この書き方は、複数のプロセスに繰り返しを割り当てて、結果を指定したリダクション関数(ここでは+)で集計する。並列ループ式全体を評価した結果が最終的な答えとなる。

並列forループは逐次のforループに似ているが、その挙動は大きく異る。それぞれの繰り返しが別のプロセスで行われるので、繰り返しは指定した順番では発生しないし、変数や配列への書き込みは、グローバルに観測可能にならない。並列ループで使われる変数はすべてのプロセスにコピーしてブロードキャストされる。

たとえば下のコードは意図したとおりに動かない。

a = zeros(100000)
@distributed for i = 1:100000
    a[i] = i
end

このコードはaすべてを初期化しない。個々のプロセスがそれぞれaのコピーを持つからだ。
このような並列forループは避けるべきだ。
幸運にも共有配列(SharedArray)を用いれば、この制約を回避できる。


using SharedArrays

a = SharedArray{Float64}(10)
@distributed for i = 1:10
    a[i] = i
end

「外部」変数を並列ループで使うことは、その変数が読み出されるだけであればまったく問題ない。

a = randn(1000)
@distributed (+) for i = 1:100000
    f(a[rand(1:end)])
end

このコードではすべてのプロセスに共有された配列aのなかからランダムに選択した要素に対してfを適用している。

こレまでの例でわかるように、リダクション関数が不要な場合には省略できる。その場合にはループは非同期に実行される。つまり、利用できるすべてのワーカを使って独立したタスクを起動し、計算の終了を待たずにFutureの配列を返す。呼び出したプロセスは、後からFutureに対してfetchして計算の終了を待つか、@syncを頭につけてループの終了を待つ。つまり@sync @distributed forのように書く。

リダクション操作が不要である範囲のすべての整数(より一般にはある集合のすべての要素)に対して同じ関数を適用したい場合もある。これは並列マップ(parallel map)と呼ばれる有用な操作で、Juliaではpmap関数として実装されている。例えば複数の大きなランダム行列の固有値を次のようにして並列に計算する事ができる。


julia> M = Matrix{Float64}[rand(1000,1000) for i = 1:10];

julia> pmap(svdvals, M);

Juliaのpmapはそれぞれの関数がかなりの量の作業を行う場合を想定して設計されている。これに対して@distributed forは個々の計算量が小さい場合、例えば2つの数を足すだけの要な場合でも扱うことができる。pmap@distributed forでは並列計算には用いられるのはワーカプロセスだけだ。@distributed forでは最後のリダクションは呼び出したプロセスで行われる。

  1. ここで言うMPIはMPI-1標準を指す。MPI-2以降では、RMA(Remote Memory Access)と総称される新たな通信機構が導入されている。RMAをMPIに追加したのは片方向通信パターンを実現するためだ。最新のMPI標準についてはhttps://mpi-forum.org/docs を参照。

8
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
8

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?