7
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

Julia Parallel Computing の訳 (2) マルチスレッド

Posted at

自分の為に https://docs.julialang.org/en/v1/manual/parallel-computing/ を訳してみる。続き。

マルチスレッド (実験的機構)

Juliaはタスクの他にマルチスレッドも言語としてサポートしている。ただし実験的な機構なので、インターフェイスが将来変更になる可能性があることに注意しよう。

セットアップ

デフォルトでは、Juliaは1スレッドとして実行を開始する。これはThreads.nthreads()で確認する。

julia> Threads.nthreads()
1

起動時のスレッド数を変更するには環境変数JULIA_NUM_THREADSで変更できる。4スレッドで実行開始するには下のようにする。

Linux/OSXのbash:

export JULIA_NUM_THREADS=4

Linux/OSXのcsh、WindowsのCMD:

set JULIA_NUM_THREADS=4

WindowsのPowershell:

$env:JULIA_NUM_THREADS=4

4スレッド使えることを確認してみよう。

julia> Threads.nthreads()
4

ユーザが実行しているのはマスタースレッドだ。Threads.threadidで確認しよう。

julia> Threads.threadid()
1

@threads マクロ

ネイティブスレッドを使った例を作ってみよう。まずゼロの配列を作る。

julia> a = zeros(10)
10-element Array{Float64,1}:
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0
 0.0

4スレッド使ってこの配列を同時に処理してみよう。個々のスレッドはスレッドIDをそれぞれの場所に書いていく。

JuliaはThreads.@threadsマクロを用いた並列ループをサポートしている。このマクロはforループの前に付けて、
そのループがマルチスレッド領域であることをJulia処理系に示す。

julia> Threads.@threads for i = 1:10
           a[i] = Threads.threadid()
       end

繰り返し実行する範囲がスレッドに対して分割される。各スレッドは割り当てられた領域にスレッド番号を書き込んでいる。

julia> a
10-element Array{Float64,1}:
 1.0
 1.0
 1.0
 2.0
 2.0
 2.0
 3.0
 3.0
 4.0
 4.0

Threads.@threadsには@distributedにあるようなreduceパラメータは無いことに注意しよう。

アトミック操作

Juliaは値に対するアクセスと更新をアトミック、つまり競合の発生しないスレッドセーフな方法で行う機構をサポートしている。プリミティブ型をThread.Atomicでくるむことで、アトミックにアクセスするように指定する。例を見てみよう。

julia> i = Threads.Atomic{Int}(0);

julia> ids = zeros(4);

julia> old_is = zeros(4);

julia> Threads.@threads for id in 1:4
           old_is[id] = Threads.atomic_add!(i, id)
           ids[id] = id
       end

julia> old_is
4-element Array{Float64,1}:
 0.0
 1.0
 7.0
 3.0

julia> ids
4-element Array{Float64,1}:
 1.0
 2.0
 3.0
 4.0

アトミック指定をしないと、競合によって答えおかしくなる場合がある。競合を避けないとどうなるか見てみよう。

julia> using Base.Threads

julia> nthreads()
4

julia> acc = Ref(0)
Base.RefValue{Int64}(0)

julia> @threads for i in 1:1000
          acc[] += 1
       end

julia> acc[]
926

julia> acc = Atomic{Int64}(0)
Atomic{Int64}(0)

julia> @threads for i in 1:1000
          atomic_add!(acc, 1)
       end

julia> acc[]
1000

Note
すべてのプリミティブ型がアトミックにできるわけではない。サポートされているのはInt8, Int16, Int32, Int64, Int128, UInt8, UInt16, UInt32, UInt64, UInt128, Float16, Float32, Float64だけだ。また、AAarch32ppc64elでは、Int128UInt128はサポートされない。

副作用と変更可能な関数引数

純粋でない関数に対してマルチスレッドを用いる際には、注意しないと間違った答えが得られる場合がある。例えば、名前の最後に!がつく関数は引数を変更するので純粋ではない。しかし最後が!になっていないにも関わらう副作用を持つ関数もある。例えば、findfirst(regex, str)は引数regexを変更するし、rand()Base.GLOBAL_RNGを変更する。

julia> using Base.Threads

julia> nthreads()
4

julia> function f()
           s = repeat(["123", "213", "231"], outer=1000)
           x = similar(s, Int)
           rx = r"1"
           @threads for i in 1:3000
               x[i] = findfirst(rx, s[i]).start
           end
           count(v -> v == 1, x)
       end
f (generic function with 1 method)

julia> f() # the correct result is 1000
1017

julia> function g()
           a = zeros(1000)
           @threads for i in 1:1000
               a[i] = rand()
           end
           length(unique(a))
       end
g (generic function with 1 method)

julia> Random.seed!(1); g() # the result for a single thread is 1000
781

このような場合には、競合が起きないようにコードを再設計するか、同期プリミティブを用いる必要がある。

上のfindfirstを修正するには、rx変数をスレッドごとにコピーしておけば良い。

julia> function f_fix()
             s = repeat(["123", "213", "231"], outer=1000)
             x = similar(s, Int)
             rx = [Regex("1") for i in 1:nthreads()]
             @threads for i in 1:3000
                 x[i] = findfirst(rx[threadid()], s[i]).start
             end
             count(v -> v == 1, x)
         end
f_fix (generic function with 1 method)

julia> f_fix()
1000

We now use
r"1"ではなくRegex("1")をつかうことで、別のコピーが作られるようにして、rxベクタに格納している。

randの場合はちょっと複雑で、各スレッドの疑似乱数系列が重なり合わないようにしなければならない。そのためにはFuture.randjump関数を用いる。

julia> using Random; import Future

julia> function g_fix(r)
           a = zeros(1000)
           @threads for i in 1:1000
               a[i] = rand(r[threadid()])
           end
           length(unique(a))
       end
g_fix (generic function with 1 method)

julia>  r = let m = MersenneTwister(1)
                [m; accumulate(Future.randjump, fill(big(10)^20, nthreads()-1), init=m)]
            end;

julia> g_fix(r)
1000

g_fixrベクトルを渡している。RGNを複数作るのは重い処理なので、関数を実行するたびに繰り返したくはないからだ。

@threadcall (実験的機能)

Juliaでは、すべてのI/Oタスク、タイマー、REPLコマンドは一つのOSスレッドのイベントループで実行される。これは、libuv(http://docs.libuv.org/en/v1.x/) を改変したもので実現されている。プログラム中のイールド点によって、複数のタスクの一つのOSスレッドに対する協調的なスケジュールが実現されている。I/Oタスクやタイマーは、イベントを待つ間に暗黙裡にイールドしている。yieldをよびだすと明示的に他のタスクに実行をイールド(譲る)ことになる。

したがって、ccallをあるタスクから実行すると、その実行が終了するまで、Juliaのスケジューラは他のタスクをスケジュールできなくなる。これはすべての外部ライブラリ呼び出しについていえる。例外はJuliaのコードをコールバックし、そこでyieldするコードと、Juliaのyieldに相当するC関数jl_yield()を呼び出すコードだけだ。

ここで、(デフォルトでは)シングルスレッドで動作するJuliaのコードでも、Juliaが利用するライブラリは独自のスレッドを起動する場合があることに注意しよう。例えばBLASライブラリはその計算機のコアの数と同じだけのスレッドを実行する。

@threadcallマクロは、ccallでJuliaのイベントループをブロックしたくない場合に役に立つ。これを用いるとCの関数を別のスレッドで実行する事ができる。このスレッドに用いられるスレッドプールのサイズはデフォルトでは4になるが、環境変数UV_THREADPOOL_SIZEで制御することができる。空きスレッドを待つ間、そのスレッドでの関数の実行の実行が行われている間も、(Juliaのメインイベントループから)外部関数の実行を要求したタスクは、他のタスクに実行を譲る(イールド)できる。@threadcallは実行が終了するまでリターンしないことに注意しよう。つまり、ユーザの視点からは、他のJulia APIと同様にブロッキング呼び出しのように見える。

呼び出された関数がJuliaをコールバックするとセグメンテーションフォールトが発生するので注意しよう。

@threadcall は、将来のJuliaでは変更されるかもしれないし削除されるかもしれない。

7
6
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
6

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?