経緯
昔 sidekiq を使っていたとき、非同期実行をしたければ perform_async
を使うのが一般的でした。しかしながら、今回使おうとしたら使えず....。
ドキュメント読んでも、コード読んでもperform_asyncは存在するし...。
sidekiq_optionsも使えないので困ったぞ!となって色々調べました。
sidekiqのlog
まずは実行時の log 眺めますよね。ということでsidekiqのlogを見てみました。
昔見たのと、結構違う logっぽい。ここにある retry パラメータをfalseにしたい。
※ Railsのlogはあんまり使えそうに無かったので消してます。
"lpush" "queue:default" "{\"class\":\
"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",
\"wrapped\":\"MyJob\",\"queue\":\"default\",\"args\":
[{\"job_class\":\"IntroduceNotifier\",
\"job_id\":\"3829b424-7c3c-4f52-85ee-5b1d076a7937\"
,\"queue_name\":\"default\",
\"priority\":null,\"arguments\":[{\"subject_id\":1,
\"branch_ids\":[1,2,5],\"_aj_symbol_keys\":[\"subject_id\",
\"branch_ids\"]}],\"locale\":\"ja\"}],
\"retry\":true,\"jid\":\"b3aefff2e53910b0e3862073\",
\"created_at\":1487839214.219861,
\"enqueued_at\":1487839214.220073}"
1487839214.221975 [0 127.0.0.1:64915] "exec"
ActiveJob
ActiveJobの構成は下記のようになっています。
- AcriveJob
- queue_adapter.rb (どのadapterを指定するか決める役割)
- queue_adapters.rb (対応している adapterを持っている. resqueとか)
- sidekiq_adapter.rb (sidekiqのadapter )
- others...
- core.rb ( jobを実行するために必要な引数を持つ )
- configured_job.rb ( jobに対する wrapperっぽい? )
- encueuing.rb ( 適切なadapterにqueuingする役割を持つ )
- others...
読むべきポイント
jobの実行は下記のコードで行う。
MyJob.set(wait: 10).perform_later
となると、set で何の objectを返しているのか、把握する必要がある。
grepしてみると、core.rb が set methodを持っていることが分かった.
core.rb
module ActiveJob
module Core
...
def set(options = {})
ConfiguredJob.new(self, options)
end
core.rb は ConfiguredJobを返しているっぽいですね。
configured_job.rb
module ActiveJob
class ConfiguredJob #:nodoc:
...
def perform_later(*args)
@job_class.new(*args).enqueue @options
end
ここで、job_classってなんだろってなったので、諸々確認。
consoleで確認したらこんな感じです。
[7] pry(main)> job = MyJob.set(wait: 10)
=> #<ActiveJob::ConfiguredJob:0x007fcef2e5bd98
@job_class=MyJob,
@options={:wait=>10}>
自分たちで設定した MyJobらしい。
で、MyJobはApplicationJobを継承しているはずなので、
ActiveJob::Baseでincludeされている methodを利用可能です。
というところから、次に読むべきポイントは ActiveJob::Baseで
includeされているmoduleの中で enqueue methodを持つやつになります。
enqueuing.rb
module ActiveJob
module Enqueuing
def enqueue(options = {})
....
run_callbacks :enqueue do
if scheduled_at
self.class.queue_adapter.enqueue_at self, scheduled_at
else
self.class.queue_adapter.enqueue self
end
end
self
end
enqueuing.rbを見ると、 queue_adapterのenqueue methodを使っているらしい。
とのことで、ようやく sidekiq_adapterを見る必要が出てきた。
sidekiq_adapters.rb と sidekiq/client.rb
module ActiveJob
module QueueAdapters
class SidekiqAdapter
def enqueue(job) #:nodoc:
#Sidekiq::Client does not support symbols as keys
job.provider_job_id = Sidekiq::Client.push \
"class" => JobWrapper,
"wrapped" => job.class.to_s,
"queue" => job.queue_name,
"args" => [ job.serialize ]
end
class JobWrapper #:nodoc:
include Sidekiq::Worker
def perform(job_data)
Base.execute job_data.merge("provider_job_id" => jid)
end
end
end
ということで、serializeする際のパラメータに retryを入れられれば良さそう。
まで行った。ところで、exception.rb を読んだら下記のことが分かった。
追記
exception.rb を読んだところ↓みたいな記述が。
ちなみに master branchにしかいないっぽい。
こいつがmergeされたらいいなぁ
# Discard the job with no attempts to retry, if the exception is raised. This is useful when the subject of the job,
# like an Active Record, is no longer available, and the job is thus no longer relevant.
#
# ==== Example
#
# class SearchIndexingJob < ActiveJob::Base
# discard_on ActiveJob::DeserializationError
#
# def perform(record)
# # Will raise ActiveJob::DeserializationError if the record can't be deserialized
# end
# end
def discard_on(exception)
rescue_from exception do |error|
logger.error "Discarded #{self.class} due to a #{exception}. The original exception was #{error.cause.inspect}."
end
end
ということで、下のように jobを書けば良さそう。
# frozen_string_literal: true
class MyJob < ApplicationJob
discard_on StandardError
おまけ
queue_adapter.rb
sidekiqのadapterを指定するclass。
railsのapplication.rbで config.active_job.queue_adapter = :sidekiq
で設定しているのはここの値。
def queue_adapter=(name_or_adapter_or_class)
self._queue_adapter = interpret_adapter(name_or_adapter_or_class)
end
private
def interpret_adapter(name_or_adapter_or_class)
case name_or_adapter_or_class
when Symbol, String
ActiveJob::QueueAdapters.lookup(name_or_adapter_or_class).new
else
if queue_adapter?(name_or_adapter_or_class)
name_or_adapter_or_class
else
raise ArgumentError
end
end
end