Help us understand the problem. What is going on with this article?

Rails4.2 で AsyncAdapter を使う

More than 1 year has passed since last update.

背景

Ruby on Rails 4.2 の ActiveJob::QueueAdapter(以下、QueueAdapterと記述) はデフォルトではInlineAdapterです1。ジョブは必ず同期的に実行されます。あるHTTPリクエストでジョブを実行すると、perform_laterメソッドを使ってジョブを呼び出しても、同期的に実行します。そのリクエストのレスポンスは、ジョブの実行完了後に帰ります。

問題

UIの動作を確認したいときにジョブが同期的に実行されると画面の使い勝手を想像しにくくなります。ActiveJob を使うのは、時間の掛かる処理をバックエンドで実行し、画面遷移をすばやく完了させるためです。想定する画面の使い勝手は、ActiveJob を使ってジョブを登録し、即座に次の画面に遷移することです。これをデフォルトの環境では、確認できません。

また Rails4.2 には AsyncAdapter の実装が含まれていません2。そのためconfig/applcation.rbファイルに以下の記述を追加するだけでは使えません。

config.active_job.queue_adapter = :async

Delayed::JobSidekiqなどのQueueAdapterを使うことも考えられますが、これらを使うには次のような作業が必要です。

  • 新しいGemのインストール
  • DBのマイグレーション
  • Redisやジョブプロセスの追加

「画面の使い勝手を確認」の実現に対して、必要なアプリケーションとアーキテクチャの変更が大き過ぎます。

解決策

Rails5.2 の AsyncAdapter3 を移植します。

Rails4と5のActiveJobの変更点

Rails5.0 のリリースノート4

Add ability to configure the queue adapter on a per job basis without affecting each other.

とあるように、ジョブクラスごとに QueueAdapter を変更できるようになりました。これに伴い、ActiveJobのインタフェースにいくつかの変更点があるため、Rails5.2 の AsyncAdapter をそのままコピーしても動きません。

動的なQueueAdapter

ジョブごとに異なる QueueAdapter を使うため、Rails は QueueAdapter は静的にクラスとして使うのをやめ、インスタンスを生成して使うようになりました。以前は QueueAdapter はインタフェースをクラスメソッドで提供し、Railsはクラスメソッドを使っていました。

AsyncAdapter のメソッドをclass << selfでくくり、クラスメソッドに変更します。

provider_job_idの追加

ジョブから、ジョブを生成した QueueAdapter インスタンスを判別できるように、ActiveJob::Base#provider_job_idが追加されました。

job.provider_job_id = SecureRandom.uuid

としている部分です5

修正したソースコード

変更後のソースコードは次の通りです。

require 'concurrent/scheduled_task'
require 'concurrent/executor/thread_pool_executor'
require 'concurrent/utility/processor_counter'

module ActiveJob
  module QueueAdapters
    # == Active Job Async adapter
    #
    # The Async adapter runs jobs with an in-process thread pool.
    #
    # This is the default queue adapter. It's well-suited for dev/test since
    # it doesn't need an external infrastructure, but it's a poor fit for
    # production since it drops pending jobs on restart.
    #
    # To use this adapter, set queue adapter to +:async+:
    #
    #   config.active_job.queue_adapter = :async
    #
    # To configure the adapter's thread pool, instantiate the adapter and
    # pass your own config:
    #
    #   config.active_job.queue_adapter = ActiveJob::QueueAdapters::AsyncAdapter.new \
    #     min_threads: 1,
    #     max_threads: 2 * Concurrent.processor_count,
    #     idletime: 600.seconds
    #
    # The adapter uses a {Concurrent Ruby}[https://github.com/ruby-concurrency/concurrent-ruby] thread pool to schedule and execute
    # jobs. Since jobs share a single thread pool, long-running jobs will block
    # short-lived jobs. Fine for dev/test; bad for production.
    class AsyncAdapter
      class << self
      # See {Concurrent::ThreadPoolExecutor}[http://ruby-concurrency.github.io/concurrent-ruby/Concurrent/ThreadPoolExecutor.html] for executor options.
        def initialize(**executor_options)
          @scheduler = Scheduler.new(**executor_options)
        end

        def enqueue(job) #:nodoc:
          @scheduler ||= Scheduler.new
          @scheduler.enqueue JobWrapper.new(job), queue_name: job.queue_name
        end

        def enqueue_at(job, timestamp) #:nodoc:
          @scheduler ||= Scheduler.new
          @scheduler.enqueue_at JobWrapper.new(job), timestamp, queue_name: job.queue_name
        end

        # Gracefully stop processing jobs. Finishes in-progress work and handles
        # any new jobs following the executor's fallback policy (`caller_runs`).
        # Waits for termination by default. Pass `wait: false` to continue.
        def shutdown(wait: true) #:nodoc:
          @scheduler ||= Scheduler.new
          @scheduler.shutdown wait: wait
        end

        # Used for our test suite.
        def immediate=(immediate) #:nodoc:
          @scheduler ||= Scheduler.new
          @scheduler.immediate = immediate
        end
      end

      # Note that we don't actually need to serialize the jobs since we're
      # performing them in-process, but we do so anyway for parity with other
      # adapters and deployment environments. Otherwise, serialization bugs
      # may creep in undetected.
      class JobWrapper #:nodoc:
        def initialize(job)
          pp job
          @job_data = job.serialize
        end

        def perform
          Base.execute @job_data
        end
      end

      class Scheduler #:nodoc:
        DEFAULT_EXECUTOR_OPTIONS = {
          min_threads:     0,
          max_threads:     Concurrent.processor_count,
          auto_terminate:  true,
          idletime:        60, # 1 minute
          max_queue:       0, # unlimited
          fallback_policy: :caller_runs # shouldn't matter -- 0 max queue
        }.freeze

        attr_accessor :immediate

        def initialize(**options)
          self.immediate = false
          @immediate_executor = Concurrent::ImmediateExecutor.new
          @async_executor = Concurrent::ThreadPoolExecutor.new(DEFAULT_EXECUTOR_OPTIONS.merge(options))
        end

        def enqueue(job, queue_name:)
          executor.post(job, &:perform)
        end

        def enqueue_at(job, timestamp, queue_name:)
          delay = timestamp - Time.current.to_f
          if delay > 0
            Concurrent::ScheduledTask.execute(delay, args: [job], executor: executor, &:perform)
          else
            enqueue(job, queue_name: queue_name)
          end
        end

        def shutdown(wait: true)
          @async_executor.shutdown
          @async_executor.wait_for_termination if wait
        end

        def executor
          immediate ? @immediate_executor : @async_executor
        end
      end
    end
  end
end

このソースコードをapp/lib/active_job/queue_adapters/async_adapter.rbファイルとして保存します。

注意

config/applcation.rbファイルに以下の記述を追加して、使用する QueueAdapter を変更する必要があります。

config.active_job.queue_adapter = :async

その他の考えられる対応策

Rails4.2 の ActiveJob のジョブを非同期に実行するには、次の方法も考えられます。

  1. QueueAdapter に Sucker Punchを使う
  2. Rails のバージョンを5.0.7.1以上に上げる

Sucker Punch は AsyncAdapter と同様のスレッドベースの QueueAdapter です。AsyncAdapter と同様にアプリケーションの構成を大きく変えずにジョブの非同期実行が実現できそうです。こちらは試していません。

参考

Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした