LoginSignup
4
2

More than 5 years have passed since last update.

ActiveJobコードリーディング

Last updated at Posted at 2017-02-23

経緯

昔 sidekiq を使っていたとき、非同期実行をしたければ perform_async
を使うのが一般的でした。しかしながら、今回使おうとしたら使えず....。
ドキュメント読んでも、コード読んでもperform_asyncは存在するし...。
sidekiq_optionsも使えないので困ったぞ!となって色々調べました。

sidekiqのlog

まずは実行時の log 眺めますよね。ということでsidekiqのlogを見てみました。
昔見たのと、結構違う logっぽい。ここにある retry パラメータをfalseにしたい。

※ Railsのlogはあんまり使えそうに無かったので消してます。

"lpush" "queue:default" "{\"class\":\
"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",
\"wrapped\":\"MyJob\",\"queue\":\"default\",\"args\":
[{\"job_class\":\"IntroduceNotifier\",
\"job_id\":\"3829b424-7c3c-4f52-85ee-5b1d076a7937\"
,\"queue_name\":\"default\",
\"priority\":null,\"arguments\":[{\"subject_id\":1,
\"branch_ids\":[1,2,5],\"_aj_symbol_keys\":[\"subject_id\",
\"branch_ids\"]}],\"locale\":\"ja\"}],
\"retry\":true,\"jid\":\"b3aefff2e53910b0e3862073\",
\"created_at\":1487839214.219861,
\"enqueued_at\":1487839214.220073}"
1487839214.221975 [0 127.0.0.1:64915] "exec"

ActiveJob

ActiveJobの構成は下記のようになっています。

  • AcriveJob
    • queue_adapter.rb (どのadapterを指定するか決める役割)
    • queue_adapters.rb (対応している adapterを持っている. resqueとか)
      • sidekiq_adapter.rb (sidekiqのadapter )
      • others...
    • core.rb ( jobを実行するために必要な引数を持つ )
    • configured_job.rb ( jobに対する wrapperっぽい? )
    • encueuing.rb ( 適切なadapterにqueuingする役割を持つ )
    • others...

読むべきポイント

jobの実行は下記のコードで行う。
MyJob.set(wait: 10).perform_later

となると、set で何の objectを返しているのか、把握する必要がある。
grepしてみると、core.rb が set methodを持っていることが分かった.

core.rb

module ActiveJob
  module Core
      ...
      def set(options = {})
        ConfiguredJob.new(self, options)
      end

core.rb は ConfiguredJobを返しているっぽいですね。

configured_job.rb

module ActiveJob
  class ConfiguredJob #:nodoc:
    ...
    def perform_later(*args)
      @job_class.new(*args).enqueue @options
    end

ここで、job_classってなんだろってなったので、諸々確認。

consoleで確認したらこんな感じです。

[7] pry(main)> job = MyJob.set(wait: 10)
=> #<ActiveJob::ConfiguredJob:0x007fcef2e5bd98
 @job_class=MyJob,
 @options={:wait=>10}>

自分たちで設定した MyJobらしい。
で、MyJobはApplicationJobを継承しているはずなので、
ActiveJob::Baseでincludeされている methodを利用可能です。

というところから、次に読むべきポイントは ActiveJob::Baseで
includeされているmoduleの中で enqueue methodを持つやつになります。

enqueuing.rb

module ActiveJob
  module Enqueuing
    def enqueue(options = {})
      ....
      run_callbacks :enqueue do
        if scheduled_at
          self.class.queue_adapter.enqueue_at self, scheduled_at
        else
          self.class.queue_adapter.enqueue self
        end
      end
      self
    end

enqueuing.rbを見ると、 queue_adapterのenqueue methodを使っているらしい。
とのことで、ようやく sidekiq_adapterを見る必要が出てきた。

sidekiq_adapters.rb と sidekiq/client.rb

module ActiveJob
  module QueueAdapters
    class SidekiqAdapter
      def enqueue(job) #:nodoc:
        #Sidekiq::Client does not support symbols as keys
        job.provider_job_id = Sidekiq::Client.push \
          "class"   => JobWrapper,
          "wrapped" => job.class.to_s,
          "queue"   => job.queue_name,
          "args"    => [ job.serialize ]
      end
      class JobWrapper #:nodoc:
        include Sidekiq::Worker

        def perform(job_data)
          Base.execute job_data.merge("provider_job_id" => jid)
        end
      end
    end

ということで、serializeする際のパラメータに retryを入れられれば良さそう。
まで行った。ところで、exception.rb を読んだら下記のことが分かった。

追記

exception.rb を読んだところ↓みたいな記述が。
ちなみに master branchにしかいないっぽい。
こいつがmergeされたらいいなぁ

# Discard the job with no attempts to retry, if the exception is raised. This is useful when the subject of the job,
# like an Active Record, is no longer available, and the job is thus no longer relevant.
#
# ==== Example
#
#  class SearchIndexingJob < ActiveJob::Base
#    discard_on ActiveJob::DeserializationError
#
#    def perform(record)
#      # Will raise ActiveJob::DeserializationError if the record can't be deserialized
#    end
#  end

def discard_on(exception)
  rescue_from exception do |error|
    logger.error "Discarded #{self.class} due to a #{exception}. The original exception was #{error.cause.inspect}."
  end
end

ということで、下のように jobを書けば良さそう。

# frozen_string_literal: true
class MyJob < ApplicationJob
  discard_on StandardError

おまけ

queue_adapter.rb

sidekiqのadapterを指定するclass。
railsのapplication.rbで config.active_job.queue_adapter = :sidekiq
で設定しているのはここの値。

      def queue_adapter=(name_or_adapter_or_class)
        self._queue_adapter = interpret_adapter(name_or_adapter_or_class)
      end

      private

        def interpret_adapter(name_or_adapter_or_class)
          case name_or_adapter_or_class
          when Symbol, String
            ActiveJob::QueueAdapters.lookup(name_or_adapter_or_class).new
          else
            if queue_adapter?(name_or_adapter_or_class)
              name_or_adapter_or_class
            else
              raise ArgumentError
            end
          end
        end
4
2
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
4
2