Sidekiq の Server プロセス内でバックグラウンド処理をするとは
Sidekiq は Client Side (Worker#perform_asyn
を呼び出す方) と Server Side (スレッドベースで動作し Worker#perform
を実行する方) の 2 つで構成されています。
Server Side の方はそれ自身がプロセスとして動作しており、そのプロセス内でさらにバックグラウンド処理を行っている拡張がいくつか存在します。
私が知るものの多くは「定期実行」と「cron のようなスケジュール」に関するものです。
ここでは、それらの拡張を見ていきながら、どういう手法でバックグラウンド処理を実施しているのかを見ていきます。
cron 処理に関するもの
Sidekiq 本家としては
Ent Periodic Jobs - sidekiq wiki
上記の Enterprise 版で機能を提供しています。
また、OSS で構成する例として以下の clockwork
を用いたサンプルを上げています。
sidekiq/examples/clockwork.rb - sidekiq
以下は抜粋です。
module Clockwork
# Kick off a bunch of jobs early in the morning
every 1.day, 'my_worker.late_night_work', :at => '4:30 am' do
MyWorker.late_night_work # この中で perform_async を呼び出しているだけ
end
every 1.hour do
HourlyWorker.perform_async
end
end
個人的には、ジョブスケジューラは何らか使っていると思うので、それに乗っかるぐらいでいいのではと思っています。
一方で、この手の拡張は多く作られています。
Recurring jobs | Related Projects - sidekiq wiki
sidekiq-cron
上記のリストの中では一番人気のあった endofunky/sidetiq が Sidekiq の Celluloid 脱却についていけなくなってしまったことで、現状はその次に人気のあったこちらが使われているのかもしれません。
ではどのように cron 処理を実現しているかを見ていきます
Sidekiq のポーリング処理をそのまま使う
ある意味一番 Sidekiq に乗っかった実装をしています。
module Sidekiq
module Cron
# The Poller checks Redis every N seconds for sheduled cron jobs
class Poller < Sidekiq::Scheduled::Poller
def enqueue
Sidekiq::Cron::Job.all.each do |job|
enqueue_job(job)
end
rescue => ex
# ...
end
上記の通り、 Sidekiq::Scheduled::Poller
という Sidekiq 本体のスケジューラ実装を継承しています。
enqueue_job
の内容をざっくり説明すると、渡された job
が cron の情報自身なので、現在時刻と比較して起動させるべきな場合は Worker#perform_async
を実行するというものです。
続いて、この Poller の呼び出しは以下のようになっています。
https://github.com/ondrejbartas/sidekiq-cron/blob/01ec3dc55b889cd9cba7e18d4ac5c676480a19a6/lib/sidekiq/cron/launcher.rb#L22-L22
module Sidekiq
class Launcher
attr_reader :cron_poller
alias_method :old_initialize, :initialize
def initialize(options)
@cron_poller = Sidekiq::Cron::Poller.new
old_initialize options
end
alias_method :old_run, :run
def run
old_run
cron_poller.start
end
上記は Sidekiq の Launcher
自身にパッチを当てており、少し行儀が悪くなっているのですが、やりたいこととしては Sidekiq の Server side プロセスが立ち上がる時に、一緒に cron の poller も起動させる というものです。
この方法のメリット
Sidekiq::Scheduled::Poller
自身を継承することで、 Sidekiq がスケジュールジョブを実行するために、 定期的なポーリングを行っている処理 をそのまま活用できるため、自身でポーリング処理を書く必要がありません。
この方法のデメリット
デメリットを説明する前に、 Sidekiq 自身の Poller
を見てみましょう。
必要最低限に抜粋したつもりですが、それでもコメントが多くて長いです
module Sidekiq
module Scheduled
class Poller
include Util
INITIAL_WAIT = 10
def start
@thread ||= safe_thread("scheduler") do
initial_wait
while !@done
enqueue
wait
end
end
end
def enqueue
begin
@enq.enqueue_jobs
rescue => ex
# ...
end
end
private
def wait
@sleeper.pop(random_poll_interval)
rescue Timeout::Error
# expected
rescue => ex
# ...
end
# Calculates a random interval that is ±50% the desired average.
def random_poll_interval
poll_interval_average * rand + poll_interval_average.to_f / 2
end
# We do our best to tune the poll interval to the size of the active Sidekiq
# cluster. If you have 30 processes and poll every 15 seconds, that means one
# Sidekiq is checking Redis every 0.5 seconds - way too often for most people
# and really bad if the retry or scheduled sets are large.
#
# Instead try to avoid polling more than once every 15 seconds. If you have
# 30 Sidekiq processes, we'll poll every 30 * 15 or 450 seconds.
# To keep things statistically random, we'll sleep a random amount between
# 225 and 675 seconds for each poll or 450 seconds on average. Otherwise restarting
# all your Sidekiq processes at the same time will lead to them all polling at
# the same time: the thundering herd problem.
#
# We only do this if poll_interval_average is unset (the default).
def poll_interval_average
Sidekiq.options[:poll_interval_average] ||= scaled_poll_interval
end
# Calculates an average poll interval based on the number of known Sidekiq processes.
# This minimizes a single point of failure by dispersing check-ins but without taxing
# Redis if you run many Sidekiq processes.
def scaled_poll_interval
pcount = Sidekiq::ProcessSet.new.size
pcount = 1 if pcount == 0
pcount * Sidekiq.options[:average_scheduled_poll_interval]
end
end
end
end
まず、Poller
自身は start
された後は @done
が false の限り続くループに入ります。
その中で enqueue
と wait
を繰り返す訳ですが、ポーリング間隔は事実上 wait
がどれぐらいになるかにかかってきます。
そして wait
は random_poll_interval
で決定されるのですが、その内容は以下の要素から計算されて決まります。
- Sidekiq の Server プロセスの数 (Server プロセスは 1 つ以上起動することができる)
-
poll_interval_average
オプションの数値 (デフォルトはnil
)- この数値が与えられていれば、この数値の
+-50%
で決まる。
- この数値が与えられていれば、この数値の
-
average_scheduled_poll_interval
オプションの数値 (デフォルトは15
)
例えば、オプションがデフォルト値でプロセスが 1 の場合は 15 秒の +-50% がポーリング間隔となります。
sidekiq-cron
は最小の間隔単位が 1 分なのですが、それであっても ポーリング時の直前の cron の一致した時刻と 60 秒以上ずれていないか? でしか判断ができていません。
sidekiq-cron
を使うようなケースで問題が起こるような課題ではないと思いますが、留意しておいても良いと思います。
加えて、対象のオプションは sidekiq-cron
だけで使うオプションではなく、万が一 Sidekiq のスケジューラの設定を見直すつもりでオプションを変更してしまうと、その影響をもろに受けてしまいます
デメリットをまとめると、以下になりそうです。
- Sidekiq 自身のスケジューラのオプションを変更するときは
sidekiq-cron
の事も考慮する必要がある - ポーリング間隔に注意しないと、狙った時刻に起動できない可能性がある
sidekiq-scheduler
sidekiq-cron
に続いて人気のありそうな拡張です。
こちらはどのように cron 処理を行っているのでしょうか。
rufus-scheduler をそのまま使う
sidekiq-cron
もそうでしたが、 https://github.com/jmettraux/rufus-scheduler という gem を利用しています。ただし、sidekiq-cron
は単に cron 記法のパーサーとして利用しているだけでしたが、こちらではスケジュール処理自体もその gem に任せています。
# interval_type は cron, every, at, in, interval のいずれか
def new_job(name, interval_type, config, args)
opts = { :job => true, :tags => [name] }
rufus_scheduler.send(interval_type, *args, opts) do |job, time|
idempotent_job_enqueue(name, time, sanitize_job_config(config)) if job_enabled?(name)
end
end
ここだけ見ても分かりにくいのですが、指定された cron のスケジュールの job を登録する処理において、rufus_scheduler
のスケジュール登録を行っています。
この方法のメリット・デメリット
先ほどの sidekiq-cron
とは異なり、 Sidekiq とは独立した rufus_scheduler
というスケジューラに依存していることがメリットにもデメリットにもなりえそうです。
rufus_scheduler
自身もスレッドで動作しているため、もし Sidekiq との相性が悪いような状況にならないとも限りません。一方で、Sidekiq の内部の設定等にも依存しないため安心して cron の設定をすることができそうです。
rufus_scheduler を読めていないので、意味のあるコメントができていませんね。。
cron 以外の定期実行に関するもの
さまざまあると思います。監視や拡張固有のキューのクリーンアップなど、用途は拡張によってそれぞれですが、ここでは、上記の 「Sidekiq::Scheduled::Poller
利用」「rufus_scheduler
利用」以外の例を 2 つ紹介します。
sidekiq-limit_fetch
Sidekiq の Server プロセスにおいて、特定のキューの並列数を制限できる拡張です。( Sidekiq 自体は、全体の並列数の指定のみで、キュー単位ではできない)
この拡張がバックグラウンドで行うのは以下です。
- 複数の Server プロセスにおける並列数の制限もサポートしているため、 Server プロセスが現在どれぐらい存在しているかの監視と古いプロセス情報の削除
- 対象となるキューを動的に追加する処理
スレッドを立ち上げて無限ループ
Sidekiq 本体が、Celluloid
から Ruby 自身のスレッド処理に乗り換えたのはまだ記憶に新しいですが、sidekiq-limit-fetch
も Server プロセスの開始時にスレッドを立ち上げて、後は無限ループをしているだけの実装です。
module Sidekiq::LimitFetch::Global
module Monitor
extend self
HEARTBEAT_PREFIX = 'limit:heartbeat:'
PROCESS_SET = 'limit:processes'
HEARTBEAT_TTL = 20
REFRESH_TIMEOUT = 5
def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
Thread.new do
loop do
Sidekiq::LimitFetch.redis_retryable do
add_dynamic_queues
update_heartbeat ttl
invalidate_old_processes
end
sleep timeout
end
end
end
呼び出しはこちらも少しお行儀が悪いですが、Sidekiq 自身の Manager
にパッチを当てています。
class Sidekiq::Manager
module InitLimitFetch
def start
Sidekiq::LimitFetch::Queues.start options
Sidekiq::LimitFetch::Global::Monitor.start!
super
end
end
prepend InitLimitFetch
end
この方法のメリット・デメリット
これは、この方法だからという訳ではなく、 sidekiq-limit-fetch
の課題ではありますが、ここで上がったスレッドの内部で例外があがったとしたらどうなるでしょうか?
おそらく、その Server プロセスでのバックグラウンド処理は行われなくなります。
Sidekiq 自身もスレッドで処理を行っていますが、Sidekiq はスレッドが例外で死んだ場合には、Manager 側に処理を戻し、再度起こし直しています。
module Sidekiq
# Thread として実行されるクラス。limit-fetch の方と同様に Manager から実行される。
class Processor
def run
begin
while !@done
process_one
end
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
# 例外が発生されると、親の Manager を呼び出す
@mgr.processor_died(self, ex)
end
end
module Sidekiq
class Manager
def processor_died(processor, reason)
@plock.synchronize do
@workers.delete(processor)
unless @done
p = Processor.new(self)
@workers << p
p.start
end
end
end
sidekiq-limit-fetch
が行っている Monitor の処理はそうそう失敗するものではないですが、この点は注意が必要ですし、この方法を使う場合には上手く対処してあげる必要がでてきます。
attentive_sidekiq
消失してしまった Sidekiq job を管理するための拡張です。
この拡張がバックグラウンドで行うのは、 server middleware で溜め続けている実行 job の記録から、消失したと疑われる job を定期的に探し、消失リストに加えることです。
concurrent-ruby の Concurrent::TimerTask を使う
https://github.com/ruby-concurrency/concurrent-ruby
http://www.rubydoc.info/github/ruby-concurrency/concurrent-ruby/Concurrent/TimerTask
私自身が Concurrent::TimerTask
に対して不勉強なので説明はできないのですが、 concurrent-ruby
製の定期実行なのかなと思います。処理に対して Observer
を付けられるのが面白いですね。
この拡張も特殊な使い方はしておらず、以下のように普通にインターバルを与えて実行しています。
module AttentiveSidekiq
class Manager
def start!
task = Concurrent::TimerTask.new(options) do
AttentiveSidekiq::Manager.instance.update_disappeared_jobs
end
task.add_observer(AttentiveSidekiq::UpdaterObserver.new)
task.execute
end
private
def options
{
execution_interval: AttentiveSidekiq.execution_interval,
timeout_interval: AttentiveSidekiq.timeout_interval
}
end
この方法のメリット・デメリット
おそらくですが、この拡張で欲しい定期実行であれば、最初の Sidekiq::Scheduled::Poller
を使った方法でも実現はできました。(デフォルトのインターバル値を見ると、 Poller
のデフォルトの 15 sec では間隔短すぎかもですが)
そういう意味では、この拡張は sidekiq-scheduler
が rufus_scheduler
という「cron 作るのによく使われている方法」を採用したのと同様に、「定期実行を実現するのによく使われる方法」を採用したというだけなのかもしれません。
sidekiq-limit-fetch
のようにスレッドを利用する方法に対しては、TimerTask
が内部で Concurrent::SafeTaskExecutor
を利用しているため、Observer
にエラーがあったことは通知されますが、スレッドが落ちて以降の処理が行われないということは無さそうです。
まとめ
ここまで以下の 4 つの拡張とそれが採用する方法を見てきました。
- sidekiq-cron
-
Sidekiq::Scheduled::Poller
を継承したクラスを使い、Poller
相当のインターバル実行を行う
-
- sidekiq-scheduler
-
rufus_scheduler
を使い、 cron 実行を素直に実装
-
- sidekiq-limit-fetch
- スレッドを立ち上げて、内部で無限ループ & sleep を繰り返しインターバル実行を行う
- attentive_sidekiq
-
Concurrent::TimerTask
を使ってインターバル実行を素直に実装
-
それぞれメリット・デメリットがあり、他にもやり方はあると思いますが、実現したいケースに応じて適切に手段を選べると良さそうです。
Appendix
Server プロセスのバックグラウンド処理を開始するのはどこがいいか
「お行儀が悪い」と何度か書きましたが、この手の Server プロセスでのバックグラウンド処理を起動するのには一応用意された場所があります。
Events | Deployment - sidekiq wiki
ここにあるように startup
, quiet
(停止前に新たな非同期ジョブを生成しなくなる状態), shutdown
(実行中だった非同期ジョブを全部実行しおわり、Server プロセス自体が停止する) の 3 つに加えて heartbeat
の 4 つがイベントとして用意されています。
この startup
で行うのが本来的には良さそうです。