4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

コードリーディング Parallel gem (Process)

Last updated at Posted at 2016-10-30

前回のコードリーディング Parallel gem (Thread)でThreadベースでの並列処理を追っていきましたが今回はProcessベースの方を追っていきます.

対象

ThreadとProcessの違いと其々どうゆうものかを知ってる人向けです.
その辺は何処かの記事を読んで置いてください.

ユースケース

コードを読むにあたり以下のコードがどのように実行されるか追うことにしました.

list = ['a','b','c']
log.info 'start'
Parallel.map(list, in_processes: 2) do |one_letter|
  log.info one_letter
end
log.info 'finish'

実行結果は以下のようになります.

  • pidProcess.pid
  • tidThread.current.object_id

parallel_processes.png

Parallel.map

ここは前回読んでしまったので概要だけ箇条書きにしておきます.

  • 並列数を処理するJobの数かCPUのコア数から少ない方を選択
  • ThreadまたはProcessのどちらかを選択
    • 今回はオプションでin_processesを指定したのでParallel.work_in_processesを実行

Parallel.work_in_processes

ここから先は複雑すぎて文章じゃ説明できなかったので図を描いてみた.

work_in_processes.png

1. new - JobFactory.new

まずメインスレッドでJobFactory.newをしています

parallel.rb#L230
def map(source, options = {}, &block)
  # ...
  
  job_factory = JobFactory.new(source, options[:mutex])

2. option[:size]の数だけfork - 事前にProcessを作成

事前にProcessをForkしています.
余談ですがApacheとかでPreforkとか呼ばれるものが同じようにしています.

ProcessをForkするとメモリ空間自体が共有できないのでプロセス間通信の問題があります.
Parallelではパイプを使って解決しています.

具体的にはwork_in_processes内でcreate_workersを呼び出すことで並列数分のWorkerオブジェクトを作成しています.

parallel.rb#L324
def work_in_processes(job_factory, options, &blk)
  workers = if options[:isolation]
    [] # we create workers per job and not beforehand
  else
    create_workers(job_factory, options, &blk)
  end

isolationはワーカーを事前に作らずスレッド呼び出し毎に逐次ワーカーを作成していくものです. 今回のユースケースには無いので無視します.

create_workersjob_factoryからジョブを取得しワーカーに割り当てます.
内部で具体的に何をしてるかは後で書きます.

3. newでpidと紐付け - 子プロセスを管理するworkerインスタンスの作成

workerメソッドで子プロセスのforkとWorkerオブジェクトを作成しています.

parallel.rb#L391
def worker(job_factory, options, &block)
  # ...

  pid = Process.fork do
    # ...
  end
  # ...

  Worker.new(parent_read, parent_write, pid)
end

Workerオブジェクトは親プロセスとのinoutのIOオブジェクトが渡され、親と子プロセスのデータのやり取りをしています.

データのフォーマットはRubyに用意されているMarshalを使ってdump/loadされます(図の9を参照)

5. option[:size]の数だけスレッド起動 - 親プロセスの1スレッドに対して子プロセスを割り当て

親プロセスがシングルスレッドでは子プロセスが複数あると同時に子プロセスにデータや処理の同期ができません.
そこで親プロセスをマルチスレッドにすることで1スレッドに対して子プロセスを割り当てて並列に処理できるようにしていきます.

parallel.rb#L334
UserInterruptHandler.kill_on_ctrl_c(workers.map(&:pid), options) do
  in_threads(options) do |i|
    worker = workers[i]

    # ....
  end
end

UserInterruptHandler.kill_on_ctrl_cはシグナルを送って処理に割り込む仕組みです.
以下の処理でSIGINT(Control-C)でProcessを全て殺します.

シグナル自体は:interrupt_signalオプションで変更可能です.

4
4
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
4

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?