この記事では Sidekiq 5 系を対象にしていますが、4 系でも変わらない内容です。
Sidekiq Worker を利用する利点の一つに、 リトライ処理 を Sidekiq に任せられる点があります。
リトライを安全に実行するために、Sidekiq Worker の処理を冪等にしておくことは Sidekiq の Best Practice にも触れられています。以下は引用です。
2. Make your job idempotent and transactional
Idempotency means that your job can safely execute multiple times. For instance, with the error retry functionality, your job might be half-processed, throw an error, and then be re-executed over and over until it successfully completes. Let's say you have a job which voids a credit card transaction and emails the user to let them know the charge has been refunded:
def perform(card_charge_id)
charge = CardCharge.find(card_charge_id)
charge.void_transaction
Emailer.charge_refunded(charge).deliver
end
What happens when the email fails to render due to a bug? Will the void_transaction method handle the case where a charge has already been refunded? You can use a database transaction to ensure data changes are rolled back if there is an error or you can write your code to be resilient in the face of errors. Just remember that Sidekiq will execute your job at least once, not exactly once.
リトライを規定回数実行しても失敗した場合、Sidekiq の Job はどうなる?
これは設定によります。以下は Sidekiq の Error Handling の説明 からの引用です。
3. If you don't fix the bug within 25 retries (about 21 days), Sidekiq will stop retrying and move your job to the Dead Job Queue. You can fix the bug and retry the job manually anytime within the next 6 months using the Web UI.
これはデフォルトの場合ですが、25回のリトライが終了してもバグが修正されなければ、リトライをやめて、 Dead Job Queue
に移動します。よく死亡ジョブと呼ばれるものです。
Dead Job Queue の説明にもあるように、これはオプションで retry: false
にするか dead: false
にしない限りは仮に retry: 0
でも死亡ジョブに送ることができます。
死亡ジョブは Sidekiq Web の UI からも再実行が可能なので、ハンドルできないエラーが起こった時に送る先として便利に使っています。
死亡ジョブに入る前にログを出すことができる
ようやく本題に近づいてくるのですが、先程の Dead Job Queue の説明の所に以下の段落があります。
After retrying so many times, Sidekiq will call the sidekiq_retries_exhausted hook on your Worker if you've defined it. The hook receives the queued message as an argument. This hook is called right before Sidekiq moves the job to the DJQ.
class FailingWorker
include Sidekiq::Worker
>
sidekiq_retries_exhausted do |msg, e|
Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']}"
end
>
def perform(*args)
raise "or I don't work"
end
end
sidekiq_retries_exhausted
の hook に登録しておくと、死亡ジョブに送られる直前にだけ呼び出される処理が定義できます
もちろん、Worker の perform
の内部で例外を掴んでログを出すこともできますが、「負荷が高い時によく失敗するんだけど、リトライしてくれれば問題じゃない処理」というのもあり、そういう場合には、死亡になった時だけログが出ると都合が良いケースがあります。
default_retries_exhausted も同様です
Sidekiq 起動時の configure 時に default_retries_exhausted
として個別の Sidekiq Worker に sidekiq_retries_exhausted
が指定されていなかった場合のデフォルトの処理を定義可能です。(未指定の場合は何もしない)
具体的な設定方法はChangeLog から引用します。
Sidekiq.configure_server do |config|
config.default_retries_exhausted = -> (job, ex) do
Sidekiq.logger.info "#{job['class']} job is now dead"
end
end
フォーマットは同じですね。
sidekiq_retries_exhausted の hook の第二引数は例外が渡されてくる
上記の公式の例が、msg
の引数から例外の class
や error_message
を取得しているため、同様にしている例をみかけます。ただ、msg
には例外のバックトレースは入っていません。 (error_class
は入っています。)
第二引数の e
はまさにリトライを引き起こすきっかけになった例外のインスタンスが入っているため、ここからバックトレースを取り出せます。(というか、msg
の error_class
や error_message
はそのインスタンスから取り出している)
つまり先程の例を使うと、以下のようにするとバックトレースもログに出力できます。
class FailingWorker
include Sidekiq::Worker
sidekiq_retries_exhausted do |msg, e|
Sidekiq.logger.warn "Failed #{msg['class']} with #{msg['args']}: #{msg['error_message']} : #{e.backtrace.join("\n")}"
end
def perform(*args)
raise "or I don't work"
end
end
sidekiq_options の backtrace オプションは何か?
Sidekiq は backtrace オプションを指定できます。(ただし ActiveJob
からは設定できません。)
Backtrace Logging
Enabling backtrace logging for a job will cause the backtrace to be persisted throughout the lifetime of the job. This can cause your Redis memory usage to grow without new jobs being added if a large quantity of jobs are failing repeatedly and being requeued.
You should use caution when enabling backtrace by limiting it to a couple of lines, or use an error service to keep track of failures.
これは実際に設定すると以下のような感じですね。
class FailingWorker
include Sidekiq::Worker
sidekiq_options backtrace: true # default は false
sidekiq_options backtrace: 100 # 数値を指定した場合は保存する backtrace の行数。backtrace は配列で取得できるので。
これは何かというと、リトライ時に、再度 Sidekiq の Job をキューイングすることになるのですが、その際にリトライの原因になったエラーのバックトレースを保存するというものです。
これは実際に Redis に保存される JSON のオブジェクトを見るとわかりやすいですね。以下も公式 wiki からの引用です。
Retries
Sidekiq's retry feature adds several elements to the job payload, necessary for documenting the error in the UI:
{
"retry_count": 2, // number of times we've retried so far
"error_message": "wrong number of arguments (2 for 3)", // the exception message
"error_class": "ArgumentError", // the exception class
"error_backtrace": ["line 0", "line 1", ...], // some or all of the exception's backtrace, optional, array of strings
"failed_at": 1234567890, // the first time the job failed
"retried_at": 1234567890 // the last time the job failed
}
The last two items are timestamps stored as epoch integers.
これは用途としては、特に Sidekiq Web の UI から参照する時に便利ですし、もちろん、アプリのログを使わずにこちらに任せるということもできます。(ただし、こちらは Job が処理されれば消えてしまいますが)
Appendix sidekiq_retries_exhausted に渡されてくる例外情報について
上記ではさらっと第二引数の例外インスタンスはリトライの原因になった例外だと説明しましたが、一応内部を見ておきます。
コードにインラインでコメントしましたが、sidekiq_retries_exhausted
に渡される引数の例外と、msg
の error_message
の元である例外のインスタンが同じであることがわかります。
module Sidekiq
class JobRetry
# 🏁 yield 内で middleware の実行と、Worker での Job の処理を行うため、ユーザーが書いたコードで発生する例外はここで rescue される。
def local(worker, msg, queue)
yield
rescue Skip => ex
raise ex
rescue Sidekiq::Shutdown => ey
# ignore, will be pushed back onto queue during hard_shutdown
raise ey
rescue Exception => e
# ignore, will be pushed back onto queue during hard_shutdown
raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)
if msg['retry'] == nil
msg['retry'] = worker.class.get_sidekiq_options['retry']
end
raise e unless msg['retry']
# 🏁 リトライが有効であれば attempt_retry に進む
attempt_retry(worker, msg, queue, e)
# We've handled this error associated with this job, don't
# need to handle it at the global level
raise Skip
end
# Note that +worker+ can be nil here if an error is raised before we can
# instantiate the worker instance. All access must be guarded and
# best effort.
def attempt_retry(worker, msg, queue, exception)
max_retry_attempts = retry_attempts_from(msg['retry'], @max_retries)
msg['queue'] = if msg['retry_queue']
msg['retry_queue']
else
queue
end
# 📝 以下で exception から message と class を取り出し、msg の error_message と error_class に入れている
# App code can stuff all sorts of crazy binary data into the error message
# that won't convert to JSON.
m = exception.message.to_s[0, 10_000]
if m.respond_to?(:scrub!)
m.force_encoding("utf-8")
m.scrub!
end
msg['error_message'] = m
msg['error_class'] = exception.class.name
count = if msg['retry_count']
msg['retried_at'] = Time.now.to_f
msg['retry_count'] += 1
else
msg['failed_at'] = Time.now.to_f
msg['retry_count'] = 0
end
# 📝 backtrace オプションが true / Integer の場合のみ msg の error_backtrace に exception の backtrace が入る
if msg['backtrace'] == true
msg['error_backtrace'] = exception.backtrace
elsif !msg['backtrace']
# do nothing
elsif msg['backtrace'].to_i != 0
msg['error_backtrace'] = exception.backtrace[0...msg['backtrace'].to_i]
end
# 🏁 最大リトライ数を超えると retries_exhausted に移動
if count < max_retry_attempts
delay = delay_for(worker, count, exception)
logger.debug { "Failure! Retry #{count} in #{delay} seconds" }
retry_at = Time.now.to_f + delay
payload = Sidekiq.dump_json(msg)
Sidekiq.redis do |conn|
conn.zadd('retry', retry_at.to_s, payload)
end
else
# Goodbye dear message, you (re)tried your best I'm sure.
retries_exhausted(worker, msg, exception)
end
end
def retries_exhausted(worker, msg, exception)
logger.debug { "Retries exhausted for job" }
begin
# 🏁 ここで sidekiq_retries_exhausted or default_retries_exhausted を呼んでます。
block = worker && worker.sidekiq_retries_exhausted_block || Sidekiq.default_retries_exhausted
block.call(msg, exception) if block
rescue => e
handle_exception(e, { context: "Error calling retries_exhausted for #{msg['class']}", job: msg })
end
# 💀 死亡キューに送られます
send_to_morgue(msg) unless msg['dead'] == false
end