3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Julia Parallel Computing の訳 (5) - ClusterManager

Posted at

自分の為に https://docs.julialang.org/en/v1/manual/parallel-computing/ を訳してみる。続き。

クラスタマネージャ

Juliaプロセスを起動し管理しネットワーク接続して論理的なクラスタを構成することは、クラスタマネージャによって行われる。ClusterManagerは以下を行う。

  • ワーカプロセスをクラスタ環境で起動する。
  • ワーカのライフタイムで発生するイベントを管理する
  • データ転送層を提供する(オプショナル)

Juliaクラスタは以下の特徴を持つ。

  • 最初のJuliaプロセスはマスタと呼ばれ、特別扱いされる。IDは1。
  • マスタプロセスだけがワーカプロセスを追加、削除できる。
  • すべてのプロセスは相互に直接通信できる。

ワーカ間の通信(組み込みのTCP/IP層を用いる)は、次のように確立される。

  • addprocs関数が、ClusterManagerオブジェクトを引数としてマスタプロセスで呼び出される。
  • addprocsは適切な起動メソッドを呼び出し、要求された数のワーカプロセスを適切な計算機上で起動する。
  • 個々のワーカプロセスは、空いているポートに対してlistenし、ホストとポートの情報をstdoutに書き出す。
  • クラスタマネージャは、stdoutへの出力を見てマスタプロセスが利用できるようにする。
  • マスタプロセスはこの情報をパーズして、ワーカへのTCP/IP接続を確立する。
  • すべてのワーカに、クラスタ内の他のワーカの情報が通知される。
  • 個々のワーカは自分のIDがよりも若いIDを持つ全てのワーカに対して接続を行う。
  • このようにして、すべてのワーカがすべてのワーカに直接接続したメッシュネットワークが確立する。

デフォルトの通信レイヤは通常のTCPSocketを用いるが、独自の通信層を用いることもできる。

Juliaはデフォルトで2つのクラスタマネージャを提供している。

  • LocalManager: addprocs()addprocs(np::Integer)が呼ばれた際に用いられる。
  • SSHManager: ホスト名のリストを引数としてaddprocs(hostnames::Array)が呼ばれた際に用いられる。

LocalManagerは同じホストにワーカを起動する際に用いられる。マルチコア、マルチプロセッサのハードウェアを利用する。

クラスタマネージャは最低限以下を満たす必要がある。

  • 抽象型のClusterManagerのサブタイプであること。
  • 新しいワーカを起動するlaunchメソッドを実装していること。
  • ワーカのライフタイムイベント(インタラプトシグナルの送出など)の際に呼び出される、manageメソッドを実装していること。

つまり、addprocs(manager::FooManager) を実装するには、FooManagerが下記のメソッドを実装していれば良い。

function launch(manager::FooManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

例として、ローカルホストにワーカを起動するLocalManagerがどのように実装されているか見てみよう。

struct LocalManager <: ClusterManager
    np::Integer
end

function launch(manager::LocalManager, params::Dict, launched::Array, c::Condition)
    [...]
end

function manage(manager::LocalManager, id::Integer, config::WorkerConfig, op::Symbol)
    [...]
end

launchメソッドは以下の引数をとる。

  • manager::ClusterManager: addprocsに渡されたクラスタマネージャ
  • params::Dict: addprocsに渡されたすべてのパラメータ
  • launched::Array: 起動したプロセスのWorkerConfigオブジェクトを追加する配列
  • c::Condition: ワーカの起動が終了したらノーティファイする状態変数

launchメソッドは、独立したタスクで非同期に呼び出される。このタスクの終了によって、要求されたワーカがすべて起動したことを確認する。したがって、launch関数は要求されたワーカが起動したらすぐに終了しなければならない。

新たに起動したワーカは、相互およびマスタプロセスと全対全で接続する。
Juliaコマンドに引数--worker[=<cookie>]を付けて起動すると、ワーカとして初期されTCP/IPソケットの接続が用意される。

すべてのワーカはマスタと同じクッキーを共有する。クッキーが--workerオプションで指定されていない場合ワーカは標準入力からクッキーを読もうとする。LocalManagerSSHManagerも標準入力からクッキーを与える。

デフォルトでは、ワーカはgetipaddr()で返されるアドレスの空いているポートに対してlistenする。
特定のアドレスに対してlistenするようにしたければ、--bind-to bind_addr[:port]を指定する。
これは、複数のアドレスを持つ環境で有用だ。

TCP/IP以外の通信層としては、MPIがある。この場合は--workerは指定してはいけない。その代わり、新たに起動したワーカは、並列実行に用いるAPIを使う前に、init_worker(cookie)を実行しなければならない。

すべてのワーカが起動したら、launchメソッドは、(適切にフィールドを埋めた)WorkerConfigオブジェクトを、配列launchedに追加する。

mutable struct WorkerConfig
    # すべてのクラスタマネジャで用いる共有フィールド
    io::Union{IO, Nothing}
    host::Union{AbstractString, Nothing}
    port::Union{Integer, Nothing}

    # 一つのホストで追加のワーカを起動する場合に用いるフィールド
    count::Union{Int, Symbol, Nothing}
    exename::Union{AbstractString, Cmd, Nothing}
    exeflags::Union{Cmd, Nothing}

    # カスタムクラスタマネージャがワーカレベルの情報を保持するために使う
    # 複数の情報を格納したければdictを用いる。
    userdata::Any

    # SSHManager / SSH tunnel connections to workers
    tunnel::Union{Bool, Nothing}
    bind_addr::Union{AbstractString, Nothing}
    sshflags::Union{Cmd, Nothing}
    max_parallel::Union{Integer, Nothing}

    # Local/SSH manager で用いる
    connect_at::Any

    [...]
end

WorkerConfigの殆どのフィールドは組み込みのマネージャで利用する。カスタムクラスタマネージャは、多くの場合は ioもしくはhostportだけを指定すればいい。

  • ioが指定されていた場合には、ホスト・ポート情報をそこから読み出す。Juliaワーカはアドレスとポートを起動時に書き出す。こうなっているので、ポートを手で指定せず、任意の空きポートで通信することができる。

  • ioが指定されていなければ、hostportを用いて接続する。

  • countexenameexeflagsはそのワーカから追加でワーカを起動するための情報だ。クラスタマネージャはノードごとにワーカを起動し、この情報を用いて追加のワーカを起動する。

  • countが整数nの場合、全部でn個のワーカが起動できる。
  • countがシンボル:autoの場合、その計算機のCPUスレッド数(論理コアの数)だけワーカを起動する。
  • exename はJuliaコマンドのフルパス名である。
  • exeflags ワーカを起動するために必要なコマンドライン引数である。
  • tunnelbind_addrsshflagsmax_parallelはマスタからワーカに接続する際にSSHトンネルが必要な場合に用いられる。
  • userdataはカスタムマネージャが、固有の情報を保持する際に用いる。

manage(manager::FooManager, id::Integer, config::WorkerConfig, op::Symbol)は、ワーカのライフサイクルの間に異なるop値で呼び出される。

  • ワーカが追加、削除された際にはop:register/:deregisterで呼ばれる。
  • interrupt(workers)が呼ばれた際には:interruptで呼ばれる。ClusterManagerは、適切なワーカに対してインタラプトシグナルを送る必要がある。
  • クリーンアップにはop:finalizeとして呼び出される。

カスタムマネージャとカスタム通信層

デフォルトのTCP/IPの全対全ソケット通信をカスタム通信層で置き換えるのは、もう少し面倒だ。個々のJuliaプロセスには、通信先の数と同じだけ通信タスクがある。たとえばJuliaクラスタに32のプロセスがあり、全対全のメッシュネットワークで相互接続しているとしよう。

  • 個々のJuliaプロセスは31の通信タスクを持つ
  • それぞれのタスクは一つのリモートワーカからのメッセージを、メッセージ処理ループで処理する。
  • メッセージ処理ループは、IOオブジェクトに対して待ち(例えばデフォルトではTCPSocket)、メッセージ全対を読み出し、処理して、次のメッセージを待つ。
  • メッセージの送信は通信タスク以外の任意のJuliaタスクからも、適切なIOオブジェクトを通じて行う事ができる。

デフォルトの通信層を置き換えるには、新しい実装でリモートワーカに接続する方法と、メッセージ処理ループがwaitすることのできる適切なIOオブジェクトを返す必要がある。また、カスタムマネージャは、以下の2つのコールバック関数を用意する必要がある。

connect(manager::FooManager, pid::Integer, config::WorkerConfig)
kill(manager::FooManager, pid::Int, config::WorkerConfig)

TCP/IPを用いるデフォルトの実装はconnect(manager::ClusterManager, pid::Integer, config::WorkerConfig)として実装されている。

connectはIOオブジェクトのペアを返す。1つはワーカpidから送られてきたデータを読み出すためのもの、もう一方は、ワーカpidに対してデータを書き出すためのものだ。カスタムクラスタマネージャでは、インメモリのBufferStreamをパイプとして使い、Julia組み込みの並列実行機能を用いて独自の転送層を提供すれば良いだろう。

BufferStreamはインメモリのIOBufferでIOと同じように機能する。つまり非同期に動作する。

Examplesレポジトリのclustermanager/0mqフォルダには、ZeroMQを使う例が収められている。
この例では、0MQのブローカを中心においたスタートポロジでワーカを接続する。ただし、すべてのJuliaプロセスは論理的には相互に接続されているので、0MQが転送層に使われていることを意識せず、すべてのワーカは他のワーカと直接通信できる。

カスタム通信層を用いるには:

  • Juliaワーカを--workerオプションを付けて起動してはならない。--workerを付けて起動すると、TCP/IPソケット実装のワーカが起動する。

  • ワーカへのすべての論理的な入力接続に対してBase.process_messages(rd::IO, wr::IO)()を呼び出さなければならない。こうすると、IOオブジェクトによって表されるワーカからのメッセージを読み出し、そのワーカへのメッセージを書き出す新しいタスクが生成される。

  • ワーカプロセス起動時にinit_worker(cookie, manager::FooManager)を呼びださなければならない。

  • クラスタマネージャはlaunchを呼び出す際にWorkerConfigconnect_at::Anyフィールドを設定することができる。このフィールドは、connectコールバックが呼ばれる際に必ず設定される。このフィールドはワーカに接続するための情報を格納するためによく用いられる。例えば、TCP/IPソケット通信レイヤはこのフィールドに、ワーカに接続するための(host, port)タプルを指定する。

クラスタからワーカを削除する際にはkill(manager, pid, config)が呼び出される。マスタプロセスでは、対応するIOオブジェクトをクローズしてクリーンアップする必要がある。デフォルト実装では、リモートワーカで単純にexit()を実行する。

Examplesレポジトリのclustermanager/simpleフォルダには、Unixドメインソケットを用いた簡単な実装例が収められている。

LocalManagerSSHManagerにおけるネットーワークへの要請

Juliaのクラスタは、安全が保証された環境で実行するように設計されている。ローカルなラップトップや部門の管理するクラスタ、もしくはクラウドなどだ。この節では、組み込みのLocalManagerSSHManagerに関する要請をまとめる。

  • マスタプロセスは全くlistenしない。ワーカに対して接続するだけだ。

  • 個々のワーカはローカルなインターフェイスに対してbindし、OSから割り当てられるエフェメラルポートをlistenする。

  • addprocs(N)とすると使われるLocalManagerは、デフォルトでループバックインターフェイスに対してバインドする。このため、あとになってからリモートホストでワーカを起動しても、クラスタに接続できない(悪意のあるプロセスも接続できない)。したがって、addprocs(4)としてからaddprocs(["remote_host"])を実行すると失敗する。ローカルプロセスとリモートプロセスから構成されるクラスタが必要な場合もある。これを実現するには、LocalManagerに対して明示的に外部ネットワークインターフェイスにバインドするように指定すれば良い。restrictキーワード引数を用いてaddprocs(4; restrict=false)のように書く。

  • addprocs(list_of_remote_hosts)とすると使われるSSHManagerは、SSHでリモートホストにワーカを起動する。デフォルトではSSHはJuliaのワーカを起動するためのみに使われるので、それ以降のマスタ-ワーカ間通信、ワーカ-ワーカ間通信は通常の暗号化されていないソケットを用いる。リモートホストへはパスワードなしでログインできるように設定しておく必要がある。SSHで用いるフラグやクレデンシャルは、キーワード引数sshflagで指定する。

  • マスタ-ワーカ間通信にもSSH接続を使いたい場合には、addprocs(list_of_remote_hosts; tunnel=true, sshflags=<ssh keys and other flags>)とする。典型的な状況としては、ローカルなラップトップでJulia REPLをマスタとして用い、クラスタの残りの部分はAmazon EC2のようなクラウドを用いる場合が考えられる。このような場合には、リモートクラスタでは公開鍵インフラストラクチャで認証されるSSHクライアントで用いる22番ポートだけをオープンすればよい。認証に用いるクレデンシャルはsshflagsを用いて指定する。例えばsshflags="-i <keyfile>"のように書く。

デフォルトの全対全トポロジでは、すべてのワーカが通常のTCPソケットで相互に接続する。したがって、クラスタノード間のセキュリティポリシが、ワーカノードのエフェメラルポートレンジ(OSによって異なる)に対して、自由な接続を許すように設定する必要がある。

すべてのワーカ間通信をSSHで暗号化したり、個々のメッセージを暗号化することも、カスタムクラスタマネージャをかけば実現できる。

クラスタクッキー

クラスタ内のすべてのプロセスは、同じクッキーを共有する。これはデフォルトでは、マスタプロセスで生成されるランダムな文字列である。

  • cluster_cookie()はクッキーを返す。 cluster_cookie(cookie)とすると新たにクッキーをセットし、それを返す。
  • すべての接続は両端で認証される。これによって同じマスタによって起動されたワーカ同士のみが通信できるようにしている。
  • クッキーは、起動時にオプション--worker=<cookie>で渡すことができる。--workerオプションにクッキーが指定されていなかった場合、ワーカは標準入力からクッキーを読み込む。クッキーの読み込みが終わったら標準入力は直ちにクローズされる。
  • ClusterManagerはマスタ上でcluster_cookie()を呼び出してクッキーを取得できる。TCP/IP通信層を用いないクラスタマネージャは、--workerオプションを使わないので、マスタと同じクッキーでinit_worker(cookie, manager)を呼び出さなければならない。

より高度なセキュリティが必要な環境ではカスタムのClusterManagerを用いると良い。例えば、クッキーを事前に共有しておけば、起動時に指定する必要がなくなる。

ネットワークトポロジの指定 (実験的機能)

addprocsに渡すキーワード引数topologyで、ワーカ間の接続関係を指定する事ができる。

  • :all_to_all: デフォルト。すべてのワーカが相互に直接接続する
  • :master_worker: ドライバプロセス、すなわちpid 1だけがワーカプロセスへの接続を持つ
  • :custom: クラスタマネージャのlaunchメソッドは、WorkerConfigidentconnect_identsフィールドで接続トポロジを指定する。クラスタマネージャが指定したIDidentを持つワーカは、connect_identsで指定されるすべてのワーカに接続する。

キーワード引数lazy=true|falseは、topologyオプションが:all_to_allの場合にだけ意味を持つ。これがtrueになっていると、マスタだけがすべてのワーカと接続された状態で起動する。ワーカとワーカの間の接続は、最初に必要が生じた時点で確率される。こうすると最初にクラスタ間通信をセットアップするのに必要な資源をへらすことができる。実行時にプログラムからの要請に応じて接続が行われる。デフォルトではlazytrueとなる。

現状では、接続されていないワーカ間でメッセージを転送しようとすると、エラーになる。この挙動や、機能やAPIも実験的なものであり、将来のリリースでは変更される可能性がある。

その他の外部パッケージ

Juliaには、他にも様々な外部パッケージが用意されている。例えば、MPI.jlはMPI用のラッパで、共有配列の項でも述べたがDistributedArrays.jlもある。また、GPUプログラミングエコシステムについても言及する必要があるだろう。これには、以下のものがある。

  1. OpenCL.jlCUDAdrv.jl:Cで書かれた低レベルカーネルを操作するOpenCLとCUDAのラッパである。

  2. CUDAnative.jl :JuliaによるネイティブなCUDA実装を可能にする低レベルインターフェイス

  3. CuArrays.jlCLArrays.jlなどのベンタ固有高レベル抽象レイヤ

  4. ArrayFire.jlGPUArrays.jlなどの高レベルライブラリ

下の例ではDistributedArrays.jlCuArrays.jlを用いて複数のプロセスに分散する。最初にdistribute()もしくはCuArray()を用いてデータをばらまく。DistributedArrays.jlをインポートする際には、@everywhereを使ってすべてのプロセスでインポートすることを忘れずに。

$ ./julia -p 4

julia> addprocs()

julia> @everywhere using DistributedArrays

julia> using CuArrays

julia> B = ones(10_000) ./ 2;

julia> A = ones(10_000) .* π;

julia> C = 2 .* A ./ B;

julia> all(C . 4*π)
true

julia> typeof(C)
Array{Float64,1}

julia> dB = distribute(B);

julia> dA = distribute(A);

julia> dC = 2 .* dA ./ dB;

julia> all(dC . 4*π)
true

julia> typeof(dC)
DistributedArrays.DArray{Float64,1,Array{Float64,1}}

julia> cuB = CuArray(B);

julia> cuA = CuArray(A);

julia> cuC = 2 .* cuA ./ cuB;

julia> all(cuC . 4*π);
true

julia> typeof(cuC)
CuArray{Float64,1}

現状のCUDAnative.jlではJuliaの機能の一部がサポートされていないことに留意しよう1
例えば、sinのような関数はCUDAnative.sin(cc: @maleadt)で置き換えてやる必要がある。

次に示す例では、DistributedArrays.jlCuArrays.jlを用いて配列を複数のプロセスに分散し、汎関数をそれに対して呼び出す。


function power_method(M, v)
    for i in 1:100
        v = M*v
        v /= norm(v)
    end

    return v, norm(M*v) / norm(v)  # or  (M*v) ./ v
end

power_methodは新しいベクタを繰り返し生成し、それを正規化する。関数宣言では型を指定していないが、前述のデータ型に対して動作するか見てみよう。

julia> M = [2. 1; 1 1];

julia> v = rand(2)
2-element Array{Float64,1}:
0.40395
0.445877

julia> power_method(M,v)
([0.850651, 0.525731], 2.618033988749895)

julia> cuM = CuArray(M);

julia> cuv = CuArray(v);

julia> curesult = power_method(cuM, cuv);

julia> typeof(curesult)
CuArray{Float64,1}

julia> dM = distribute(M);

julia> dv = distribute(v);

julia> dC = power_method(dM, dv);

julia> typeof(dC)
Tuple{DistributedArrays.DArray{Float64,1,Array{Float64,1}},Float64}

この外部パッケージの簡単な紹介の最後に、MPIプロトコルのJuliaラッパであるMPI.jlを見てみよう。内部関数をすべて見ていくには時間がかかりすぎるので、簡単な実装例を見るだけにしよう。

次の簡単なスクリプトは、個々のサブプロセスでランクを取得して、Reduceで加算し、マスタプロセスで出力している。

import MPI

MPI.Init()

comm = MPI.COMM_WORLD
MPI.Barrier(comm)

root = 0
r = MPI.Comm_rank(comm)

sr = MPI.Reduce(r, MPI.SUM, root, comm)

if(MPI.Comm_rank(comm) == root)
   @printf("sum of ranks: %s\n", sr)
end

MPI.Finalize()
mpirun -np 4 ./julia example.jl
  1. Julia GPU man pages)

3
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
3
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?