LoginSignup
9
13

More than 5 years have passed since last update.

コードリーディング Parallel gem (Thread)

Last updated at Posted at 2016-10-16

SESの現場で気になるGemを読む勉強会のためにParallelを読んで見ました.

ParallelはRuby環境で手軽にマルチスレッドやマルチプロセスができるライブラリです.

コードリーディング Parallel gem (Process)も投稿しました.

対象

ユースケース

コードを読むにあたり以下のコードがどのように実行されるか追うことにしました.

list = ['a','b','c']
log.info 'start'
Parallel.map(list, in_threads: 2) do |one_letter|
  log.info one_letter
end
log.info 'finish'

実行結果は以下のようになります.

  • pidProcess.pid
  • tidThread.current.object_id

parallel_threads.png

実はin_processesも読んだけど何となく挙動は理解したものの文章にまとめられませんでした.
もし続きを書くとしたら更に詳しく読んで検証しないと記事にできないくらい複雑です.

Parallel.map

まず最初は.mapです.

OSとRubyの実装から並列方法の決定

Rubyの実装を考慮して以下のことを決定してます

  • threadベースかprocessベースのメソッド
  • 並列数
parallel.rb#L214
if RUBY_PLATFORM =~ /java/ and not options[:in_processes]
  method = :in_threads
  size = options[method] || processor_count
elsif options[:in_threads]
  method = :in_threads
  size = options[method]
else
  method = :in_processes
  if Process.respond_to?(:fork)
    size = options[method] || processor_count
  else
    warn "Process.fork is not supported by this Ruby"
    size = 0
  end
end

JRubyでin_processesの指定がない場合はスレッドを使用.
JavaではRubyと違ってCPUコアを同時に使えるのでThreadで十分ですね.

processor_countはOSからCPUのコア数を取得する関数です.

Rubyの実装がプロセスのフォークに対応していない場合は警告が出ます.

処理する対象のソースから並列数を再計算

parallel.rb#L230
job_factory = JobFactory.new(source, options[:mutex])
size = [job_factory.size, size].min
  • JobFactoryの生成
    • 各配列の要素をジョブ扱うキューやEnumerableっぽいもの.
  • 並列数(size)をジョブ数から再設定
    • ジョブが少ないと多くスレッドを生成しても意味ない

Parallel.work_in_threads

.work_in_threadsの中身を見ていきます.

各Threadの実行結果を記録する

results = []
results_mutex = Mutex.new

#...

in_threads() do
  result = something
  results_mutex.synchronize { results[index] = result }
end  

jRubyの場合、MRIと違ってGVL(GIL)ではないのでスレッドセーフではありません.
そこでMutexを使って排他制御ができるようにしています.

JobFactoryが空になるまで各スレッド上で実行

in_threads(options) do |worker_num|
  self.worker_number = worker_num
  # as long as there are more jobs, work on one of them
  while !exception && set = job_factory.next
    begin
      item, index = set
      result = with_instrumentation item, index, options do
        call_with_index(item, index, options, &block)
      end
      results_mutex.synchronize { results[index] = result }
    rescue StandardError => e
      exception = e
    end
  end
end
  • in_threadsのブロック内は生成されたThread上で実行される
  • job_factory.nextでjobが尽きるまでループ
    • StandardErrorがrescueされた時に例外をメモしてループを抜ける
    • with_instrumentationはoptionsのstartfinishの実行をする
    • call_with_indexは基本的には渡されたブロックを実行している

processor_count

processor_countは色々な環境のCPU数を取得する関数です.
手元のmacOS(Darwin)で確認してみと自分の環境では4コアのCPUのハズ何ですが何故か8コア扱い.
おそらくIntelのCPUなのでハイパースレッディング・テクノロジーよる恩恵でしょう.

$ /usr/sbin/sysctl -n hw.ncpu
8
9
13
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
9
13