Ruby
Rails
ActiveJob
suckerpunch

Ruby on Rails 5 のActiveJobの実行エンジンにSuckerPunchを使うときにキュー単位の優先制御を有効にする


背景

Ruby or Railsでは、長時間掛かるジョブを非同期に実行するためのインタフェースとしてActiveJobを提供しています。ActiveJobはQueueAdapterを切り替えることで、ジョブの実行エンジンを変更することができます。

その中で特徴のある実行エンジンにSucker Punchがあります。Sucker Punchは、非同期処理をスレッドで実行します。このためジョブ実行のためのプロセスをRuby on Railsのプロセスと分ける必要がありません。これはHeroku等のSaasで動かすときに便利な特徴です。一方で、制約もあります。ジョブをDB等で永続化しないため、ジョブ実行中にRuby on Railsを再起動すると、実行中のジョブは強制的に中断され、再起動後には自動的にはリトライされません。


  • ジョブプロセスを分けたくない

  • ジョブのリトライが不要

な場合は、Sucker PunchがAsyncAdapterの実行エンジンの選択肢になります。


問題

Sucker Punchは単体で使う場合は、ジョブのキューはジョブ毎に分かれて管理されています。厳密に言うと、ジョブ毎にジョブを実行するための独立したスレッドプールを用意します。

これにより、一つのキューに遅いジョブと速いジョブが混ざることがなくなります。遅いジョブと速いジョブが混ざると速いジョブが遅いジョブに待たされることがあります。開始されれば、すぐ終わるはずのジョブが、キューが詰まって開始されないため、終了まで時間が掛かる現象が発生します。これを回避できます。

しかし、ActiveJobのQueueAdapterを経由してSucker Punchを使うと、すべてのジョブが一つのスレッドプールで実行されます。これではAsyncAdapter(ActiveJobのデフォルトのQueueAdapter)を使うのと変わりがありません。ActiveJob::QueueAdaptersにも、Sucker PunchのPrioritiesNoと書かれています。

Sucker PunchをActiveJobのQueueAdapterを経由して使用しつつ、ジョブ別のキューで動かす方法はないでしょうか?


問題の原因

Sucker Punchのジョブはperform_asyncというメソッドで開始します。

def perform_async(*args)

return unless SuckerPunch::RUNNING.true?
queue = SuckerPunch::Queue.find_or_create(self.to_s, num_workers, num_jobs_max)
queue.post(args) { |job_args| __run_perform(*job_args) }
end

https://github.com/brandonhilkert/sucker_punch/blob/v2.1.1/lib/sucker_punch/job.rb#L35-L39

この中で、ジョブのクラス名(self.to_s)に基づいて、ジョブを実行するためのSuckerPunch::Queue(スレッドプール)を探します。

一方、SuckerPunchAdapter(ActiveJobのQueueAdapter)はすべてのジョブをJobWrapperクラスでラップしています。

class JobWrapper #:nodoc:

include SuckerPunch::Job

def perform(job_data)
Base.execute job_data
end
end

https://github.com/rails/rails/blob/5-2-2/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb#L40-L46

このためSucker Punchは、どのジョブも一種類のJobWrapperクラスのジョブであると判定します。

ではなぜ、SuckerPunchAdapterはジョブをJobWrapperクラスでラップしているのでしょうか?ActiveJobのジョブインタフェースを守るためです。ActiveJobのジョブはActiveJob::Base.executeを実行して、ジョブを開始する必要があります。

def execute(job_data) #:nodoc:

ActiveJob::Callbacks.run_callbacks(:execute) do
job = deserialize(job_data)
job.perform_now
end
end

https://github.com/rails/rails/blob/5-2-2/activejob/lib/active_job/execution.rb#L21-L26

そうしなければ、コールバックが使えませんし、ジョブの開始と終了のログも出力されません。


解決方法

ActiveJobとSuckerPunchのジョブのインタフェースのミスマッチを解消します。


SuckerPunchAdapter.enqueu

まずSuckerPunchAdapter.enqueuを次のように修正します。

class SuckerPunchAdapter

def enqueue job
job.queue_name = job.class.to_s
job.class.perform_async { Base.execute job.serialize }
end
end

ジョブをJobWapperクラスでラップするのをやめます。代わりにActiveJob::Base.executeを実行するブロックを渡します。job.queue_nameにジョブのクラス名を設定しているのは、ログに正しいジョブ名を表示するためです。


SuckerPunch::Job.perform_async

次にSuckerPunch::Job.perform_asyncqueue.postのブロックを修正します。

def perform_async *_args, &block

return unless SuckerPunch::RUNNING.true?
queue = SuckerPunch::Queue.find_or_create to_s, num_workers, num_jobs_max
queue.post { __run_perform(&block) }
end

__run_perform(*job_args)__run_performメソッドにジョブの引数を渡すのをやめ、__run_perform(&block)のように引数のブロックをそのまま渡します。

perform_asyncも修正します。

def __run_perform

SuckerPunch::Counter::Busy.new(to_s).increment
result = yield
SuckerPunch::Counter::Processed.new(to_s).increment
result
rescue StandardError => ex
SuckerPunch::Counter::Failed.new(to_s).increment
SuckerPunch.exception_handler.call ex, self, args
ensure
SuckerPunch::Counter::Busy.new(to_s).decrement
end

result = self.new.perform(*args)result = yieldに置き換えています。修正前のselfにはJobWrapperクラスが入っています。JobWrapperクラスをインスタス化して、performメソッドを呼んでいました。

ブロックの実行に置き換えます。このブロックには

Base.execute job.serialize

が、入っているます。ActiveJob::Base.executeが実行できます。


ApplicationJob

また、この修正ではJobをperform_asyncメソッドで起動します。ActiveJobのクラスにSuckerPunch::Jobをinculdeします。すべてのActiveJobクラスでの実行エンジンにSucker Punchを使う場合は、次のようにApplicationJobでSuckerPunch::Jobをinculdeしても良いです。

class ApplicationJob < ActiveJob::Base

include SuckerPunch::Job
end


パッチ

次の3つのファイルが必要です。


app/jobs/application_job.rb

class ApplicationJob < ActiveJob::Base

include SuckerPunch::Job
end


app/lib/active_job/queue_adapters/sucker_punch_adapter.rb

module ActiveJob

module QueueAdapters
class SuckerPunchAdapter
def enqueue job
job.queue_name = job.class.to_s
job.class.perform_async { Base.execute job.serialize }
end
end
end
end


app/lib/sucker_punch/job/class_methods.rb

module SuckerPunch

module Job
module ClassMethods
def perform_async *_args, &block
return unless SuckerPunch::RUNNING.true?

queue = SuckerPunch::Queue.find_or_create to_s, num_workers, num_jobs_max
queue.post { __run_perform(&block) }
end

def __run_perform
SuckerPunch::Counter::Busy.new(to_s).increment
result = yield
SuckerPunch::Counter::Processed.new(to_s).increment
result
rescue StandardError => ex
SuckerPunch::Counter::Failed.new(to_s).increment
SuckerPunch.exception_handler.call ex, self, args
ensure
SuckerPunch::Counter::Busy.new(to_s).decrement
end
end
end
end