0
0

More than 3 years have passed since last update.

each を遅延評価しながら複数プロセスで並行処理

Posted at

だいたい "each を遅延評価しながら複数スレッドで並行処理" のプロセス版。

require "thread"
require "thwait"

module MultiProcess
  DEFAULT_NUM_OF_WORKERS = 10

  module_function
  def each(enumerable, num_of_workers: DEFAULT_NUM_OF_WORKERS, &block)
    thwait = ThreadsWait.new

    enumerable.each do |item|
      # 子プロセスの数が上限に達していたら子プロセスの終了を待つ
      thwait.next_wait.join if thwait.threads.size >= num_of_workers

      # fork して block を実行
      cid = fork do
        block.call(item)
      end

      # 子プロセスの終了を待つスレッドを作成して ThreadsWait に登録
      thwait.join_nowait(
        Thread.new {
          _, s = Process.waitpid2(cid)
          raise "Worker process existed with #{s.exitstatus}" unless s.success?
        }
      )
    end

    # すべての子プロセスの終了を待つ
    thwait.all_waits(&:join)
    enumerable
  end
end
# 下記のコードの実行は約 10 秒で完了する
MultiProcess.each(1..100) {|i|
  sleep 1
  puts [i, Process.pid].inspect
}
# [1, 19039]
# [2, 19040]
# [3, 19041]
# ...
# [98, 19136]
# [100, 19138]
# [99, 19137]
0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0