自分の為に https://docs.julialang.org/en/v1/manual/parallel-computing/ を訳してみる。続き。
クラスタマネージャ
Juliaプロセスを起動し管理しネットワーク接続して論理的なクラスタを構成することは、クラスタマネージャによって行われる。ClusterManager
は以下を行う。
- ワーカプロセスをクラスタ環境で起動する。
- ワーカのライフタイムで発生するイベントを管理する
- データ転送層を提供する(オプショナル)
Juliaクラスタは以下の特徴を持つ。
- 最初のJuliaプロセスはマスタと呼ばれ、特別扱いされる。IDは1。
- マスタプロセスだけがワーカプロセスを追加、削除できる。
- すべてのプロセスは相互に直接通信できる。
ワーカ間の通信(組み込みのTCP/IP層を用いる)は、次のように確立される。
-
addprocs
関数が、ClusterManager
オブジェクトを引数としてマスタプロセスで呼び出される。 -
addprocs
は適切な起動メソッドを呼び出し、要求された数のワーカプロセスを適切な計算機上で起動する。 - 個々のワーカプロセスは、空いているポートに対してlistenし、ホストとポートの情報をstdoutに書き出す。
- クラスタマネージャは、stdoutへの出力を見てマスタプロセスが利用できるようにする。
- マスタプロセスはこの情報をパーズして、ワーカへのTCP/IP接続を確立する。
- すべてのワーカに、クラスタ内の他のワーカの情報が通知される。
- 個々のワーカは自分のIDがよりも若いIDを持つ全てのワーカに対して接続を行う。
- このようにして、すべてのワーカがすべてのワーカに直接接続したメッシュネットワークが確立する。
デフォルトの通信レイヤは通常のTCPSocket
を用いるが、独自の通信層を用いることもできる。
Juliaはデフォルトで2つのクラスタマネージャを提供している。
-
LocalManager
:addprocs()
、addprocs(np::Integer)
が呼ばれた際に用いられる。 -
SSHManager
: ホスト名のリストを引数としてaddprocs(hostnames::Array)
が呼ばれた際に用いられる。
LocalManager
は同じホストにワーカを起動する際に用いられる。マルチコア、マルチプロセッサのハードウェアを利用する。
クラスタマネージャは最低限以下を満たす必要がある。
- 抽象型の
ClusterManager
のサブタイプであること。 - 新しいワーカを起動する
launch
メソッドを実装していること。 - ワーカのライフタイムイベント(インタラプトシグナルの送出など)の際に呼び出される、
manage
メソッドを実装していること。
つまり、addprocs(manager::FooManager)
を実装するには、FooManager
が下記のメソッドを実装していれば良い。
function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
例として、ローカルホストにワーカを起動するLocalManager
がどのように実装されているか見てみよう。
struct LocalManager <: ClusterManager
np::Integer
end
function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
[...]
end
function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
[...]
end
launch
メソッドは以下の引数をとる。
-
manager::ClusterManager
:addprocs
に渡されたクラスタマネージャ -
params::Dict
:addprocs
に渡されたすべてのパラメータ -
launched::Array
: 起動したプロセスのWorkerConfig
オブジェクトを追加する配列 -
c::Condition
: ワーカの起動が終了したらノーティファイする状態変数
launch
メソッドは、独立したタスクで非同期に呼び出される。このタスクの終了によって、要求されたワーカがすべて起動したことを確認する。したがって、launch
関数は要求されたワーカが起動したらすぐに終了しなければならない。
新たに起動したワーカは、相互およびマスタプロセスと全対全で接続する。
Juliaコマンドに引数--worker[=<cookie>]
を付けて起動すると、ワーカとして初期されTCP/IPソケットの接続が用意される。
すべてのワーカはマスタと同じクッキーを共有する。クッキーが--worker
オプションで指定されていない場合ワーカは標準入力からクッキーを読もうとする。LocalManager
もSSHManager
も標準入力からクッキーを与える。
デフォルトでは、ワーカはgetipaddr()
で返されるアドレスの空いているポートに対してlistenする。
特定のアドレスに対してlistenするようにしたければ、--bind-to bind_addr[:port]
を指定する。
これは、複数のアドレスを持つ環境で有用だ。
TCP/IP以外の通信層としては、MPIがある。この場合は--worker
は指定してはいけない。その代わり、新たに起動したワーカは、並列実行に用いるAPIを使う前に、init_worker(cookie)
を実行しなければならない。
すべてのワーカが起動したら、launch
メソッドは、(適切にフィールドを埋めた)WorkerConfig
オブジェクトを、配列launched
に追加する。
mutable struct WorkerConfig
# すべてのクラスタマネジャで用いる共有フィールド
io::Union{IO, Nothing}
host::Union{AbstractString, Nothing}
port::Union{Integer, Nothing}
# 一つのホストで追加のワーカを起動する場合に用いるフィールド
count::Union{Int, Symbol, Nothing}
exename::Union{AbstractString, Cmd, Nothing}
exeflags::Union{Cmd, Nothing}
# カスタムクラスタマネージャがワーカレベルの情報を保持するために使う
# 複数の情報を格納したければdictを用いる。
userdata::Any
# SSHManager / SSH tunnel connections to workers
tunnel::Union{Bool, Nothing}
bind_addr::Union{AbstractString, Nothing}
sshflags::Union{Cmd, Nothing}
max_parallel::Union{Integer, Nothing}
# Local/SSH manager で用いる
connect_at::Any
[...]
end
WorkerConfig
の殆どのフィールドは組み込みのマネージャで利用する。カスタムクラスタマネージャは、多くの場合は io
もしくはhost
とport
だけを指定すればいい。
-
io
が指定されていた場合には、ホスト・ポート情報をそこから読み出す。Juliaワーカはアドレスとポートを起動時に書き出す。こうなっているので、ポートを手で指定せず、任意の空きポートで通信することができる。 -
io
が指定されていなければ、host
とport
を用いて接続する。 -
count
、exename
、exeflags
はそのワーカから追加でワーカを起動するための情報だ。クラスタマネージャはノードごとにワーカを起動し、この情報を用いて追加のワーカを起動する。
-
count
が整数n
の場合、全部でn
個のワーカが起動できる。 -
count
がシンボル:auto
の場合、その計算機のCPUスレッド数(論理コアの数)だけワーカを起動する。 -
exename
はJuliaコマンドのフルパス名である。 -
exeflags
ワーカを起動するために必要なコマンドライン引数である。 -
tunnel
、bind_addr
、sshflags
、max_parallel
はマスタからワーカに接続する際にSSHトンネルが必要な場合に用いられる。
-
userdata
はカスタムマネージャが、固有の情報を保持する際に用いる。
manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
は、ワーカのライフサイクルの間に異なるop
値で呼び出される。
- ワーカが追加、削除された際には
op
が:register
/:deregister
で呼ばれる。 -
interrupt(workers)
が呼ばれた際には:interrupt
で呼ばれる。ClusterManager
は、適切なワーカに対してインタラプトシグナルを送る必要がある。 - クリーンアップには
op
を:finalize
として呼び出される。
カスタムマネージャとカスタム通信層
デフォルトのTCP/IPの全対全ソケット通信をカスタム通信層で置き換えるのは、もう少し面倒だ。個々のJuliaプロセスには、通信先の数と同じだけ通信タスクがある。たとえばJuliaクラスタに32のプロセスがあり、全対全のメッシュネットワークで相互接続しているとしよう。
- 個々のJuliaプロセスは31の通信タスクを持つ
- それぞれのタスクは一つのリモートワーカからのメッセージを、メッセージ処理ループで処理する。
- メッセージ処理ループは、IOオブジェクトに対して待ち(例えばデフォルトでは
TCPSocket
)、メッセージ全対を読み出し、処理して、次のメッセージを待つ。 - メッセージの送信は通信タスク以外の任意のJuliaタスクからも、適切なIOオブジェクトを通じて行う事ができる。
デフォルトの通信層を置き換えるには、新しい実装でリモートワーカに接続する方法と、メッセージ処理ループがwaitすることのできる適切なIOオブジェクトを返す必要がある。また、カスタムマネージャは、以下の2つのコールバック関数を用意する必要がある。
connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)
TCP/IPを用いるデフォルトの実装はconnect(manager::ClusterManager, pid::Integer, config::WorkerConfig)
として実装されている。
connect
はIOオブジェクトのペアを返す。1つはワーカpid
から送られてきたデータを読み出すためのもの、もう一方は、ワーカpid
に対してデータを書き出すためのものだ。カスタムクラスタマネージャでは、インメモリのBufferStream
をパイプとして使い、Julia組み込みの並列実行機能を用いて独自の転送層を提供すれば良いだろう。
BufferStream
はインメモリのIOBuffer
でIOと同じように機能する。つまり非同期に動作する。
Examples
レポジトリのclustermanager/0mq
フォルダには、ZeroMQを使う例が収められている。
この例では、0MQのブローカを中心においたスタートポロジでワーカを接続する。ただし、すべてのJuliaプロセスは論理的には相互に接続されているので、0MQが転送層に使われていることを意識せず、すべてのワーカは他のワーカと直接通信できる。
カスタム通信層を用いるには:
-
Juliaワーカを
--worker
オプションを付けて起動してはならない。--worker
を付けて起動すると、TCP/IPソケット実装のワーカが起動する。 -
ワーカへのすべての論理的な入力接続に対して
Base.process_messages(rd::IO, wr::IO)()
を呼び出さなければならない。こうすると、IOオブジェクトによって表されるワーカからのメッセージを読み出し、そのワーカへのメッセージを書き出す新しいタスクが生成される。 -
ワーカプロセス起動時に
init_worker(cookie, manager::FooManager)
を呼びださなければならない。 -
クラスタマネージャは
launch
を呼び出す際にWorkerConfig
のconnect_at::Any
フィールドを設定することができる。このフィールドは、connect
コールバックが呼ばれる際に必ず設定される。このフィールドはワーカに接続するための情報を格納するためによく用いられる。例えば、TCP/IPソケット通信レイヤはこのフィールドに、ワーカに接続するための(host, port)
タプルを指定する。
クラスタからワーカを削除する際にはkill(manager, pid, config)
が呼び出される。マスタプロセスでは、対応するIOオブジェクトをクローズしてクリーンアップする必要がある。デフォルト実装では、リモートワーカで単純にexit()
を実行する。
Examples
レポジトリのclustermanager/simple
フォルダには、Unixドメインソケットを用いた簡単な実装例が収められている。
LocalManager
とSSHManager
におけるネットーワークへの要請
Juliaのクラスタは、安全が保証された環境で実行するように設計されている。ローカルなラップトップや部門の管理するクラスタ、もしくはクラウドなどだ。この節では、組み込みのLocalManager
とSSHManager
に関する要請をまとめる。
-
マスタプロセスは全くlistenしない。ワーカに対して接続するだけだ。
-
個々のワーカはローカルなインターフェイスに対してbindし、OSから割り当てられるエフェメラルポートをlistenする。
-
addprocs(N)
とすると使われるLocalManager
は、デフォルトでループバックインターフェイスに対してバインドする。このため、あとになってからリモートホストでワーカを起動しても、クラスタに接続できない(悪意のあるプロセスも接続できない)。したがって、addprocs(4)
としてからaddprocs(["remote_host"])
を実行すると失敗する。ローカルプロセスとリモートプロセスから構成されるクラスタが必要な場合もある。これを実現するには、LocalManager
に対して明示的に外部ネットワークインターフェイスにバインドするように指定すれば良い。restrict
キーワード引数を用いてaddprocs(4; restrict=false)
のように書く。 -
addprocs(list_of_remote_hosts)
とすると使われるSSHManager
は、SSHでリモートホストにワーカを起動する。デフォルトではSSHはJuliaのワーカを起動するためのみに使われるので、それ以降のマスタ-ワーカ間通信、ワーカ-ワーカ間通信は通常の暗号化されていないソケットを用いる。リモートホストへはパスワードなしでログインできるように設定しておく必要がある。SSHで用いるフラグやクレデンシャルは、キーワード引数sshflag
で指定する。 -
マスタ-ワーカ間通信にもSSH接続を使いたい場合には、
addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)
とする。典型的な状況としては、ローカルなラップトップでJulia REPLをマスタとして用い、クラスタの残りの部分はAmazon EC2のようなクラウドを用いる場合が考えられる。このような場合には、リモートクラスタでは公開鍵インフラストラクチャで認証されるSSHクライアントで用いる22番ポートだけをオープンすればよい。認証に用いるクレデンシャルはsshflags
を用いて指定する。例えばsshflags="-i <keyfile>"
のように書く。
デフォルトの全対全トポロジでは、すべてのワーカが通常のTCPソケットで相互に接続する。したがって、クラスタノード間のセキュリティポリシが、ワーカノードのエフェメラルポートレンジ(OSによって異なる)に対して、自由な接続を許すように設定する必要がある。
すべてのワーカ間通信をSSHで暗号化したり、個々のメッセージを暗号化することも、カスタムクラスタマネージャをかけば実現できる。
クラスタクッキー
クラスタ内のすべてのプロセスは、同じクッキーを共有する。これはデフォルトでは、マスタプロセスで生成されるランダムな文字列である。
-
cluster_cookie()
はクッキーを返す。cluster_cookie(cookie)
とすると新たにクッキーをセットし、それを返す。 - すべての接続は両端で認証される。これによって同じマスタによって起動されたワーカ同士のみが通信できるようにしている。
- クッキーは、起動時にオプション
--worker=<cookie>
で渡すことができる。--worker
オプションにクッキーが指定されていなかった場合、ワーカは標準入力からクッキーを読み込む。クッキーの読み込みが終わったら標準入力は直ちにクローズされる。 -
ClusterManager
はマスタ上でcluster_cookie()
を呼び出してクッキーを取得できる。TCP/IP通信層を用いないクラスタマネージャは、--worker
オプションを使わないので、マスタと同じクッキーでinit_worker(cookie, manager)
を呼び出さなければならない。
より高度なセキュリティが必要な環境ではカスタムのClusterManager
を用いると良い。例えば、クッキーを事前に共有しておけば、起動時に指定する必要がなくなる。
ネットワークトポロジの指定 (実験的機能)
addprocs
に渡すキーワード引数topology
で、ワーカ間の接続関係を指定する事ができる。
-
:all_to_all
: デフォルト。すべてのワーカが相互に直接接続する -
:master_worker
: ドライバプロセス、すなわちpid 1だけがワーカプロセスへの接続を持つ -
:custom
: クラスタマネージャのlaunch
メソッドは、WorkerConfig
のident
とconnect_idents
フィールドで接続トポロジを指定する。クラスタマネージャが指定したIDident
を持つワーカは、connect_idents
で指定されるすべてのワーカに接続する。
キーワード引数lazy=true|false
は、topology
オプションが:all_to_all
の場合にだけ意味を持つ。これがtrue
になっていると、マスタだけがすべてのワーカと接続された状態で起動する。ワーカとワーカの間の接続は、最初に必要が生じた時点で確率される。こうすると最初にクラスタ間通信をセットアップするのに必要な資源をへらすことができる。実行時にプログラムからの要請に応じて接続が行われる。デフォルトではlazy
はtrue
となる。
現状では、接続されていないワーカ間でメッセージを転送しようとすると、エラーになる。この挙動や、機能やAPIも実験的なものであり、将来のリリースでは変更される可能性がある。
その他の外部パッケージ
Juliaには、他にも様々な外部パッケージが用意されている。例えば、MPI.jl
はMPI用のラッパで、共有配列の項でも述べたがDistributedArrays.jl
もある。また、GPUプログラミングエコシステムについても言及する必要があるだろう。これには、以下のものがある。
-
OpenCL.jl
とCUDAdrv.jl
:Cで書かれた低レベルカーネルを操作するOpenCLとCUDAのラッパである。 -
CUDAnative.jl
:JuliaによるネイティブなCUDA実装を可能にする低レベルインターフェイス -
CuArrays.jl
、CLArrays.jl
などのベンタ固有高レベル抽象レイヤ -
ArrayFire.jl
、GPUArrays.jl
などの高レベルライブラリ
下の例ではDistributedArrays.jl
とCuArrays.jl
を用いて複数のプロセスに分散する。最初にdistribute()
もしくはCuArray()
を用いてデータをばらまく。DistributedArrays.jl
をインポートする際には、@everywhere
を使ってすべてのプロセスでインポートすることを忘れずに。
$ ./julia -p 4
julia> addprocs()
julia> @everywhere using DistributedArrays
julia> using CuArrays
julia> B = ones(10_000) ./ 2;
julia> A = ones(10_000) .* π;
julia> C = 2 .* A ./ B;
julia> all(C .≈ 4*π)
true
julia> typeof(C)
Array{Float64,1}
julia> dB = distribute(B);
julia> dA = distribute(A);
julia> dC = 2 .* dA ./ dB;
julia> all(dC .≈ 4*π)
true
julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}
julia> cuB = CuArray(B);
julia> cuA = CuArray(A);
julia> cuC = 2 .* cuA ./ cuB;
julia> all(cuC .≈ 4*π);
true
julia> typeof(cuC)
CuArray{Float64,1}
現状のCUDAnative.jl
ではJuliaの機能の一部がサポートされていないことに留意しよう1。
例えば、sin
のような関数はCUDAnative.sin(cc: @maleadt)
で置き換えてやる必要がある。
次に示す例では、DistributedArrays.jl
とCuArrays.jl
を用いて配列を複数のプロセスに分散し、汎関数をそれに対して呼び出す。
function power_method(M, v)
for i in 1:100
v = M*v
v /= norm(v)
end
return v, norm(M*v) / norm(v) # or (M*v) ./ v
end
power_method
は新しいベクタを繰り返し生成し、それを正規化する。関数宣言では型を指定していないが、前述のデータ型に対して動作するか見てみよう。
julia> M = [2. 1; 1 1];
julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877
julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)
julia> cuM = CuArray(M);
julia> cuv = CuArray(v);
julia> curesult = power_method(cuM, cuv);
julia> typeof(curesult)
CuArray{Float64,1}
julia> dM = distribute(M);
julia> dv = distribute(v);
julia> dC = power_method(dM, dv);
julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}
この外部パッケージの簡単な紹介の最後に、MPIプロトコルのJuliaラッパであるMPI.jl
を見てみよう。内部関数をすべて見ていくには時間がかかりすぎるので、簡単な実装例を見るだけにしよう。
次の簡単なスクリプトは、個々のサブプロセスでランクを取得して、Reduce
で加算し、マスタプロセスで出力している。
import MPI
MPI.Init()
comm = MPI.COMM_WORLD
MPI.Barrier(comm)
root = 0
r = MPI.Comm_rank(comm)
sr = MPI.Reduce(r, MPI.SUM, root, comm)
if(MPI.Comm_rank(comm) == root)
@printf("sum of ranks: %s\n", sr)
end
MPI.Finalize()
mpirun -np 4 ./julia example.jl