before reading
自分が前提として持ってる知識
- activejobはキューイングバックエンドの抽象でしかない。
- globalidによりactiverecordのシリアライズをしてくれる。
この前提知識のもと、あたりをつけて潜っていきます
target
branch: 6-0-stable
commit: 186edaedfc
ActiveJobの使い方
class JustPrintJob < ApplicationJob
def perform()
print('hello')
end
end
JustPrintJob.perform_now
JustPrintJob.perform_later
ActiveJobの基本的な使い方です。
このあたりがgrep候補であり、コードを読み始めるエントリポイントになります
base class
module ActiveJob
class Base
include Core
include QueueAdapter
include QueueName
include QueuePriority
include Enqueuing
include Execution
include Callbacks
include Exceptions
include Logging
include Timezones
include Translation
ActiveSupport.run_load_hooks(:active_job, self)
end
end
ActiveJob::Baseは11のModuleをincludeしてます
中核をなしてそうなModuleは
Core, Enqueuing, Execution
の三つです
Core # 後述
QueueAdapter # queue adapterのcomposition
QueueName # queueの名前の設定
QueuePriority # queueの優先度の設定
Enqueuing # 後述
Execution # 後述
Callbacks # ActiveSupport::Callbackの設定
Exceptions # retry周りの設定
Logging # Log周り
Timezones # Jobが実行されるtimezoneを設定
Translation # Jobが実行されるlocaleを設定
execution
module ActiveJob
module Execution
extend ActiveSupport::Concern
include ActiveSupport::Rescuable
module ClassMethods
def perform_now(*args)
job_or_instantiate(*args).perform_now
end
def execute(job_data) #:nodoc:
ActiveJob::Callbacks.run_callbacks(:execute) do
job = deserialize(job_data)
job.perform_now
end
end
end
def perform_now
self.executions = (executions || 0) + 1
deserialize_arguments_if_needed
run_callbacks :perform do
perform(*arguments)
end
rescue => exception
rescue_with_handler(exception) || raise
end
def perform(*)
fail NotImplementedError
end
end
end
Executionのクラスメソッドにperform_nowがあります。
処理としては、instance化してインスタンスメソッド perform_now を呼ぶだけです。
def perform_now(*args)
job_or_instantiate(*args).perform_now
end
インスタンスメソッド perform_nowは
実行回数をインクリメント
引数のデシリアライズ
perform呼び出し
をしていることがわかります
def perform_now
self.executions = (executions || 0) + 1
deserialize_arguments_if_needed
run_callbacks :perform do
perform(*arguments)
end
rescue => exception
rescue_with_handler(exception) || raise
end
performはユーザー定義のクラスでオーバーライドする処理です。
activejob側では not implemented です。
def perform(*)
fail NotImplementedError
end
enqueuing
module ActiveJob
module Enqueuing
extend ActiveSupport::Concern
module ClassMethods
def perform_later(*args)
job_or_instantiate(*args).enqueue
end
private
def job_or_instantiate(*args) # :doc:
args.first.is_a?(self) ? args.first : new(*args)
end
end
def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
self.class.queue_adapter.enqueue_at self, scheduled_at
else
self.class.queue_adapter.enqueue self
end
successfully_enqueued = true
end
if successfully_enqueued
self
else
if self.class.return_false_on_aborted_enqueue
false
else
ActiveSupport::Deprecation.warn(
"Rails 6.1 will return false when the enqueuing is aborted. Make sure your code doesn't depend on it" \
" returning the instance of the job and set `config.active_job.return_false_on_aborted_enqueue = true`" \
" to remove the deprecations."
)
self
end
end
end
end
end
enqueuingのクラスメソッドにperform_laterがあります。
処理としては、instance化してインスタンスメソッド enqueue を呼ぶだけです。
def perform_later(*args)
job_or_instantiate(*args).enqueue
end
インスタンスメソッド enqueueは
optionをパース
queue_adapterにenqueue呼び出しを移譲
をしていることがわかります
綺麗なstrategy patternが見えますね。
def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
self.class.queue_adapter.enqueue_at self, scheduled_at
else
self.class.queue_adapter.enqueue self
end
successfully_enqueued = true
end
if successfully_enqueued
self
else
if self.class.return_false_on_aborted_enqueue
false
else
ActiveSupport::Deprecation.warn(
"Rails 6.1 will return false when the enqueuing is aborted. Make sure your code doesn't depend on it" \
" returning the instance of the job and set `config.active_job.return_false_on_aborted_enqueue = true`" \
" to remove the deprecations."
)
self
end
end
end
Core
ここまでexecution, enqueueを見てきましたが、デシリアライズ周りの処理は見受けられませんでした。
Coreに詰まってると信じて見ていきます。
Execution#perform_now で現れた deserialize_arguments_if_needed からコードに潜っていきます。
def deserialize_arguments_if_needed
if arguments_serialized?
@arguments = deserialize_arguments(@serialized_arguments)
@serialized_arguments = nil
end
end
def deserialize_arguments(serialized_args)
Arguments.deserialize(serialized_args)
end
def arguments_serialized?
defined?(@serialized_arguments) && @serialized_arguments
end
@serialized_argumentsが定義されていて存在するなら、
それを引数にArgument.deserializeを呼んでいます。
結果を@argumentsに入れます。
この@argumentsはExecution#performの引数として展開されます。
Arguments.deserializeを見ていきます
PERMITTED_TYPES = [ NilClass, String, Integer, Float, BigDecimal, TrueClass, FalseClass ]
def deserialize(arguments)
arguments.map { |argument| deserialize_argument(argument) }
rescue
raise DeserializationError
end
def deserialize_argument(argument)
case argument
when String
argument
when *PERMITTED_TYPES
argument
when Array
argument.map { |arg| deserialize_argument(arg) }
when Hash
if serialized_global_id?(argument)
deserialize_global_id argument
elsif custom_serialized?(argument)
Serializers.deserialize(argument)
else
deserialize_hash(argument)
end
else
raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
end
end
def serialized_global_id?(hash)
hash.size == 1 && hash.include?(GLOBALID_KEY)
end
def deserialize_global_id(hash)
GlobalID::Locator.locate hash[GLOBALID_KEY]
end
def custom_serialized?(hash)
hash.key?(OBJECT_SERIALIZER_KEY)
end
このdeserializeはserializeの逆と思い込みます。
ちなみにserializeはこんな感じで呼び出せて、ARクラスはこうなります。
[13] pry(main)> ActiveJob::Arguments.serialize [User.last]
=> [{"_aj_globalid"=>"gid://rails-sample6/User/1"}]
QueueAdaptersの各Adapter内でserializeされてqueueに積まれます。
なのでqueueからdequeueされるデータをdeserializeする必要があります。