だいたい "each を遅延評価しながら複数スレッドで並行処理" のプロセス版。
require "thread"
require "thwait"
module MultiProcess
DEFAULT_NUM_OF_WORKERS = 10
module_function
def each(enumerable, num_of_workers: DEFAULT_NUM_OF_WORKERS, &block)
thwait = ThreadsWait.new
enumerable.each do |item|
# 子プロセスの数が上限に達していたら子プロセスの終了を待つ
thwait.next_wait.join if thwait.threads.size >= num_of_workers
# fork して block を実行
cid = fork do
block.call(item)
end
# 子プロセスの終了を待つスレッドを作成して ThreadsWait に登録
thwait.join_nowait(
Thread.new {
_, s = Process.waitpid2(cid)
raise "Worker process existed with #{s.exitstatus}" unless s.success?
}
)
end
# すべての子プロセスの終了を待つ
thwait.all_waits(&:join)
enumerable
end
end
# 下記のコードの実行は約 10 秒で完了する
MultiProcess.each(1..100) {|i|
sleep 1
puts [i, Process.pid].inspect
}
# [1, 19039]
# [2, 19040]
# [3, 19041]
# ...
# [98, 19136]
# [100, 19138]
# [99, 19137]