目的
to_a できないが each できるリストについて、複数スレッドで処理したい。
具体的なユースケースは AWS::S3::ObjectCollection の処理で、これは each すると 1000 回ごとに AWS の API を呼び出す。to_a すると、いちどきに連続して API を呼び出し、巨大な配列をつくってしまうので避けたい。
だめだった方法
- parallel gem : 内部で最初に to_a しているため、each は遅延処理されず、すべてのオブジェクトを入れる配列 (+結果を入れる配列) が必要になる。
- Enumerator / Fiber : スレッドをまたいで使えない
解決方法
標準ライブラリ thread で追加される SizedQueue をつかう。これは固定サイズのキューで、複数スレッドから扱うことができ、また値の追加時にキューがいっぱいなら待ってくれる。
これを利用し、サブスレッドはキューから次に処理するオブジェクトを取得して処理、メインスレッドは each で得たオブジェクトを (サブスレッドの処理を待ちつつ) キューに追加、というようにする。
実装例
require "thread"
TERMINATOR = Object.new # キューの終わりを示すオブジェクト
COUNT = 10 # 処理するスレッド数
def each_in_multi_threads(enumerable)
queue = SizedQueue.new(COUNT)
threads = COUNT.times.map{
Thread.new {
while true
v = queue.shift
break if v.equal?(TERMINATOR) # TERMINATOR だったらこのスレッドの仕事は終わり
yield v
end
}
}
enumerable.each{|v| queue << v }
COUNT.times{ queue << TERMINATOR } # キューの最後にすべてのスレッド分 TERMINATOR を追加
threads.each(&:join)
enumerable
end
enumerable = 1..(1.0 / 0) # 無限リスト
each_in_multi_threads(enumerable) do |v|
p v
sleep 1 # 時間のかかる処理
end
同じ仕組みで each_with_index や map も可能 (map はどうしても結果の配列を作ることになるけど)。
parallel のように fork 使って複数プロセスで処理できるといいんだけど、Marshal.dump して子プロセスに渡す必要があるから、ちょっと用途が限られてしまう。