Sidekiq の Server プロセス内でバックグラウンド処理をするとは
Sidekiq は Client Side (Worker#perform_asyn を呼び出す方) と Server Side (スレッドベースで動作し Worker#perform を実行する方) の 2 つで構成されています。
Server Side の方はそれ自身がプロセスとして動作しており、そのプロセス内でさらにバックグラウンド処理を行っている拡張がいくつか存在します。
私が知るものの多くは「定期実行」と「cron のようなスケジュール」に関するものです。
ここでは、それらの拡張を見ていきながら、どういう手法でバックグラウンド処理を実施しているのかを見ていきます。
cron 処理に関するもの
Sidekiq 本家としては
Ent Periodic Jobs - sidekiq wiki
上記の Enterprise 版で機能を提供しています。
また、OSS で構成する例として以下の clockwork を用いたサンプルを上げています。
sidekiq/examples/clockwork.rb - sidekiq
以下は抜粋です。
module Clockwork
  # Kick off a bunch of jobs early in the morning
  every 1.day, 'my_worker.late_night_work', :at => '4:30 am' do
    MyWorker.late_night_work # この中で perform_async を呼び出しているだけ
  end
  every 1.hour do
    HourlyWorker.perform_async
  end
end
個人的には、ジョブスケジューラは何らか使っていると思うので、それに乗っかるぐらいでいいのではと思っています。
一方で、この手の拡張は多く作られています。
Recurring jobs | Related Projects - sidekiq wiki
sidekiq-cron
上記のリストの中では一番人気のあった endofunky/sidetiq が Sidekiq の Celluloid 脱却についていけなくなってしまったことで、現状はその次に人気のあったこちらが使われているのかもしれません。
ではどのように cron 処理を実現しているかを見ていきます
Sidekiq のポーリング処理をそのまま使う
ある意味一番 Sidekiq に乗っかった実装をしています。
module Sidekiq
  module Cron
    # The Poller checks Redis every N seconds for sheduled cron jobs
    class Poller < Sidekiq::Scheduled::Poller
      def enqueue
        Sidekiq::Cron::Job.all.each do |job|
          enqueue_job(job)
        end
      rescue => ex
        # ...
      end
上記の通り、 Sidekiq::Scheduled::Poller という Sidekiq 本体のスケジューラ実装を継承しています。
enqueue_job の内容をざっくり説明すると、渡された job が cron の情報自身なので、現在時刻と比較して起動させるべきな場合は Worker#perform_async を実行するというものです。
続いて、この Poller の呼び出しは以下のようになっています。
https://github.com/ondrejbartas/sidekiq-cron/blob/01ec3dc55b889cd9cba7e18d4ac5c676480a19a6/lib/sidekiq/cron/launcher.rb#L22-L22
module Sidekiq
  class Launcher
    attr_reader :cron_poller
    alias_method :old_initialize, :initialize
    def initialize(options)
      @cron_poller = Sidekiq::Cron::Poller.new
      old_initialize options
    end
    alias_method :old_run, :run
    def run
      old_run
      cron_poller.start
    end
上記は Sidekiq の Launcher 自身にパッチを当てており、少し行儀が悪くなっているのですが、やりたいこととしては Sidekiq の Server side プロセスが立ち上がる時に、一緒に cron の poller も起動させる というものです。
この方法のメリット
Sidekiq::Scheduled::Poller 自身を継承することで、 Sidekiq がスケジュールジョブを実行するために、 定期的なポーリングを行っている処理 をそのまま活用できるため、自身でポーリング処理を書く必要がありません。
この方法のデメリット
デメリットを説明する前に、 Sidekiq 自身の Poller を見てみましょう。
必要最低限に抜粋したつもりですが、それでもコメントが多くて長いです ![]()
module Sidekiq
  module Scheduled
    class Poller
      include Util
      INITIAL_WAIT = 10
      def start
        @thread ||= safe_thread("scheduler") do
          initial_wait
          while !@done
            enqueue
            wait
          end
        end
      end
      def enqueue
        begin
          @enq.enqueue_jobs
        rescue => ex
          # ...
        end
      end
      private
      def wait
        @sleeper.pop(random_poll_interval)
      rescue Timeout::Error
        # expected
      rescue => ex
        # ...
      end
      # Calculates a random interval that is ±50% the desired average.
      def random_poll_interval
        poll_interval_average * rand + poll_interval_average.to_f / 2
      end
      # We do our best to tune the poll interval to the size of the active Sidekiq
      # cluster.  If you have 30 processes and poll every 15 seconds, that means one
      # Sidekiq is checking Redis every 0.5 seconds - way too often for most people
      # and really bad if the retry or scheduled sets are large.
      #
      # Instead try to avoid polling more than once every 15 seconds.  If you have
      # 30 Sidekiq processes, we'll poll every 30 * 15 or 450 seconds.
      # To keep things statistically random, we'll sleep a random amount between
      # 225 and 675 seconds for each poll or 450 seconds on average.  Otherwise restarting
      # all your Sidekiq processes at the same time will lead to them all polling at
      # the same time: the thundering herd problem.
      #
      # We only do this if poll_interval_average is unset (the default).
      def poll_interval_average
        Sidekiq.options[:poll_interval_average] ||= scaled_poll_interval
      end
      # Calculates an average poll interval based on the number of known Sidekiq processes.
      # This minimizes a single point of failure by dispersing check-ins but without taxing
      # Redis if you run many Sidekiq processes.
      def scaled_poll_interval
        pcount = Sidekiq::ProcessSet.new.size
        pcount = 1 if pcount == 0
        pcount * Sidekiq.options[:average_scheduled_poll_interval]
      end
    end
  end
end
まず、Poller 自身は start された後は @done が false の限り続くループに入ります。
その中で enqueue と wait を繰り返す訳ですが、ポーリング間隔は事実上 wait がどれぐらいになるかにかかってきます。
そして wait は random_poll_interval で決定されるのですが、その内容は以下の要素から計算されて決まります。
- Sidekiq の Server プロセスの数 (Server プロセスは 1 つ以上起動することができる)
 - 
poll_interval_averageオプションの数値 (デフォルトはnil)- この数値が与えられていれば、この数値の 
+-50%で決まる。 
 - この数値が与えられていれば、この数値の 
 - 
average_scheduled_poll_intervalオプションの数値 (デフォルトは15) 
例えば、オプションがデフォルト値でプロセスが 1 の場合は 15 秒の +-50% がポーリング間隔となります。
sidekiq-cron は最小の間隔単位が 1 分なのですが、それであっても ポーリング時の直前の cron の一致した時刻と 60 秒以上ずれていないか? でしか判断ができていません。
sidekiq-cron を使うようなケースで問題が起こるような課題ではないと思いますが、留意しておいても良いと思います。
加えて、対象のオプションは sidekiq-cron だけで使うオプションではなく、万が一 Sidekiq のスケジューラの設定を見直すつもりでオプションを変更してしまうと、その影響をもろに受けてしまいます 
 ![]()
デメリットをまとめると、以下になりそうです。
- Sidekiq 自身のスケジューラのオプションを変更するときは 
sidekiq-cronの事も考慮する必要がある - ポーリング間隔に注意しないと、狙った時刻に起動できない可能性がある
 
sidekiq-scheduler
sidekiq-cron に続いて人気のありそうな拡張です。
こちらはどのように cron 処理を行っているのでしょうか。
rufus-scheduler をそのまま使う
sidekiq-cron もそうでしたが、 https://github.com/jmettraux/rufus-scheduler という gem を利用しています。ただし、sidekiq-cron は単に cron 記法のパーサーとして利用しているだけでしたが、こちらではスケジュール処理自体もその gem に任せています。
# interval_type は cron, every, at, in, interval のいずれか
def new_job(name, interval_type, config, args)
  opts = { :job => true, :tags => [name] }
  rufus_scheduler.send(interval_type, *args, opts) do |job, time|
    idempotent_job_enqueue(name, time, sanitize_job_config(config)) if job_enabled?(name)
  end
end
ここだけ見ても分かりにくいのですが、指定された cron のスケジュールの job を登録する処理において、rufus_scheduler のスケジュール登録を行っています。
この方法のメリット・デメリット
先ほどの sidekiq-cron とは異なり、 Sidekiq とは独立した rufus_scheduler というスケジューラに依存していることがメリットにもデメリットにもなりえそうです。
rufus_scheduler 自身もスレッドで動作しているため、もし Sidekiq との相性が悪いような状況にならないとも限りません。一方で、Sidekiq の内部の設定等にも依存しないため安心して cron の設定をすることができそうです。
rufus_scheduler を読めていないので、意味のあるコメントができていませんね。。
cron 以外の定期実行に関するもの
さまざまあると思います。監視や拡張固有のキューのクリーンアップなど、用途は拡張によってそれぞれですが、ここでは、上記の 「Sidekiq::Scheduled::Poller 利用」「rufus_scheduler 利用」以外の例を 2 つ紹介します。
sidekiq-limit_fetch
Sidekiq の Server プロセスにおいて、特定のキューの並列数を制限できる拡張です。( Sidekiq 自体は、全体の並列数の指定のみで、キュー単位ではできない)
この拡張がバックグラウンドで行うのは以下です。
- 複数の Server プロセスにおける並列数の制限もサポートしているため、 Server プロセスが現在どれぐらい存在しているかの監視と古いプロセス情報の削除
 - 対象となるキューを動的に追加する処理
 
スレッドを立ち上げて無限ループ
Sidekiq 本体が、Celluloid から Ruby 自身のスレッド処理に乗り換えたのはまだ記憶に新しいですが、sidekiq-limit-fetch も Server プロセスの開始時にスレッドを立ち上げて、後は無限ループをしているだけの実装です。
module Sidekiq::LimitFetch::Global
  module Monitor
    extend self
    HEARTBEAT_PREFIX = 'limit:heartbeat:'
    PROCESS_SET = 'limit:processes'
    HEARTBEAT_TTL = 20
    REFRESH_TIMEOUT = 5
    def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
      Thread.new do
        loop do
          Sidekiq::LimitFetch.redis_retryable do
            add_dynamic_queues
            update_heartbeat ttl
            invalidate_old_processes
          end
          sleep timeout
        end
      end
    end
呼び出しはこちらも少しお行儀が悪いですが、Sidekiq 自身の Manager にパッチを当てています。
class Sidekiq::Manager
  module InitLimitFetch
    def start
      Sidekiq::LimitFetch::Queues.start options
      Sidekiq::LimitFetch::Global::Monitor.start!
      super
    end
  end
  prepend InitLimitFetch
end
この方法のメリット・デメリット
これは、この方法だからという訳ではなく、 sidekiq-limit-fetch の課題ではありますが、ここで上がったスレッドの内部で例外があがったとしたらどうなるでしょうか?
おそらく、その Server プロセスでのバックグラウンド処理は行われなくなります。
Sidekiq 自身もスレッドで処理を行っていますが、Sidekiq はスレッドが例外で死んだ場合には、Manager 側に処理を戻し、再度起こし直しています。
module Sidekiq
  # Thread として実行されるクラス。limit-fetch の方と同様に Manager から実行される。
  class Processor
    def run
      begin
        while !@done
          process_one
        end
        @mgr.processor_stopped(self)
      rescue Sidekiq::Shutdown
        @mgr.processor_stopped(self)
      rescue Exception => ex
        # 例外が発生されると、親の Manager を呼び出す
        @mgr.processor_died(self, ex)
      end
    end
module Sidekiq
  class Manager
    def processor_died(processor, reason)
      @plock.synchronize do
        @workers.delete(processor)
        unless @done
          p = Processor.new(self)
          @workers << p
          p.start
        end
      end
    end
sidekiq-limit-fetch が行っている Monitor の処理はそうそう失敗するものではないですが、この点は注意が必要ですし、この方法を使う場合には上手く対処してあげる必要がでてきます。
attentive_sidekiq
消失してしまった Sidekiq job を管理するための拡張です。
この拡張がバックグラウンドで行うのは、 server middleware で溜め続けている実行 job の記録から、消失したと疑われる job を定期的に探し、消失リストに加えることです。
concurrent-ruby の Concurrent::TimerTask を使う
https://github.com/ruby-concurrency/concurrent-ruby
http://www.rubydoc.info/github/ruby-concurrency/concurrent-ruby/Concurrent/TimerTask
私自身が Concurrent::TimerTask に対して不勉強なので説明はできないのですが、 concurrent-ruby 製の定期実行なのかなと思います。処理に対して Observer を付けられるのが面白いですね。
この拡張も特殊な使い方はしておらず、以下のように普通にインターバルを与えて実行しています。
module AttentiveSidekiq
  class Manager
    def start!
      task = Concurrent::TimerTask.new(options) do
        AttentiveSidekiq::Manager.instance.update_disappeared_jobs
      end
      task.add_observer(AttentiveSidekiq::UpdaterObserver.new)
      task.execute
    end
    private
    
    def options
      { 
        execution_interval: AttentiveSidekiq.execution_interval,
        timeout_interval: AttentiveSidekiq.timeout_interval
      }
    end
この方法のメリット・デメリット
おそらくですが、この拡張で欲しい定期実行であれば、最初の Sidekiq::Scheduled::Poller を使った方法でも実現はできました。(デフォルトのインターバル値を見ると、 Poller のデフォルトの 15 sec では間隔短すぎかもですが)
そういう意味では、この拡張は sidekiq-scheduler が rufus_scheduler という「cron 作るのによく使われている方法」を採用したのと同様に、「定期実行を実現するのによく使われる方法」を採用したというだけなのかもしれません。
sidekiq-limit-fetch のようにスレッドを利用する方法に対しては、TimerTask が内部で Concurrent::SafeTaskExecutor を利用しているため、Observer にエラーがあったことは通知されますが、スレッドが落ちて以降の処理が行われないということは無さそうです。
まとめ
ここまで以下の 4 つの拡張とそれが採用する方法を見てきました。
- sidekiq-cron
- 
Sidekiq::Scheduled::Pollerを継承したクラスを使い、Poller相当のインターバル実行を行う 
 - 
 - sidekiq-scheduler
- 
rufus_schedulerを使い、 cron 実行を素直に実装 
 - 
 - sidekiq-limit-fetch
- スレッドを立ち上げて、内部で無限ループ & sleep を繰り返しインターバル実行を行う
 
 - attentive_sidekiq
- 
Concurrent::TimerTaskを使ってインターバル実行を素直に実装 
 - 
 
それぞれメリット・デメリットがあり、他にもやり方はあると思いますが、実現したいケースに応じて適切に手段を選べると良さそうです。
Appendix
Server プロセスのバックグラウンド処理を開始するのはどこがいいか
「お行儀が悪い」と何度か書きましたが、この手の Server プロセスでのバックグラウンド処理を起動するのには一応用意された場所があります。
Events | Deployment - sidekiq wiki
ここにあるように startup, quiet (停止前に新たな非同期ジョブを生成しなくなる状態), shutdown (実行中だった非同期ジョブを全部実行しおわり、Server プロセス自体が停止する) の 3 つに加えて heartbeat の 4 つがイベントとして用意されています。
この startup で行うのが本来的には良さそうです。