Rubyで並列処理を書くときに便利なParallelで何か処理してその結果をactiverecord-importでbulk updateする方法について。
例ではImageモデルに紐づく画像をParallelでダウンロードしてきて、Imagemagickなどで処理を施し、結果をbulk updateする場合を扱う。
threadを使う場合
必要な処理の部分だけ端折って書くとこんな感じ。
並列にしたい部分をParallelで囲むだけ。
require 'parallel'
require 'activerecord-import'
columns = [:id, :status]
Image.find_in_batches(batch_size: 1000) do |images|
Parallel.each(images, in_threads: 4) do |image|
# 画像をdownloadしてrmagickなどで処理をしてどこかにアップロードする処理など
...
# updateしたいカラムの更新(準備)。
# ここでは処理済みの画像に完了フラグを立てる。
image.status = 1
end
# bulk updateの実行
values = images.map{|u| [u.id, u.status]}
Image.import(columns, values, on_duplicate_key_update: [:status], timestamps: false, validate: false)
end
processを使う場合
threadで並列実行する際の問題点としては、マルチコア環境でコアをすべて使い切れないこと。
Rubyでthreadを使う場合はGVLがあるのでコア数分だけthreadを動かすということができないらしい。
今回の例だとダウンロードやアップロードがボトルネックになる場合はthreadでもprocessでもあまりスピード差はでない。けれど、rmagickでの処理で結構重たいことをやる場合、つまりCPUバウンドなときなどはコアをフルで使い切れるprocessを使ったほうが速度がでることが多い。
ただ、threadのときと違い、processを利用する場合はimagesを直接updateできないので少し工夫が必要になる。
require 'parallel'
require 'activerecord-import'
columns = [:id, :status]
Image.find_in_batches(batch_size: 1000) do |images|
# mapを利用して更新した結果をprocessから受け取る
results = Parallel.map(images, in_processes: 4) do |image|
# 画像をdownloadしてrmagickなどで処理をしてどこかにアップロードする処理など
...
# updateしたいカラムの更新(準備)。
# ここでは処理済みの画像に完了フラグを立てる。
image.status = 1
image # mapに更新したオブジェクトを返す
end
# bulk updateの実行
# parallelから返された結果を利用する
values = results.map{|u| [u.id, u.status]}
Image.import(columns, values, on_duplicate_key_update: [:status], timestamps: false, validate: false)
end