LoginSignup
2

ActiveJobの実行エンジンにSuckerPunchを使いジョブクラス毎にスレッドプールをわける

Last updated at Posted at 2018-12-11

背景

Ruby or Railsでは、長時間掛かるジョブを非同期に実行するためのインタフェースとしてActiveJobがあります。ActiveJobはQueueAdapterを切り替えることで、ジョブの実行エンジンを変更することができます。

その中のひとつに Sucker Punch があります。Sucker Punchは、非同期処理をスレッドで実行します。Ruby on Railsのプロセスと別にジョブ実行のためのプロセスを起動する必要がありません。Heroku等のPaaSで動かすときに便利な特徴です。制約もあります。ジョブをDB等で永続化しません。ジョブ実行中にRuby on Railsを再起動すると、実行中のジョブは単に中断され、再起動後にはリトライしません。

  • ジョブ実行プロセスを起動したくない
  • ジョブのリトライが不要

の場合、Sucker Punchが実行エンジンの選択肢になります1

問題

Sucker Punch を単体で使う場合、ジョブのキューはジョブクラス毎に分かれます。ジョブクラス単位でジョブを実行するスレッドプールが用意されます。これがキューとして働きます。

もし、1つのキューに遅いジョブと速いジョブが混ざると、速いジョブの実行が遅いジョブの完了を待つことがあります。開始すればすぐ終わるジョブが、キューが詰まって、時間が掛かる現象が発生します。ジョブクラス単位でキューが分かれているため、これを回避できます。

ActiveJobのQueueAdapterを経由してSucker Punchを使うと、すべてのジョブが一つのスレッドプールで実行されます。AsyncAdapter2を使うのと同じ動きになります3

ActiveJobのQueueAdapterから、Sucker Punchを使用しかつ、ジョブクラス毎のキューで動かす方法はないでしょうか?

問題の原因

Sucker Punchのジョブはperform_asyncというメソッドで開始します。

https://github.com/brandonhilkert/sucker_punch/blob/v3.1.0/lib/sucker_punch/job.rb#L35-L39

def perform_async(*args)
  return unless SuckerPunch::RUNNING.true?
  queue = SuckerPunch::Queue.find_or_create(self.to_s, num_workers, num_jobs_max)
  queue.post { __run_perform(*args) }
end

この中で、ジョブのクラス名(self.to_s)に基づいて、ジョブを実行するSuckerPunch::Queue からスレッドプールを探します。

一方、ActiveJobからSucker Punchを呼び出すため、SuckerPunchAdapterはすべてのジョブをJobWrapperクラスでラップします。

https://github.com/rails/rails/blob/7-1-stable/activejob/lib/active_job/queue_adapters/sucker_punch_adapter.rb#L40-L46

class JobWrapper #:nodoc:
  include SuckerPunch::Job

  def perform(job_data)
    Base.execute job_data
  end
end

このためself.to_sは常にJobWrapperになります。Sucker Punchは、全てのジョブを JobWrapper クラスのジョブと判定し、同じスレッドプールを使います。

なぜ、SuckerPunchAdapterはジョブをJobWrapperクラスでラップしているのでしょうか?ActiveJobのインタフェースを守るためです。ActiveJobのジョブはActiveJob::Base.executeを実行して、ジョブを開始します。そうしなければ、コールバックが使えません。また、ジョブの開始・終了のログも出力されません。

https://github.com/rails/rails/blob/7-1-stable/activejob/lib/active_job/execution.rb#L27-L32

def execute(job_data) #:nodoc:
  ActiveJob::Callbacks.run_callbacks(:execute) do
    job = deserialize(job_data)
    job.perform_now
  end
end

解決方法

self.to_sの代わりに、:job_classを使います。

SuckerPunch::Job.perform_async

SuckerPunch::Job.perform_asyncを修正します。

def perform_async(*args)
  return unless SuckerPunch::RUNNING.true?

  name = args.first[:job_class] # 第一引数からJobのクラス名を取得する

  queue = SuckerPunch::Queue.find_or_create(name, num_workers, num_jobs_max)
  queue.post { __run_perform(*args) }
end

複数の引数を受け取るために、引数は *args です。ここから第一引数を取得します。これはジョブの情報を表すハッシュです。:job_classキーで、ジョブのクラス名が取得出来ます。これを使ってスレッドプールを取得します。

モンキーパッチ

前述のSuckerPunch::Job.perform_asyncをモンキーパッチとして実装します。

config/initializers/sucker_punch_patches.rb
Rails.application.config.after_initialize do
  SuckerPunch::Job::ClassMethods.prepend(SuckerPunchJobExtensions)
end

app/libディレクトリ配下のクラスを読み込むためにRails.application.config.after_initializeを使います。
prependを使ってパッチを適用します。

app/lib/sucker_punch_job_extensions.rb
module SuckerPunchJobExtensions
  def perform_async(*args)
    return unless SuckerPunch::RUNNING.true?

    name = args.first[:job_class] # 第一引数からJobのクラス名を取得する

    queue = SuckerPunch::Queue.find_or_create(name, num_workers, num_jobs_max)
    queue.post { __run_perform(*args) }
  end
end

上書きしたいメソッドを定義したモジュールをapp/libに配置してあります。

  1. AsyncAdapterもスレッドで実行されます。選択肢になります。

  2. ActiveJobのデフォルトのQueueAdapter

  3. AsyncAdapterとSucker Punchはどちらも、Concurrent::ThreadPoolExecutor 上で実装されています。キューをわける機能がなければ、ほとんど同じに思えます。どういうときに、この記事のようなパッチを当てずに、ActiveJobからSucker Punchを使いたいのでしょうか?

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
2