5
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Juliaでプロセス並列〜MPIとの違い

Last updated at Posted at 2020-12-26

Juliaでのプロセス並列について調べました。MPI.jlを使えばMPIが使えるので簡単にできるのですが、やはりJuliaの基本機能を使って並列化にチャレンジしてみたいところです。

これまでの記事としては
Juliaでforループをプロセス並列化してみる
があります。

Juliaでの分散並列はMPIとは少し異なっています。詳しくは公式にありますが、慣れていないために私はよくわかっていませんでした。やっとわかってきたので、シェアします。

結論から言うと、艦隊指示書が重要です。
そこに注意することで、配列を分散で持って値を更新することに成功しました。

[追記]分散で持っている配列を引数にして関数を用いて更新することにも成功しました。

MPI

MPIについては詳しくは述べません。
拙い図ですが、こんな感じでしょう。
MPI.png
それぞれのCPUコアを船として考えたときに、船同士が通信します。
ポイントはmyrankがそれぞれのコアの番号だとわかっている時に、

  Bob = myrank*2

のような形で書くと、それぞれの船でのBobの値がそれぞれの船に保存されます。それぞれのBobの値が欲しい場合には、gatherやallgetherでコアの番号を指定して取り出してきます。

Julia

Juliaの分散並列はこんな感じです。
Julia.png
違いは、「指示を出す別の船(旗艦)がいる」ということです。MPIでもmyrankが0の部分でデータの受け渡しや出力をやっていたりしていましたから、これはそんなに変わらない、と最初は思っていました。しかし、違っていました。図にあるFuturesというのがポイントです。Juliaの場合、

Bob = @spawnat myrank myrank*2

のような形でそれぞれの船にBobの値をセットすること

for myrank=1:3
Bob = @spawnat myrank myrank*2
end

は「できません」。なぜならば、ここで出てきているBobという名前は「旗艦から見た名前」だからです。このコードだと、ループの最後のmyrank=3の時だけBobが入り、myrank=1とmyrank=2には入っていません。
もし、それぞれの船にBobを定義したければ、

fut = Array{Future}(undef,3)
for myrank=1:3
  fut[myrank] = @spawnat myrank myrank*2
end

としなければなりません。
ここで、Futureという型が非常に重要となります。これは旗艦が「どの船に何があるか」を把握するための情報(艦隊指示書)です。ですので、このFutureをうまく扱わないと分散並列をするのが難しいです。

分散並列による行列の分散確保

以上のことをおさえることで、分散並列による行列の値の分散保持と値の更新に成功しました。以下がコードの例です。

using Distributed
addprocs(3) #workerを3個にした。
@everywhere module Parallelmodule
    using Distributed
    struct Parallelarray{T}
        numworkers::Int64
        workers::Array{Int64,1}
        futures::Array{Future,1}
        numelements::Int64

        function Parallelarray{T}(n) where T<: Number
            numworkers = nworkers() #ワーカーの数を取得
            @assert n % numworkers == 0 #配列サイズはワーカー数で割り切れる事
            nbun = n ÷ numworkers #一つのワーカーが持つ配列サイズ
            workids = workers() #使っているワーカープロセスのIDを取得
            futures = Array{Future,1}(undef,numworkers) #艦隊指示書の作成
            for (id,myID) in enumerate(workids)
                futures[id] = @spawnat myID zeros(T,nbun) #ワーカーmyIDにzero(T,nbun)を作成するぞ、と旗艦指示書に書き込み
            end

            return new{T}(numworkers,workids,futures,nbun)
        end
    end

    function Base.setindex!(A::Parallelarray,v,id,i) #A[id,i] = vの動作の定義
        myid = A.workers[id]
        @spawnat myid fetch(A.futures[id])[i] = v #ワーカーmyidでfetchしているので、自分自身が持っている配列をゲットし、i番目にvという値を入れている
        return 
    end 

    function Base.getindex(A::Parallelarray,id,i) #A[id,i]の動作の定義
        myid = A.workers[id]
        f = @spawnat myid fetch(A.futures[id])[i] #fetchを直接やってしまうと中のデータがworkerから消されてしまうので、fというFutureを作ってみた。うまくいっているかは不明。
        fetch(f)
        #fetch(A.futures[id])[i] #ここでは、旗艦に値を転送している
    end
    
    function get_numelements(A::Parallelarray)
        return A.numelements
    end
end

@everywhere using .Parallelmodule

function test()
    numworkers = nworkers()
    println(numworkers)
    println(workers())
    n = 12
    
    A = Parallelmodule.Parallelarray{Float64}(n)
    @sync for id=1:numworkers
        for i=1:A.numelements
            A[id,i] = 100*id+i
        end
    end

    @sync for id=1:numworkers
        for i=1:A.numelements
            println("id = $id,i = $i")
            println(A[id,i])
        end
    end

    return
end
test()

説明はコメントに記述しました。このモジュールを使う事で、何も考えずにworkerIDと配列のindexを指定すれば行列を更新することができるようになりました。

追記

fetch(A.futures[id])[i] #ここでは、旗艦に値を転送している

としていたところですが、これだと一度getindexすると値がworkerから消えてしまっているようです。理由は、fetchをしてしまうとworkerから旗艦にデータが移ってしまい、workerから消えてしまうから、のようです。
ということで、少し変更してみました。これでうまくいっているかはまだ不明です。
この書き方だと

function test()
    numworkers = nworkers()
    println(numworkers)
    println(workers())
    n = 12
    

    A = Parallelmodule.Parallelarray{Float64}(n)
    @sync for id=1:numworkers
        for i=1:A.numelements
            A[id,i] = 100*id+i
        end
    end

    
    @sync for id=1:numworkers
        for i=1:A.numelements
            println("id = $id,i = $i")
            println(A[id,i])
        end
    end
    println("done")
    

    @sync for id=1:numworkers
        for i=1:A.numelements
            A[id,i] = A[id,i]+10000*id+1000*i
        end
    end

    @sync for id=1:numworkers
        for i=1:A.numelements
            println("id = $id,i = $i")
            println(A[id,i])
        end
    end
end
test()

がちゃんと

3
[2, 3, 4]
id = 1,i = 1
101.0
id = 1,i = 2
102.0
id = 1,i = 3
103.0
id = 1,i = 4
104.0
id = 2,i = 1
201.0
id = 2,i = 2
202.0
id = 2,i = 3
203.0
id = 2,i = 4
204.0
id = 3,i = 1
301.0
id = 3,i = 2
302.0
id = 3,i = 3
303.0
id = 3,i = 4
304.0
done
id = 1,i = 1
11101.0
id = 1,i = 2
12102.0
id = 1,i = 3
13103.0
id = 1,i = 4
14104.0
id = 2,i = 1
21201.0
id = 2,i = 2
22202.0
id = 2,i = 3
23203.0
id = 2,i = 4
24204.0
id = 3,i = 1
31301.0
id = 3,i = 2
32302.0
id = 3,i = 3
33303.0
id = 3,i = 4
34304.0

となっていますので、配列の更新がうまくいっているように見えます。

追記2

それぞれのコアが持っている配列をそれぞれで更新する方法もわかりましたので、追記します。それを行うには、moduleに

    function paralleldo(A::Parallelarray,id,func!::Function,indices...)
        myid = A.workers[id]
        @spawnat myid func!(fetch(A.futures[id]),indices...)
    end

を足せばOKです。これで、引数としてfunctionを入れることができ、さらにそのfunctionには好きなだけ変数を入れることができます。
例えば、

@everywhere function test!(data,id,i)
    #println(data[i])
    data[i] = data[i]+10000*id+1000*i
end

function test()
    numworkers = nworkers()
    println(numworkers)
    println(workers())
    n = 12

    A = Parallelmodule.Parallelarray{Float64}(n)
    @sync for id=1:numworkers
        for i=1:A.numelements
            A[id,i] = 100*id+i
        end
    end

    @sync for id=1:numworkers
        for i=1:A.numelements
            println("id = $id,i = $i")
            println(A[id,i])
        end
    end
    println("dd")

    @sync for id=1:numworkers
        for i=1:A.numelements
            Parallelmodule.paralleldo(A,id,test!,id,i)
        end
    end

    @sync for id=1:numworkers
        for i=1:A.numelements
            println("id = $id,i = $i")
            println(A[id,i])
        end
    end

    return
end

test()

とすれば、関数test!の中身を好きなようにいじることで、並列計算ができることになります。

5
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
13

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?