前回のコードリーディング Parallel gem (Thread)でThreadベースでの並列処理を追っていきましたが今回はProcessベースの方を追っていきます.
対象
- Parallel
- MRI 2.3
- MacOS
ThreadとProcessの違いと其々どうゆうものかを知ってる人向けです.
その辺は何処かの記事を読んで置いてください.
ユースケース
コードを読むにあたり以下のコードがどのように実行されるか追うことにしました.
list = ['a','b','c']
log.info 'start'
Parallel.map(list, in_processes: 2) do |one_letter|
log.info one_letter
end
log.info 'finish'
実行結果は以下のようになります.
-
pid
はProcess.pid
-
tid
はThread.current.object_id
Parallel.map
ここは前回読んでしまったので概要だけ箇条書きにしておきます.
- 並列数を処理するJobの数かCPUのコア数から少ない方を選択
- ThreadまたはProcessのどちらかを選択
- 今回はオプションで
in_processes
を指定したのでParallel.work_in_processes
を実行
- 今回はオプションで
Parallel.work_in_processes
ここから先は複雑すぎて文章じゃ説明できなかったので図を描いてみた.

1. new - JobFactory.new
まずメインスレッドでJobFactory.newをしています
def map(source, options = {}, &block)
# ...
job_factory = JobFactory.new(source, options[:mutex])
2. option[:size]の数だけfork - 事前にProcessを作成
事前にProcessをForkしています.
余談ですがApacheとかでPreforkとか呼ばれるものが同じようにしています.
ProcessをForkするとメモリ空間自体が共有できないのでプロセス間通信の問題があります.
Parallelではパイプを使って解決しています.
具体的にはwork_in_processes
内でcreate_workers
を呼び出すことで並列数分のWorkerオブジェクトを作成しています.
def work_in_processes(job_factory, options, &blk)
workers = if options[:isolation]
[] # we create workers per job and not beforehand
else
create_workers(job_factory, options, &blk)
end
isolation
はワーカーを事前に作らずスレッド呼び出し毎に逐次ワーカーを作成していくものです. 今回のユースケースには無いので無視します.
create_workers
でjob_factory
からジョブを取得しワーカーに割り当てます.
内部で具体的に何をしてるかは後で書きます.
3. newでpidと紐付け - 子プロセスを管理するworkerインスタンスの作成
worker
メソッドで子プロセスのforkとWorkerオブジェクトを作成しています.
def worker(job_factory, options, &block)
# ...
pid = Process.fork do
# ...
end
# ...
Worker.new(parent_read, parent_write, pid)
end
Workerオブジェクトは親プロセスとのin
とout
のIOオブジェクトが渡され、親と子プロセスのデータのやり取りをしています.
データのフォーマットはRubyに用意されているMarshalを使ってdump
/load
されます(図の9を参照)
5. option[:size]の数だけスレッド起動 - 親プロセスの1スレッドに対して子プロセスを割り当て
親プロセスがシングルスレッドでは子プロセスが複数あると同時に子プロセスにデータや処理の同期ができません.
そこで親プロセスをマルチスレッドにすることで1スレッドに対して子プロセスを割り当てて並列に処理できるようにしていきます.
UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
in_threads(options) do |i|
worker = workers[i]
# ....
end
end
UserInterruptHandler.kill_on_ctrl_c
はシグナルを送って処理に割り込む仕組みです.
以下の処理でSIGINT
(Control-C)でProcessを全て殺します.
シグナル自体は:interrupt_signal
オプションで変更可能です.