背景
Ruby or Railsでは、長時間掛かるジョブを非同期に実行するためのインタフェースとしてActiveJobがあります。ActiveJobはQueueAdapterを切り替えることで、ジョブの実行エンジンを変更することができます。
その中のひとつに Sucker Punch があります。Sucker Punchは、非同期処理をスレッドで実行します。Ruby on Railsのプロセスと別にジョブ実行のためのプロセスを起動する必要がありません。Heroku等のPaaSで動かすときに便利な特徴です。制約もあります。ジョブをDB等で永続化しません。ジョブ実行中にRuby on Railsを再起動すると、実行中のジョブは単に中断され、再起動後にはリトライしません。
- ジョブ実行プロセスを起動したくない
- ジョブのリトライが不要
の場合、Sucker Punchが実行エンジンの選択肢になります1
問題
Sucker Punch を単体で使う場合、ジョブのキューはジョブクラス毎に分かれます。ジョブクラス単位でジョブを実行するスレッドプールが用意されます。これがキューとして働きます。
もし、1つのキューに遅いジョブと速いジョブが混ざると、速いジョブの実行が遅いジョブの完了を待つことがあります。開始すればすぐ終わるジョブが、キューが詰まって、時間が掛かる現象が発生します。ジョブクラス単位でキューが分かれているため、これを回避できます。
ActiveJobのQueueAdapterを経由してSucker Punchを使うと、すべてのジョブが一つのスレッドプールで実行されます。AsyncAdapter2を使うのと同じ動きになります3。
ActiveJobのQueueAdapterから、Sucker Punchを使用しかつ、ジョブクラス毎のキューで動かす方法はないでしょうか?
問題の原因
Sucker Punchのジョブはperform_async
というメソッドで開始します。
https://github.com/brandonhilkert/sucker_punch/blob/v3.1.0/lib/sucker_punch/job.rb#L35-L39
def perform_async(*args)
return unless SuckerPunch::RUNNING.true?
queue = SuckerPunch::Queue.find_or_create(self.to_s, num_workers, num_jobs_max)
queue.post { __run_perform(*args) }
end
この中で、ジョブのクラス名(self.to_s
)に基づいて、ジョブを実行するSuckerPunch::Queue からスレッドプールを探します。
一方、ActiveJobからSucker Punchを呼び出すため、SuckerPunchAdapterはすべてのジョブをJobWrapper
クラスでラップします。
class JobWrapper #:nodoc:
include SuckerPunch::Job
def perform(job_data)
Base.execute job_data
end
end
このためself.to_s
は常にJobWrapper
になります。Sucker Punchは、全てのジョブを JobWrapper
クラスのジョブと判定し、同じスレッドプールを使います。
なぜ、SuckerPunchAdapterはジョブをJobWrapper
クラスでラップしているのでしょうか?ActiveJobのインタフェースを守るためです。ActiveJobのジョブはActiveJob::Base.execute
を実行して、ジョブを開始します。そうしなければ、コールバックが使えません。また、ジョブの開始・終了のログも出力されません。
https://github.com/rails/rails/blob/7-1-stable/activejob/lib/active_job/execution.rb#L27-L32
def execute(job_data) #:nodoc:
ActiveJob::Callbacks.run_callbacks(:execute) do
job = deserialize(job_data)
job.perform_now
end
end
解決方法
self.to_s
の代わりに、:job_class
を使います。
SuckerPunch::Job.perform_async
SuckerPunch::Job.perform_async
を修正します。
def perform_async(*args)
return unless SuckerPunch::RUNNING.true?
name = args.first[:job_class] # 第一引数からJobのクラス名を取得する
queue = SuckerPunch::Queue.find_or_create(name, num_workers, num_jobs_max)
queue.post { __run_perform(*args) }
end
複数の引数を受け取るために、引数は *args
です。ここから第一引数を取得します。これはジョブの情報を表すハッシュです。:job_class
キーで、ジョブのクラス名が取得出来ます。これを使ってスレッドプールを取得します。
モンキーパッチ
前述のSuckerPunch::Job.perform_async
をモンキーパッチとして実装します。
Rails.application.config.after_initialize do
SuckerPunch::Job::ClassMethods.prepend(SuckerPunchJobExtensions)
end
app/lib
ディレクトリ配下のクラスを読み込むためにRails.application.config.after_initialize
を使います。
prepend
を使ってパッチを適用します。
module SuckerPunchJobExtensions
def perform_async(*args)
return unless SuckerPunch::RUNNING.true?
name = args.first[:job_class] # 第一引数からJobのクラス名を取得する
queue = SuckerPunch::Queue.find_or_create(name, num_workers, num_jobs_max)
queue.post { __run_perform(*args) }
end
end
上書きしたいメソッドを定義したモジュールをapp/lib
に配置してあります。