自分の為に https://docs.julialang.org/en/v1/manual/parallel-computing/ を訳してみる。
Parallel Computing
マルティスレッドや並列計算に親しみのない読者は、まずJuliaがサポートする並列性のレベルを理解しておくといいだろう。大きく分けて3つのカテゴリがある。
- Juliaコルーチン (グリーンスレッド)
- マルチスレッド
- マルチコア/分散計算
Multi-Core or Distributed Processing
まず、Juliaの実行ライブラリに依存するJuliaのTask(コルーチン)といくつかのモジュールを見ていこう。これらは、計算の実行をサスペンドしたり再開したりする事ができ、OSのスケジューラを使わずにタスク間の通信を完全に制御できる。wait
とfetch
などの操作を用いてタスク間通信もサポートされている。通信やデータの同期はタスク間通信を提供するパイプであるChannel
によって実現される。
実験的にマルチスレッドも実装されている。マルチスレッドでは、実行を分岐させ、無名関数をすべてのスレッドで実行する事ができる。並列動作するスレッドは独立に実行されるが、最終的にはJuliaのメインスレッドに合流して、それ以降の逐次実行を続行する。これをfork-joinと呼ぶ。マルチスレッドはBase.Threads
モジュールで実現されているがこのモジュールは実験段階にある。I/O操作やタスク切替時にsegfaultが起きる場合があるからだ。イッシュートラッカーを見て最新の情報をチェックしてほしい。マルチスレッドをつかうには、後述のグローバル変数、ロック、アトミックを考慮する必要がある。
最後に、Juliaでの分散並列計算について説明する。Juliaは科学技術計算を対象にしているので、マルチコア、マルチマシンに一つのプロセスを分散させるインターフェイスを言語レベルで実装している。また、MPI.jlやDistributedArrays.jlなどの分散プログラム用の外部パッケージについても触れる。
コルーチン(coroutine)
Juliaの並列プログラミングプラットフォームは、タスク(コルーチン)をもちいて複数の計算のスイッチングを行う。軽量スレッド間での実行順序を表現するには通信プリミティブを用いる必要がある。このためにJuliaは、関数Channel(func::Function, ctype=Any, csize=0, taskref=nothing)
を提供している。この関数は、func
を用いて新しいタスクを作り、そのタスクに型ctype
、サイズcsize
の新しいチャンネルを接続しタスクの実行をスケジュールする。Channelはタスク間通信に用いるもので、Channel{T}(sz::Int)
として、型T
サイズsz
のバッファ付チャンネルを作ることもできる。あるタスクがfetch
やwait
などの通信操作を行うと、そのタスクはサスペンドされ、スケジューラが選んだ別のタスクが実行される。待っていたイベントが終了すると、実行が再開される。
多くの場合、直接タスクを扱う必要はない。しかしタスクを用いれば、複数のイベントを同時に待ち受けて動的なスケジューリングを実現する事ができる。
動的スケジューリングは、なにをどこで計算するのかを、他のジョブが終了した時点で決定する。
このようなスケジューリングは、ワークロードの実行時間が予測できない場合やバランスしていない場合に必要となる。実行中のタスクが終了した場合にだけ次の仕事を割り当てる。
チャンネル
コントロールフローのタスクのセクションで、複数の関数を協調して動作させる方法について説明した。
チャンネルは実行中のタスク間でデータのやり取りに有用だ。特にI/O操作によく用いられる。
I/O操作とは、ファイルの読み書き、Webサービスへのアクセス、外部プログラムの実行などだ。
いずれの場合もファイルを読み出している間や外部サービス・プログラムの実行終了を待つ間に
他の事ができれば、全体の実行時間を短縮できる。
チャンネルは、両端がそれぞれ入力と出力になっているパイプとして図にする事ができる。
-
複数のタスクから一つのチャンネルに対して
put!
を呼び出して書き込むことができる。 -
複数のタスクから一つのチャンネルに対して
take!
を呼び出して読み出すことができる。 -
例を示す:
# チャンネルを2つ
c1 = Channel(32)
c2 = Channel(32)
# 関数`foo`はc1から読みだしたアイテムを処理してc2に書き込む。
function foo()
while true
data = take!(c1)
[...] # dataを処理
put!(c2, result) # resultを書き出す
end
end
# `foo`を`n`個同時に実行できる。
for _ in 1:n
@async foo()
end
-
チャンネルは
Channel{T}(sz)
コンストラクタで作ることができる。このチャンネルは型T
のオブジェクトだけを保持する事ができる。sz
はチャンネルに同時に保持できる最大のオブジェクト数を指す。例えば、Channel(32)
は任意の型のオブジェクトを最大32個保持する事ができる。Channel{MyType}(64)
はMyType
型のオブジェクトを同時に64個保持することができる。 -
チャンネルが空の場合読み出しタスクは(
take!
を呼び出した際に)、データが得られるまでブロックする。 -
チャンネルがいっぱいであれば書き込みタスクは(
put!
を呼び出した際に)、書き込む隙間ができるまでブロックする。 -
isready
関数を用いるとチャンネルにオブジェクトがあるか確認することができる。wait
関数はオブジェクトが用意されるまでブロックする。
チャンネルはつくられたところでオープン状態となる。つまり、自由にtake!
で読み出すことができ、put!
で書き込むことができる。close
関数でクローズすることができる。クローズされたチャンネルに対してput!
すると失敗する。例を示す。
julia> c = Channel(2);
julia> put!(c, 1) # `put!` on an open channel succeeds
1
julia> close(c);
julia> put!(c, 2) # `put!` on a closed channel throws an exception.
ERROR: InvalidStateException("Channel is closed.",:closed)
Stacktrace:
[...]
クローズされたチャンネルに対してtake!
やfetch
(こちらはデータを読み出すがチャンネルから取り除かない)を実行した場合には、チャンネルが空になるまで中に入った値を読み出すことができる。下の例は上のつづきだ。
julia> fetch(c) # Any number of `fetch` calls succeed.
1
julia> fetch(c)
1
julia> take!(c) # The first `take!` removes the value.
1
julia> take!(c) # No more data available on a closed channel.
ERROR: InvalidStateException("Channel is closed.",:closed)
Stacktrace:
[...]
チャンネルはループ内のイテラブルとして利用できる。この場合、ループはチャンネルにデータがあるもしくは
オープンである間、実行し続ける。
ループ変数にはチャンネルに書き込まれた値が順に代入される。
ループはチャンネルがクローズされ、空になると終了する。
たとえば、以下のコードではforループがデータを待ってブロックしてしまう。
julia> c = Channel{Int}(10);
julia> foreach(i->put!(c, i), 1:3) # add a few entries
julia> data = [i for i in c]
このようにすれば動作する。
julia> c = Channel{Int}(10);
julia> foreach(i->put!(c, i), 1:3); # add a few entries
julia> close(c); # `for` loops can exit
julia> data = [i for i in c]
3-element Array{Int64,1}:
1
2
3
チャンネルを使ってタスク間通信する簡単な例を見てみよう。一つのジョブチャンネルからデータを得て処理する4つのタスクを作ってみよう。 チャンネルに書き込まれるジョブは、ジョブID (job_id
)で識別できる。ここで用いるそれぞれのタスクはjob_id
を読み出し、ランダムな時間だけ待ってからjob_idとシミュレートした実行時間をresults
チャンネルに書き出す。最後にこのresults
の中身を書き出す。
julia> const jobs = Channel{Int}(32);
julia> const results = Channel{Tuple}(32);
julia> function do_work()
for job_id in jobs
exec_time = rand()
sleep(exec_time) # simulates elapsed time doing actual work
# typically performed externally.
put!(results, (job_id, exec_time))
end
end;
julia> function make_jobs(n)
for i in 1:n
put!(jobs, i)
end
end;
julia> n = 12;
julia> @async make_jobs(n); # feed the jobs channel with "n" jobs
julia> for i in 1:4 # start 4 tasks to process requests in parallel
@async do_work()
end
julia> @elapsed while n > 0 # print out results
job_id, exec_time = take!(results)
println("$job_id finished in $(round(exec_time; digits=2)) seconds")
global n = n - 1
end
4 finished in 0.22 seconds
3 finished in 0.45 seconds
1 finished in 0.5 seconds
7 finished in 0.14 seconds
2 finished in 0.78 seconds
5 finished in 0.9 seconds
9 finished in 0.36 seconds
6 finished in 0.87 seconds
8 finished in 0.79 seconds
10 finished in 0.64 seconds
12 finished in 0.5 seconds
11 finished in 0.97 seconds
0.029772311
現状のJuliaはすべてのタスクを1つのOSスレッドで多重実行する。したがって、タスクを用いるとI/O操作を並列実行で効率化することはできるが、計算バウンドのタスクに関しては一つのOSスレッドで逐次的に実行されることになる。将来はタスクを複数のスレッドにスケジュールできるようになるかもしれない。そうなれば計算バウンドのタスクでも並列実行のメリットを享受できるようになるだろう。