背景
Ruby on Rails 4.2 の ActiveJob::QueueAdapter(以下、QueueAdapterと記述) はデフォルトではInlineAdapterです1。ジョブは必ず同期的に実行されます。あるHTTPリクエストでジョブを実行すると、perform_later
メソッドを使ってジョブを呼び出しても、同期的に実行します。そのリクエストのレスポンスは、ジョブの実行完了後に帰ります。
問題
UIの動作を確認したいときにジョブが同期的に実行されると画面の使い勝手を想像しにくくなります。ActiveJob を使うのは、時間の掛かる処理をバックエンドで実行し、画面遷移をすばやく完了させるためです。想定する画面の使い勝手は、ActiveJob を使ってジョブを登録し、即座に次の画面に遷移することです。これをデフォルトの環境では、確認できません。
また Rails4.2 には AsyncAdapter の実装が含まれていません2。そのためconfig/applcation.rb
ファイルに以下の記述を追加するだけでは使えません。
config.active_job.queue_adapter = :async
Delayed::JobやSidekiqなどのQueueAdapterを使うことも考えられますが、これらを使うには次のような作業が必要です。
- 新しいGemのインストール
- DBのマイグレーション
- Redisやジョブプロセスの追加
「画面の使い勝手を確認」の実現に対して、必要なアプリケーションとアーキテクチャの変更が大き過ぎます。
解決策
Rails5.2 の AsyncAdapter3 を移植します。
Rails4と5のActiveJobの変更点
Rails5.0 のリリースノート4に
Add ability to configure the queue adapter on a per job basis without affecting each other.
とあるように、ジョブクラスごとに QueueAdapter を変更できるようになりました。これに伴い、ActiveJobのインタフェースにいくつかの変更点があるため、Rails5.2 の AsyncAdapter をそのままコピーしても動きません。
動的なQueueAdapter
ジョブごとに異なる QueueAdapter を使うため、Rails は QueueAdapter は静的にクラスとして使うのをやめ、インスタンスを生成して使うようになりました。以前は QueueAdapter はインタフェースをクラスメソッドで提供し、Railsはクラスメソッドを使っていました。
AsyncAdapter のメソッドをclass << self
でくくり、クラスメソッドに変更します。
provider_job_idの追加
ジョブから、ジョブを生成した QueueAdapter インスタンスを判別できるように、ActiveJob::Base#provider_job_id
が追加されました。
job.provider_job_id = SecureRandom.uuid
としている部分です5。
修正したソースコード
変更後のソースコードは次の通りです。
require 'concurrent/scheduled_task'
require 'concurrent/executor/thread_pool_executor'
require 'concurrent/utility/processor_counter'
module ActiveJob
module QueueAdapters
# == Active Job Async adapter
#
# The Async adapter runs jobs with an in-process thread pool.
#
# This is the default queue adapter. It's well-suited for dev/test since
# it doesn't need an external infrastructure, but it's a poor fit for
# production since it drops pending jobs on restart.
#
# To use this adapter, set queue adapter to +:async+:
#
# config.active_job.queue_adapter = :async
#
# To configure the adapter's thread pool, instantiate the adapter and
# pass your own config:
#
# config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
# min_threads: 1,
# max_threads: 2 * Concurrent.processor_count,
# idletime: 600.seconds
#
# The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
# jobs. Since jobs share a single thread pool, long-running jobs will block
# short-lived jobs. Fine for dev/test; bad for production.
class AsyncAdapter
class << self
# See {Concurrent::ThreadPoolExecutor}[http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options.
def initialize(**executor_options)
@scheduler = Scheduler.new(**executor_options)
end
def enqueue(job) #:nodoc:
@scheduler ||= Scheduler.new
@scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
end
def enqueue_at(job, timestamp) #:nodoc:
@scheduler ||= Scheduler.new
@scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
end
# Gracefully stop processing jobs. Finishes in-progress work and handles
# any new jobs following the executor's fallback policy (`caller_runs`).
# Waits for termination by default. Pass `wait: false` to continue.
def shutdown(wait: true) #:nodoc:
@scheduler ||= Scheduler.new
@scheduler.shutdown wait: wait
end
# Used for our test suite.
def immediate=(immediate) #:nodoc:
@scheduler ||= Scheduler.new
@scheduler.immediate = immediate
end
end
# Note that we don't actually need to serialize the jobs since we're
# performing them in-process, but we do so anyway for parity with other
# adapters and deployment environments. Otherwise, serialization bugs
# may creep in undetected.
class JobWrapper #:nodoc:
def initialize(job)
pp job
@job_data = job.serialize
end
def perform
Base.execute @job_data
end
end
class Scheduler #:nodoc:
DEFAULT_EXECUTOR_OPTIONS = {
min_threads: 0,
max_threads: Concurrent.processor_count,
auto_terminate: true,
idletime: 60, # 1 minute
max_queue: 0, # unlimited
fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
}.freeze
attr_accessor :immediate
def initialize(**options)
self.immediate = false
@immediate_executor = Concurrent::ImmediateExecutor.new
@async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
end
def enqueue(job, queue_name:)
executor.post(job, &:perform)
end
def enqueue_at(job, timestamp, queue_name:)
delay = timestamp - Time.current.to_f
if delay > 0
Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
else
enqueue(job, queue_name: queue_name)
end
end
def shutdown(wait: true)
@async_executor.shutdown
@async_executor.wait_for_termination if wait
end
def executor
immediate ? @immediate_executor : @async_executor
end
end
end
end
end
このソースコードをapp/lib/active_job/queue_adapters/async_adapter.rb
ファイルとして保存します。
注意
config/applcation.rb
ファイルに以下の記述を追加して、使用する QueueAdapter を変更する必要があります。
config.active_job.queue_adapter = :async
その他の考えられる対応策
Rails4.2 の ActiveJob のジョブを非同期に実行するには、次の方法も考えられます。
- QueueAdapter に Sucker Punchを使う
- Rails のバージョンを
5.0.7.1
以上に上げる
Sucker Punch は AsyncAdapter と同様のスレッドベースの QueueAdapter です。AsyncAdapter と同様にアプリケーションの構成を大きく変えずにジョブの非同期実行が実現できそうです。こちらは試していません。
参考
-
Rails 5 changed Active Job default adapter from Inline to Async | BigBinary Blog ↩
-
https://github.com/rails/rails/tree/4-2-stable/activejob/lib/active_job/queue_adapters ↩
-
https://github.com/rails/rails/blob/5-2-1/activejob/lib/active_job/queue_adapters/async_adapter.rb ↩
-
https://guides.rubyonrails.org/5_0_release_notes.html#active-job ↩
-
https://github.com/rails/rails/blob/5-2-1/activejob/lib/active_job/queue_adapters/async_adapter.rb#L65 ↩