Parallelを使って並列処理してその結果をactiverecord-importでupdateする

  • 3
    いいね
  • 0
    コメント

Rubyで並列処理を書くときに便利なParallelで何か処理してその結果をactiverecord-importでbulk updateする方法について。

例ではImageモデルに紐づく画像をParallelでダウンロードしてきて、Imagemagickなどで処理を施し、結果をbulk updateする場合を扱う。

threadを使う場合

必要な処理の部分だけ端折って書くとこんな感じ。

並列にしたい部分をParallelで囲むだけ。

require 'parallel'
require 'activerecord-import'

columns = [:id, :status]
Image.find_in_batches(batch_size: 1000) do |images|
  Parallel.each(images, in_threads: 4) do |image| 
    # 画像をdownloadしてrmagickなどで処理をしてどこかにアップロードする処理など
    ...

    # updateしたいカラムの更新(準備)。
    # ここでは処理済みの画像に完了フラグを立てる。
    image.status = 1
  end

  # bulk updateの実行
  values = images.map{|u| [u.id, u.status]}
  Image.import(columns, values, on_duplicate_key_update: [:status], timestamps: false, validate: false)
end

processを使う場合

threadで並列実行する際の問題点としては、マルチコア環境でコアをすべて使い切れないこと。

Rubyでthreadを使う場合はGVLがあるのでコア数分だけthreadを動かすということができないらしい。

今回の例だとダウンロードやアップロードがボトルネックになる場合はthreadでもprocessでもあまりスピード差はでない。けれど、rmagickでの処理で結構重たいことをやる場合、つまりCPUバウンドなときなどはコアをフルで使い切れるprocessを使ったほうが速度がでることが多い。

ただ、threadのときと違い、processを利用する場合はimagesを直接updateできないので少し工夫が必要になる。

require 'parallel'
require 'activerecord-import'

columns = [:id, :status]
Image.find_in_batches(batch_size: 1000) do |images|
  # mapを利用して更新した結果をprocessから受け取る
  results = Parallel.map(images, in_processes: 4) do |image| 
    # 画像をdownloadしてrmagickなどで処理をしてどこかにアップロードする処理など
    ...

    # updateしたいカラムの更新(準備)。
    # ここでは処理済みの画像に完了フラグを立てる。
    image.status = 1

    image # mapに更新したオブジェクトを返す
  end

  # bulk updateの実行
  # parallelから返された結果を利用する
  values = results.map{|u| [u.id, u.status]}
  Image.import(columns, values, on_duplicate_key_update: [:status], timestamps: false, validate: false)
end

参考

Rubyでforkを利用したマルチプロセスでコアを使い切りたい気持ち