LoginSignup
5
7

More than 5 years have passed since last update.

Parallelを使って並列処理してその結果をactiverecord-importでupdateする

Posted at

Rubyで並列処理を書くときに便利なParallelで何か処理してその結果をactiverecord-importでbulk updateする方法について。

例ではImageモデルに紐づく画像をParallelでダウンロードしてきて、Imagemagickなどで処理を施し、結果をbulk updateする場合を扱う。

threadを使う場合

必要な処理の部分だけ端折って書くとこんな感じ。

並列にしたい部分をParallelで囲むだけ。

require 'parallel'
require 'activerecord-import'

columns = [:id, :status]
Image.find_in_batches(batch_size: 1000) do |images|
  Parallel.each(images, in_threads: 4) do |image| 
    # 画像をdownloadしてrmagickなどで処理をしてどこかにアップロードする処理など
    ...

    # updateしたいカラムの更新(準備)。
    # ここでは処理済みの画像に完了フラグを立てる。
    image.status = 1
  end

  # bulk updateの実行
  values = images.map{|u| [u.id, u.status]}
  Image.import(columns, values, on_duplicate_key_update: [:status], timestamps: false, validate: false)
end

processを使う場合

threadで並列実行する際の問題点としては、マルチコア環境でコアをすべて使い切れないこと。

Rubyでthreadを使う場合はGVLがあるのでコア数分だけthreadを動かすということができないらしい。

今回の例だとダウンロードやアップロードがボトルネックになる場合はthreadでもprocessでもあまりスピード差はでない。けれど、rmagickでの処理で結構重たいことをやる場合、つまりCPUバウンドなときなどはコアをフルで使い切れるprocessを使ったほうが速度がでることが多い。

ただ、threadのときと違い、processを利用する場合はimagesを直接updateできないので少し工夫が必要になる。

require 'parallel'
require 'activerecord-import'

columns = [:id, :status]
Image.find_in_batches(batch_size: 1000) do |images|
  # mapを利用して更新した結果をprocessから受け取る
  results = Parallel.map(images, in_processes: 4) do |image| 
    # 画像をdownloadしてrmagickなどで処理をしてどこかにアップロードする処理など
    ...

    # updateしたいカラムの更新(準備)。
    # ここでは処理済みの画像に完了フラグを立てる。
    image.status = 1

    image # mapに更新したオブジェクトを返す
  end

  # bulk updateの実行
  # parallelから返された結果を利用する
  values = results.map{|u| [u.id, u.status]}
  Image.import(columns, values, on_duplicate_key_update: [:status], timestamps: false, validate: false)
end

参考

Rubyでforkを利用したマルチプロセスでコアを使い切りたい気持ち

5
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
7