0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

activejobを読む

0
Posted at

before reading

自分が前提として持ってる知識

  1. activejobはキューイングバックエンドの抽象でしかない。
  2. globalidによりactiverecordのシリアライズをしてくれる。

この前提知識のもと、あたりをつけて潜っていきます

target

branch: 6-0-stable
commit: 186edaedfc

ActiveJobの使い方

class JustPrintJob < ApplicationJob
  def perform()
    print('hello')
  end
end

JustPrintJob.perform_now
JustPrintJob.perform_later

ActiveJobの基本的な使い方です。
このあたりがgrep候補であり、コードを読み始めるエントリポイントになります

base class

activejob/lib/active_job/base.rb

module ActiveJob
  class Base
    include Core
    include QueueAdapter
    include QueueName
    include QueuePriority
    include Enqueuing
    include Execution
    include Callbacks
    include Exceptions
    include Logging
    include Timezones
    include Translation

    ActiveSupport.run_load_hooks(:active_job, self)
  end
end

ActiveJob::Baseは11のModuleをincludeしてます
中核をなしてそうなModuleは
Core, Enqueuing, Execution
の三つです

Core          # 後述
QueueAdapter  # queue adapterのcomposition
QueueName     # queueの名前の設定
QueuePriority # queueの優先度の設定
Enqueuing     # 後述
Execution     # 後述
Callbacks     # ActiveSupport::Callbackの設定
Exceptions    # retry周りの設定
Logging       # Log周り
Timezones     # Jobが実行されるtimezoneを設定
Translation   # Jobが実行されるlocaleを設定

execution

activejob/lib/active_job/execution.rb

module ActiveJob
  module Execution
    extend ActiveSupport::Concern
    include ActiveSupport::Rescuable

    module ClassMethods
      def perform_now(*args)
        job_or_instantiate(*args).perform_now
      end

      def execute(job_data) #:nodoc:
        ActiveJob::Callbacks.run_callbacks(:execute) do
          job = deserialize(job_data)
          job.perform_now
        end
      end
    end

    def perform_now
      self.executions = (executions || 0) + 1

      deserialize_arguments_if_needed
      run_callbacks :perform do
        perform(*arguments)
      end
    rescue => exception
      rescue_with_handler(exception) || raise
    end

    def perform(*)
      fail NotImplementedError
    end
  end
end

Executionのクラスメソッドにperform_nowがあります。
処理としては、instance化してインスタンスメソッド perform_now を呼ぶだけです。

      def perform_now(*args)
        job_or_instantiate(*args).perform_now
      end

インスタンスメソッド perform_now
実行回数をインクリメント
引数のデシリアライズ
perform呼び出し
をしていることがわかります

    def perform_now
      self.executions = (executions || 0) + 1

      deserialize_arguments_if_needed
      run_callbacks :perform do
        perform(*arguments)
      end
    rescue => exception
      rescue_with_handler(exception) || raise
    end

performはユーザー定義のクラスでオーバーライドする処理です。
activejob側では not implemented です。

    def perform(*)
      fail NotImplementedError
    end

enqueuing

module ActiveJob
  module Enqueuing
    extend ActiveSupport::Concern

    module ClassMethods
      def perform_later(*args)
        job_or_instantiate(*args).enqueue
      end

      private
        def job_or_instantiate(*args) # :doc:
          args.first.is_a?(self) ? args.first : new(*args)
        end
    end

    def enqueue(options = {})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      successfully_enqueued = false

      run_callbacks :enqueue do
        if scheduled_at
          self.class.queue_adapter.enqueue_at self, scheduled_at
        else
          self.class.queue_adapter.enqueue self
        end

        successfully_enqueued = true
      end

      if successfully_enqueued
        self
      else
        if self.class.return_false_on_aborted_enqueue
          false
        else
          ActiveSupport::Deprecation.warn(
            "Rails 6.1 will return false when the enqueuing is aborted. Make sure your code doesn't depend on it" \
            " returning the instance of the job and set `config.active_job.return_false_on_aborted_enqueue = true`" \
            " to remove the deprecations."
          )

          self
        end
      end
    end
  end
end

enqueuingのクラスメソッドにperform_laterがあります。
処理としては、instance化してインスタンスメソッド enqueue を呼ぶだけです。

      def perform_later(*args)
        job_or_instantiate(*args).enqueue
      end

インスタンスメソッド enqueue
optionをパース
queue_adapterにenqueue呼び出しを移譲
をしていることがわかります

綺麗なstrategy patternが見えますね。

    def enqueue(options = {})
      self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
      self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]
      successfully_enqueued = false

      run_callbacks :enqueue do
        if scheduled_at
          self.class.queue_adapter.enqueue_at self, scheduled_at
        else
          self.class.queue_adapter.enqueue self
        end

        successfully_enqueued = true
      end

      if successfully_enqueued
        self
      else
        if self.class.return_false_on_aborted_enqueue
          false
        else
          ActiveSupport::Deprecation.warn(
            "Rails 6.1 will return false when the enqueuing is aborted. Make sure your code doesn't depend on it" \
            " returning the instance of the job and set `config.active_job.return_false_on_aborted_enqueue = true`" \
            " to remove the deprecations."
          )

          self
        end
      end
    end

Core

ここまでexecution, enqueueを見てきましたが、デシリアライズ周りの処理は見受けられませんでした。
Coreに詰まってると信じて見ていきます。
Execution#perform_now で現れた deserialize_arguments_if_needed からコードに潜っていきます。

activejob/lib/active_job/core.rb
      def deserialize_arguments_if_needed
        if arguments_serialized?
          @arguments = deserialize_arguments(@serialized_arguments)
          @serialized_arguments = nil
        end
      end

      def deserialize_arguments(serialized_args)
        Arguments.deserialize(serialized_args)
      end

      def arguments_serialized?
        defined?(@serialized_arguments) && @serialized_arguments
      end

@serialized_argumentsが定義されていて存在するなら、
それを引数にArgument.deserializeを呼んでいます。
結果を@argumentsに入れます。
この@argumentsはExecution#performの引数として展開されます。

Arguments.deserializeを見ていきます

activejob/lib/active_job/arguments.rb
      PERMITTED_TYPES = [ NilClass, String, Integer, Float, BigDecimal, TrueClass, FalseClass ]

    def deserialize(arguments)
      arguments.map { |argument| deserialize_argument(argument) }
    rescue
      raise DeserializationError
    end

      def deserialize_argument(argument)
        case argument
        when String
          argument
        when *PERMITTED_TYPES
          argument
        when Array
          argument.map { |arg| deserialize_argument(arg) }
        when Hash
          if serialized_global_id?(argument)
            deserialize_global_id argument
          elsif custom_serialized?(argument)
            Serializers.deserialize(argument)
          else
            deserialize_hash(argument)
          end
        else
          raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
        end
      end

      def serialized_global_id?(hash)
        hash.size == 1 && hash.include?(GLOBALID_KEY)
      end

      def deserialize_global_id(hash)
        GlobalID::Locator.locate hash[GLOBALID_KEY]
      end

      def custom_serialized?(hash)
        hash.key?(OBJECT_SERIALIZER_KEY)
      end

このdeserializeはserializeの逆と思い込みます。
ちなみにserializeはこんな感じで呼び出せて、ARクラスはこうなります。

[13] pry(main)> ActiveJob::Arguments.serialize [User.last]
=> [{"_aj_globalid"=>"gid://rails-sample6/User/1"}]

QueueAdaptersの各Adapter内でserializeされてqueueに積まれます。
なのでqueueからdequeueされるデータをdeserializeする必要があります。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?