自分の為に https://docs.julialang.org/en/v1/manual/parallel-computing/ を訳してみる。続き。
マルチスレッド (実験的機構)
Juliaはタスクの他にマルチスレッドも言語としてサポートしている。ただし実験的な機構なので、インターフェイスが将来変更になる可能性があることに注意しよう。
セットアップ
デフォルトでは、Juliaは1スレッドとして実行を開始する。これはThreads.nthreads()
で確認する。
julia> Threads.nthreads()
1
起動時のスレッド数を変更するには環境変数JULIA_NUM_THREADS
で変更できる。4スレッドで実行開始するには下のようにする。
Linux/OSXのbash:
export JULIA_NUM_THREADS=4
Linux/OSXのcsh、WindowsのCMD:
set JULIA_NUM_THREADS=4
WindowsのPowershell:
$env:JULIA_NUM_THREADS=4
4スレッド使えることを確認してみよう。
julia> Threads.nthreads()
4
ユーザが実行しているのはマスタースレッドだ。Threads.threadid
で確認しよう。
julia> Threads.threadid()
1
@threads マクロ
ネイティブスレッドを使った例を作ってみよう。まずゼロの配列を作る。
julia> a = zeros(10)
10-element Array{Float64,1}:
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
0.0
4スレッド使ってこの配列を同時に処理してみよう。個々のスレッドはスレッドIDをそれぞれの場所に書いていく。
JuliaはThreads.@threads
マクロを用いた並列ループをサポートしている。このマクロはforループの前に付けて、
そのループがマルチスレッド領域であることをJulia処理系に示す。
julia> Threads.@threads for i = 1:10
a[i] = Threads.threadid()
end
繰り返し実行する範囲がスレッドに対して分割される。各スレッドは割り当てられた領域にスレッド番号を書き込んでいる。
julia> a
10-element Array{Float64,1}:
1.0
1.0
1.0
2.0
2.0
2.0
3.0
3.0
4.0
4.0
Threads.@threads
には@distributed
にあるようなreduceパラメータは無いことに注意しよう。
アトミック操作
Juliaは値に対するアクセスと更新をアトミック、つまり競合の発生しないスレッドセーフな方法で行う機構をサポートしている。プリミティブ型をThread.Atomic
でくるむことで、アトミックにアクセスするように指定する。例を見てみよう。
julia> i = Threads.Atomic{Int}(0);
julia> ids = zeros(4);
julia> old_is = zeros(4);
julia> Threads.@threads for id in 1:4
old_is[id] = Threads.atomic_add!(i, id)
ids[id] = id
end
julia> old_is
4-element Array{Float64,1}:
0.0
1.0
7.0
3.0
julia> ids
4-element Array{Float64,1}:
1.0
2.0
3.0
4.0
アトミック指定をしないと、競合によって答えおかしくなる場合がある。競合を避けないとどうなるか見てみよう。
julia> using Base.Threads
julia> nthreads()
4
julia> acc = Ref(0)
Base.RefValue{Int64}(0)
julia> @threads for i in 1:1000
acc[] += 1
end
julia> acc[]
926
julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)
julia> @threads for i in 1:1000
atomic_add!(acc, 1)
end
julia> acc[]
1000
Note
すべてのプリミティブ型がアトミックにできるわけではない。サポートされているのはInt8
,Int16
,Int32
,Int64
,Int128
,UInt8
,UInt16
,UInt32
,UInt64
,UInt128
,Float16
,Float32
,Float64
だけだ。また、AAarch32
やppc64el
では、Int128
とUInt128
はサポートされない。
副作用と変更可能な関数引数
純粋でない関数に対してマルチスレッドを用いる際には、注意しないと間違った答えが得られる場合がある。例えば、名前の最後に!
がつく関数は引数を変更するので純粋ではない。しかし最後が!
になっていないにも関わらう副作用を持つ関数もある。例えば、findfirst(regex, str)
は引数regex
を変更するし、rand()
はBase.GLOBAL_RNG
を変更する。
julia> using Base.Threads
julia> nthreads()
4
julia> function f()
s = repeat(["123", "213", "231"], outer=1000)
x = similar(s, Int)
rx = r"1"
@threads for i in 1:3000
x[i] = findfirst(rx, s[i]).start
end
count(v -> v == 1, x)
end
f (generic function with 1 method)
julia> f() # the correct result is 1000
1017
julia> function g()
a = zeros(1000)
@threads for i in 1:1000
a[i] = rand()
end
length(unique(a))
end
g (generic function with 1 method)
julia> Random.seed!(1); g() # the result for a single thread is 1000
781
このような場合には、競合が起きないようにコードを再設計するか、同期プリミティブを用いる必要がある。
上のfindfirst
を修正するには、rx
変数をスレッドごとにコピーしておけば良い。
julia> function f_fix()
s = repeat(["123", "213", "231"], outer=1000)
x = similar(s, Int)
rx = [Regex("1") for i in 1:nthreads()]
@threads for i in 1:3000
x[i] = findfirst(rx[threadid()], s[i]).start
end
count(v -> v == 1, x)
end
f_fix (generic function with 1 method)
julia> f_fix()
1000
We now use
r"1"
ではなくRegex("1")
をつかうことで、別のコピーが作られるようにして、rx
ベクタに格納している。
rand
の場合はちょっと複雑で、各スレッドの疑似乱数系列が重なり合わないようにしなければならない。そのためにはFuture.randjump
関数を用いる。
julia> using Random; import Future
julia> function g_fix(r)
a = zeros(1000)
@threads for i in 1:1000
a[i] = rand(r[threadid()])
end
length(unique(a))
end
g_fix (generic function with 1 method)
julia> r = let m = MersenneTwister(1)
[m; accumulate(Future.randjump, fill(big(10)^20, nthreads()-1), init=m)]
end;
julia> g_fix(r)
1000
g_fix
にr
ベクトルを渡している。RGNを複数作るのは重い処理なので、関数を実行するたびに繰り返したくはないからだ。
@threadcall (実験的機能)
Juliaでは、すべてのI/Oタスク、タイマー、REPLコマンドは一つのOSスレッドのイベントループで実行される。これは、libuv(http://docs.libuv.org/en/v1.x/) を改変したもので実現されている。プログラム中のイールド点によって、複数のタスクの一つのOSスレッドに対する協調的なスケジュールが実現されている。I/Oタスクやタイマーは、イベントを待つ間に暗黙裡にイールドしている。yield
をよびだすと明示的に他のタスクに実行をイールド(譲る)ことになる。
したがって、ccall
をあるタスクから実行すると、その実行が終了するまで、Juliaのスケジューラは他のタスクをスケジュールできなくなる。これはすべての外部ライブラリ呼び出しについていえる。例外はJuliaのコードをコールバックし、そこでyield
するコードと、Juliaのyield
に相当するC関数jl_yield()
を呼び出すコードだけだ。
ここで、(デフォルトでは)シングルスレッドで動作するJuliaのコードでも、Juliaが利用するライブラリは独自のスレッドを起動する場合があることに注意しよう。例えばBLASライブラリはその計算機のコアの数と同じだけのスレッドを実行する。
@threadcall
マクロは、ccall
でJuliaのイベントループをブロックしたくない場合に役に立つ。これを用いるとCの関数を別のスレッドで実行する事ができる。このスレッドに用いられるスレッドプールのサイズはデフォルトでは4になるが、環境変数UV_THREADPOOL_SIZE
で制御することができる。空きスレッドを待つ間、そのスレッドでの関数の実行の実行が行われている間も、(Juliaのメインイベントループから)外部関数の実行を要求したタスクは、他のタスクに実行を譲る(イールド)できる。@threadcall
は実行が終了するまでリターンしないことに注意しよう。つまり、ユーザの視点からは、他のJulia APIと同様にブロッキング呼び出しのように見える。
呼び出された関数がJuliaをコールバックするとセグメンテーションフォールトが発生するので注意しよう。
@threadcall は、将来のJuliaでは変更されるかもしれないし削除されるかもしれない。