Julia
並列処理

Juliaのスレッド並列を使って簡単な並列計算を楽しむ

Youtube で Intel の Tim Mattson 氏の OpenMP の解説動画 を見ていて、Juliaでもスレッド並列やってみるかと思い立ったので試してみました。

ちなみに私は

  • Julia使用歴 2 年
  • LLVMは読めません
  • 並列計算に関しては学部生の頃、講義で OpenMP を触った程度
  • 情報系の学部学科出身ではない

位のライトな Julian です。なので高度な並列計算については紹介しません(できません)。
今回は、計算順序に全く依存しない、OpenMP でいったら #pragma omp parallel for つけるだけで並列化できる程度の簡単な問題だけ扱います。

注意) 以下に書かれているコードをパーソナル・コンピュータで気軽に実行するとパソコンが固まる可能性があるため注意してください。
さらっと倍精度浮動小数点数型の $50000 \times 50000$ の配列が出てきますが、この配列1つでメモリを 18.6 GByte 消費します。
Unix 系 OS を使用されている方は ulimit 等でメモリ使用量に制限をかけることをおすすめします。

環境

OS: Red Hat Enterprise Linux Server release 6.9
CPU: Intel(R) Xeon(R) CPU E7- 2860 @ 2.27GHz × 8 (80コア/160スレッド)
Memory: PC3-10600R 800 GB強 (正確な値不明。htopには 803 GBと出ている。16 GB x 53位?)

Julia: version 0.6.2
Shell: Bash

Juliaのスレッド並列はCPUでの並列処理なので最低限 2 コア以上のマシンを用意してください。

公式ドキュメントより

Juliaのスレッド並列に関しては、調べても情報が少ない、というよりかググるとプロセス並列の方ばかり出てきて探しづらいので素直に公式ドキュメント Parallel Computing · The Julia Language を読むことにします。
ドキュメントを見るとわかりますが、Parallel Computing の章の大半はプロセス並列について書かれており、スレッド並列に関してはたった2節1です。紹介されている例も配列にスレッド番号を代入するというシンプルなものだけです2
具体例を減らすことによってユーザーの自由な発想を阻害しないようにという Julia Computing, Inc. の愛を感じますね!

とりあえず、Julia でのスレッド並列初心者なのでドキュメントに沿っていきます。
まずは、OpenMP 同様スレッド数を指定します。今回使用したマシンは80コア/160スレッドなのでMAXの160としました。

bash
$ export JULIA_NUM_THREADS=160

※ OpenMP の場合、論理プロセッサーの数以上を指定できますが、Julia の場合、論理プロセッサーの数を超えて指定しても意味はありません。Julia を起動したあとに Threads.nthreads() とすると論理プロセッサーの数になっています。

julia> using BenchmarkTools

julia> using Base.Threads # Threads.foo と打つの面倒なので using で読み込み

julia> nth = nthreads() # スレッド数
160

julia> a = zeros(2*nth);

julia> @threads for i = 1:2*nth
           a[i] = threadid() # スレッド番号を代入
       end

julia> a
320-element Array{Float64,1}:
   1.0
   1.0
   2.0
   2.0
   3.0
   3.0

     
  省略
     

 159.0
 159.0
 160.0
 160.0

配列 $\bf a$ の $2i-1$, $2i$ 番目にスレッド番号 $i$ が入りました。

例からわかるように Julia のスレッド並列では @threads の後の for 文が並列化されます。@threads の後に for 文以外を置くことは出来ません。

Threads module のソースコードを見るとわかりますが、@threads を使ったときのスケジューリングはラウンドロビン方式です。range で指定された範囲は各スレッドに等しく分配されます。

同期は for 文終了時に入ります。

また、この例からさらに読み取れることとして、プロセス並列時には各 Worker が書き込める配列として SharedArray を使う必要がありますが、スレッド並列時には各スレッドは普通の配列に書き込むことが出来ます。この辺のことに関してはメモリ空間を共有しているか否かを考えれば当然といえば当然ですが念の為。

参考資料
並列計算の基礎知識 | スーパーコンピュータシステムの使い方(旧版), 京都大学情報環境機構

本当に並列化されてるの?

実行時間が早すぎて並列化されている実感が持てないので sleep 使って擬似的な重い処理を入れてみます。Julia のスレッド構文内で Julia 標準の sleep を使うと Segmentation fault で落ちるので Libc.systemsleep を使います3
以下のコードでは 0.1, 0.2, 0.4, 0.8 秒の sleep を逐次と4並列で実行しています。理想的には逐次の場合 1.5 秒、並列の場合 0.8 秒で終了するはずです。

julia> @benchmark for i in 1:4
           t = (1 << (i-1)) / 10
           Libc.systemsleep(t)
       end
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     1.502 s (0.00% GC)
  median time:      1.504 s (0.00% GC)
  mean time:        1.503 s (0.00% GC)
  maximum time:     1.504 s (0.00% GC)
  --------------
  samples:          4
  evals/sample:     1

julia> @benchmark @threads for i in 1:4
           t = (1 << (i-1)) / 10
           Libc.systemsleep(t)
       end
BenchmarkTools.Trial: 
  memory estimate:  32 bytes
  allocs estimate:  1
  --------------
  minimum time:     800.055 ms (0.00% GC)
  median time:      800.073 ms (0.00% GC)
  mean time:        800.328 ms (0.00% GC)
  maximum time:     801.871 ms (0.00% GC)
  --------------
  samples:          7
  evals/sample:     1

良さげですね。

より実用的に

実際の数値計算において、配列にスレッド番号入れるなんて操作は滅多にないと思うので、より実用的な例で試してみます。
Julia のブログ記事 Technical preview: Native GPU programming with CUDAnative.jl を見ると GPU programming における Hello World はベクトルの足し算らしいので、まずはベクトルの足し算からやってみます。

ベクトルの和

function addvec(v::Vector{Float32}, w::Vector{Float32})
    len = length(v)
    u = zeros(Float32, len)
    for i in 1:len
        u[i] = v[i] + w[i]
    end
    return u
end

function taddvec(v::Vector{Float32}, w::Vector{Float32})
    len = length(v)
    u = zeros(Float32, len)
    @threads for i in 1:len
        u[i] = v[i] + w[i]
    end
    return u
end

const n = 500_000_000;
const v = rand(Float32, n);
const w = rand(Float32, n);

各関数 3回ずつの実行時間は以下の通りです。

julia> for i in 1:3 @time addvec(v,w) end
  3.170928 seconds (2 allocations: 1.863 GiB, 2.85% gc time)
  3.049815 seconds (2 allocations: 1.863 GiB, 1.07% gc time)
  3.213339 seconds (2 allocations: 1.863 GiB, 3.25% gc time)

julia> for i in 1:3 @time taddvec(v,w) end
  1.296064 seconds (121 allocations: 1.863 GiB, 0.90% gc time)
  1.227323 seconds (3 allocations: 1.863 GiB, 1.34% gc time)
  1.228507 seconds (3 allocations: 1.863 GiB, 1.34% gc time)

一応 2.5 倍近く早くなってますね。計算している問題を考えれば、まぁこんなものでしょう。

ちなみに普通の足し算は以下の通りです。

julia> for i in 1:3 @time v + w end
  1.380438 seconds (2 allocations: 1.863 GiB, 0.92% gc time)
  1.380853 seconds (2 allocations: 1.863 GiB, 0.90% gc time)
  1.380617 seconds (2 allocations: 1.863 GiB, 0.84% gc time)

2次元平面上の2粒子間の距離

線形計算に関しては BLAS や LAPACK に任せるとして、ここでは線形計算に落とし込めないけれど並列化の恩恵にあずかれる例として2粒子間の距離の計算をしてみます。

$L \times L$ の正方形の箱 (境界は壁) の中に $N$ 個の粒子が存在するとします。このときの粒子 $i$ と 粒子 $j$ 間の距離 $D_{ij}$ $(1 \leq i,~j \leq N)$ を計算することが目標です。
以下の例では $N = 50000$, $L = 10$ として計算しています。

function pdist(x::Vector{Float64}, y::Vector{Float64})
    N = length(x)
    D = zeros(Float64, N, N)
    for i in 1:N
        for j in 1:N
            dx, dy = x[i] - x[j], y[i] - y[j]
            D[i,j] = hypot(dx, dy)
        end
    end

    return D
end

function tpdist(x::Vector{Float64}, y::Vector{Float64})
    N = length(x)
    D = zeros(Float64, N, N)
    @threads for i in 1:N
        for j in 1:N
            dx, dy = x[i] - x[j], y[i] - y[j]
            D[i,j] = hypot(dx, dy)
        end
    end

    return D
end


const N = 50_000
const L = 10
const x = rand(N) * L;
const y = rand(N) * L;
julia> for i in 1:3 @time pdist(x, y) end
 97.880293 seconds (444 allocations: 18.626 GiB, 0.10% gc time)
 97.924678 seconds (2 allocations: 18.626 GiB, 0.20% gc time)
 97.954345 seconds (2 allocations: 18.626 GiB, 0.20% gc time)

julia> for i in 1:3 @time tpdist(x, y) end
 22.816490 seconds (655 allocations: 18.626 GiB, 0.79% gc time)
 22.919686 seconds (3 allocations: 18.626 GiB, 0.45% gc time)
 23.879021 seconds (3 allocations: 18.626 GiB, 0.47% gc time)

CPU使用率は圧巻の光景なのですが、思ったほど速度は出ませんね。

普通、2点間の距離を計算するときの2重ループの部分は

for i in 1:N-1
    for j in i+1:N
         

とするのでしょうが、こうするとスレッド間のロードバランスが悪くなって上図のようなほぼ全てのCPU使用率を100%にするのが難しいのでやりませんでした。
単純に圧巻の光景を長く見たかったから愚直に計算しました。他に深い意味はありません。

time.png width=50%
スレッド数を 1, 2, 4, 8, 16, 32, 64, 128, 160 と変えたときの実行時間は上図のようになりました。
このとき、各スレッド数に対して10回サンプルを取っています。
スレッド数 1 で @threads 使うと普通の逐次よりも遅くなります。

OpenMP における #pragma omp parallel 的なもの

Julia における @threads は OpenMP における #pragma omp parallel for に相当する操作であり、for がついていない #pragma omp parallel に相当する操作は提供されていません。しかし、以下のようにすることで近いことはできます。

@threads for id in 1:nthreads()
    処理
end

このようにすると、for文内部の処理は各スレッドで実行されます。
手間はかかりますがロードバランスを自分で調整することができます。

試しに一番初めの例の、配列にスレッド番号を入れるというのを自力で書いてみます。

julia> nth = nthreads();

julia> a = zeros(2*nth);

julia> d = div(length(a), nth);

julia> @threads for id in 1:nth
           tid = threadid()
           for i in (tid-1)*d+1:tid*d
               a[i] = tid
           end
       end

julia> a
320-element Array{Float64,1}:
   1.0
   1.0
   2.0
   2.0
   3.0
   3.0

     
  省略
     

 159.0
 159.0
 160.0
 160.0

初めの例と同じ結果を得ることができました。

Reduction

Juliaの公式ドキュメントによると @threads では @parallel のような reduction parameter が使えません。
reduction が使えないことで不都合が生じる代表例として、下記のような数の足し上げがあります。

function tadd1()
    s = 0
    @threads for i in 1:10_000
        s += 1
    end

    return s
end

単純に 1 を1万回足しているだけなので答えは 10000 になるはずです。パット見正しく動きそうな気がします。

それでは実際に実行してみます。

julia> tadd1()
7730

julia> tadd1()
67

なんだか意味のわからない数値が返ってきました。原因はいわゆるデータ競合(data race)というやつで複数のスレッドが同じ共有変数を書き換えるためにこのような頓珍漢な値が返ってきます。

回避方法として配列に一時的に入れるという方法があります。

function tadd1_v2()
    sa = zeros(Int, nthreads())
    @threads for i in 1:10_000
        sa[threadid()] += 1
    end

    s = sum(sa)
    return s
end
julia> tadd1_v2()
10000

julia> tadd1_v2()
10000

今度はちゃんと 10000 を得ることが出来ました。
配列を使う方法は OpenMP でも使われる手法ですが、並列領域外で足し上げなければいけないというのがなんだか負けた気がするので、並列領域内で reduction できるように頑張ってみます。
そのためにアトミック操作を使います。現時点(2018/1/12)の stable 版のドキュメントには書いてありませんが latest 版には実行例が書かれているのでそれを参考にしてみます。

function tadd1_v3(n::Int)
    s = Atomic{Int64}(0)
    len = div(n, nthreads())

    @threads for i in 1:nthreads()
        tmps = 0
        id = threadid()

        for i in (id-1)*len+1:id*len
            tmps += 1
        end

        atomic_add!(s, tmps)
    end

    return s.value
end
julia> println(nthreads()*1000)
160000

julia> tadd1_v3(nthreads()*1000)
160000

正しく動いてそうです。
動作を見る限り、スレッド構文内で定義した変数は private 変数になるようです。

v2 の方も n 回足し上げるように書き換えベンチマークを取ってみます。

julia> @benchmark tadd1_v2(nthreads()*10000)
BenchmarkTools.Trial: 
  memory estimate:  1.36 KiB
  allocs estimate:  2
  --------------
  minimum time:     9.219 ms (0.00% GC)
  median time:      9.998 ms (0.00% GC)
  mean time:        11.981 ms (0.00% GC)
  maximum time:     20.996 ms (0.00% GC)
  --------------
  samples:          418
  evals/sample:     1


julia> @benchmark tadd1_v3(nthreads()*10000)
BenchmarkTools.Trial: 
  memory estimate:  64 bytes
  allocs estimate:  2
  --------------
  minimum time:     8.816 ms (0.00% GC)
  median time:      10.152 ms (0.00% GC)
  mean time:        10.813 ms (0.00% GC)
  maximum time:     20.006 ms (0.00% GC)
  --------------
  samples:          463
  evals/sample:     1

アトミック操作のほうが遅くなると思ったのですが、実行時間に顕著な差はありませんでした。

うまいことマクロを書けばスレッド並列でも reduction parameter をつけられそうな気がしますが、そのへんはプロに任せます。

ちなみに逐次実行は並列化したものよりも圧倒的に早いです。

julia> function seq(n::Int)
                  s = 0
                  for i in 1:n
                      s += 1
                  end

                  return s
              end
seq (generic function with 1 method)

julia> @benchmark seq(nthreads()*10000)
BenchmarkTools.Trial: 
  memory estimate:  0 bytes
  allocs estimate:  0
  --------------
  minimum time:     4.175 ns (0.00% GC)
  median time:      4.177 ns (0.00% GC)
  mean time:        4.194 ns (0.00% GC)
  maximum time:     14.863 ns (0.00% GC)
  --------------
  samples:          10000
  evals/sample:     1000

おわりに

以上、素人目線からの Julia スレッド並列でした。
80コア/160スレッドは私の手に余ります。マシンの能力を全く発揮できる気がしません。
そもそも見当違いなことをしている気がしてなりません。
Julia・並列計算の両方に精通している方の投稿を切に願います。

参考


  1. @threadcall (Experimental) はスレッドといえばスレッドだが今回の趣旨とは違うと思うので実質 Multi-Threading (Experimental) の1節のみ。 

  2. latest 版のドキュメントにはアトミック操作についても書かれています。 

  3. 他にも print をスレッド構文内に入れると Segmentation fault で落ちます。