LoginSignup
33
32

More than 5 years have passed since last update.

each を遅延評価しながら複数スレッドで並行処理

Last updated at Posted at 2012-10-17

目的

to_a できないが each できるリストについて、複数スレッドで処理したい。

具体的なユースケースは AWS::S3::ObjectCollection の処理で、これは each すると 1000 回ごとに AWS の API を呼び出す。to_a すると、いちどきに連続して API を呼び出し、巨大な配列をつくってしまうので避けたい。

だめだった方法

  • parallel gem : 内部で最初に to_a しているため、each は遅延処理されず、すべてのオブジェクトを入れる配列 (+結果を入れる配列) が必要になる。
  • Enumerator / Fiber : スレッドをまたいで使えない

解決方法

標準ライブラリ thread で追加される SizedQueue をつかう。これは固定サイズのキューで、複数スレッドから扱うことができ、また値の追加時にキューがいっぱいなら待ってくれる。

これを利用し、サブスレッドはキューから次に処理するオブジェクトを取得して処理、メインスレッドは each で得たオブジェクトを (サブスレッドの処理を待ちつつ) キューに追加、というようにする。

実装例

require "thread"

TERMINATOR = Object.new # キューの終わりを示すオブジェクト
COUNT = 10 # 処理するスレッド数

def each_in_multi_threads(enumerable)
  queue = SizedQueue.new(COUNT)
  threads = COUNT.times.map{
    Thread.new {
      while true
        v = queue.shift
        break if v.equal?(TERMINATOR) # TERMINATOR だったらこのスレッドの仕事は終わり
        yield v
      end
    }  
  }
  enumerable.each{|v| queue << v }
  COUNT.times{ queue << TERMINATOR } # キューの最後にすべてのスレッド分 TERMINATOR を追加
  threads.each(&:join)
  enumerable
end

enumerable = 1..(1.0 / 0) # 無限リスト
each_in_multi_threads(enumerable) do |v|
  p v
  sleep 1 # 時間のかかる処理
end

同じ仕組みで each_with_index や map も可能 (map はどうしても結果の配列を作ることになるけど)。

parallel のように fork 使って複数プロセスで処理できるといいんだけど、Marshal.dump して子プロセスに渡す必要があるから、ちょっと用途が限られてしまう。

33
32
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
33
32