SESの現場で気になるGemを読む勉強会のためにParallelを読んで見ました.
ParallelはRuby環境で手軽にマルチスレッドやマルチプロセスができるライブラリです.
コードリーディング Parallel gem (Process)も投稿しました.
対象
- Parallel
- MRI 2.3
- MacOS
ユースケース
コードを読むにあたり以下のコードがどのように実行されるか追うことにしました.
list = ['a','b','c']
log.info 'start'
Parallel.map(list, in_threads: 2) do |one_letter|
log.info one_letter
end
log.info 'finish'
実行結果は以下のようになります.
-
pid
はProcess.pid
-
tid
はThread.current.object_id
実はin_processes
も読んだけど何となく挙動は理解したものの文章にまとめられませんでした.
もし続きを書くとしたら更に詳しく読んで検証しないと記事にできないくらい複雑です.
Parallel.map
まず最初は.map
です.
OSとRubyの実装から並列方法の決定
Rubyの実装を考慮して以下のことを決定してます
- threadベースかprocessベースのメソッド
- 並列数
if RUBY_PLATFORM =~ /java/ and not options[:in_processes]
method = :in_threads
size = options[method] || processor_count
elsif options[:in_threads]
method = :in_threads
size = options[method]
else
method = :in_processes
if Process.respond_to?(:fork)
size = options[method] || processor_count
else
warn "Process.fork is not supported by this Ruby"
size = 0
end
end
JRubyでin_processes
の指定がない場合はスレッドを使用.
JavaではRubyと違ってCPUコアを同時に使えるのでThreadで十分ですね.
processor_count
はOSからCPUのコア数を取得する関数です.
Rubyの実装がプロセスのフォークに対応していない場合は警告が出ます.
処理する対象のソースから並列数を再計算
job_factory = JobFactory.new(source, options[:mutex])
size = [job_factory.size, size].min
-
JobFactory
の生成- 各配列の要素をジョブ扱うキューやEnumerableっぽいもの.
- 並列数(
size
)をジョブ数から再設定- ジョブが少ないと多くスレッドを生成しても意味ない
Parallel.work_in_threads
.work_in_threads
の中身を見ていきます.
各Threadの実行結果を記録する
results = []
results_mutex = Mutex.new
#...
in_threads() do
result = something
results_mutex.synchronize { results[index] = result }
end
jRubyの場合、MRIと違ってGVL(GIL)ではないのでスレッドセーフではありません.
そこでMutexを使って排他制御ができるようにしています.
JobFactoryが空になるまで各スレッド上で実行
in_threads(options) do |worker_num|
self.worker_number = worker_num
# as long as there are more jobs, work on one of them
while !exception && set = job_factory.next
begin
item, index = set
result = with_instrumentation item, index, options do
call_with_index(item, index, options, &block)
end
results_mutex.synchronize { results[index] = result }
rescue StandardError => e
exception = e
end
end
end
-
in_threads
のブロック内は生成されたThread上で実行される -
job_factory.next
でjobが尽きるまでループ-
StandardError
がrescueされた時に例外をメモしてループを抜ける -
with_instrumentation
はoptionsのstart
とfinish
の実行をする -
call_with_index
は基本的には渡されたブロックを実行している
-
processor_count
processor_count
は色々な環境のCPU数を取得する関数です.
手元のmacOS(Darwin)で確認してみと自分の環境では4コアのCPUのハズ何ですが何故か8コア扱い.
おそらくIntelのCPUなのでハイパースレッディング・テクノロジーよる恩恵でしょう.
$ /usr/sbin/sysctl -n hw.ncpu
8