Posted at

Parallelを使って並列処理してその結果をactiverecord-importでupdateする

More than 3 years have passed since last update.

Rubyで並列処理を書くときに便利なParallelで何か処理してその結果をactiverecord-importでbulk updateする方法について。

例ではImageモデルに紐づく画像をParallelでダウンロードしてきて、Imagemagickなどで処理を施し、結果をbulk updateする場合を扱う。


threadを使う場合

必要な処理の部分だけ端折って書くとこんな感じ。

並列にしたい部分をParallelで囲むだけ。

require 'parallel'

require 'activerecord-import'

columns = [:id, :status]
Image.find_in_batches(batch_size: 1000) do |images|
Parallel.each(images, in_threads: 4) do |image|
# 画像をdownloadしてrmagickなどで処理をしてどこかにアップロードする処理など
...

# updateしたいカラムの更新(準備)。
# ここでは処理済みの画像に完了フラグを立てる。
image.status = 1
end

# bulk updateの実行
values = images.map{|u| [u.id, u.status]}
Image.import(columns, values, on_duplicate_key_update: [:status], timestamps: false, validate: false)
end


processを使う場合

threadで並列実行する際の問題点としては、マルチコア環境でコアをすべて使い切れないこと。

Rubyでthreadを使う場合はGVLがあるのでコア数分だけthreadを動かすということができないらしい。

今回の例だとダウンロードやアップロードがボトルネックになる場合はthreadでもprocessでもあまりスピード差はでない。けれど、rmagickでの処理で結構重たいことをやる場合、つまりCPUバウンドなときなどはコアをフルで使い切れるprocessを使ったほうが速度がでることが多い。

ただ、threadのときと違い、processを利用する場合はimagesを直接updateできないので少し工夫が必要になる。

require 'parallel'

require 'activerecord-import'

columns = [:id, :status]
Image.find_in_batches(batch_size: 1000) do |images|
# mapを利用して更新した結果をprocessから受け取る
results = Parallel.map(images, in_processes: 4) do |image|
# 画像をdownloadしてrmagickなどで処理をしてどこかにアップロードする処理など
...

# updateしたいカラムの更新(準備)。
# ここでは処理済みの画像に完了フラグを立てる。
image.status = 1

image # mapに更新したオブジェクトを返す
end

# bulk updateの実行
# parallelから返された結果を利用する
values = results.map{|u| [u.id, u.status]}
Image.import(columns, values, on_duplicate_key_update: [:status], timestamps: false, validate: false)
end


参考

Rubyでforkを利用したマルチプロセスでコアを使い切りたい気持ち