LoginSignup
13
7

More than 5 years have passed since last update.

Sidekiq の Server プロセス内でバックグラウンド処理をするケースの手法を見てみた

Last updated at Posted at 2017-01-15

Sidekiq の Server プロセス内でバックグラウンド処理をするとは

Sidekiq は Client Side (Worker#perform_asyn を呼び出す方) と Server Side (スレッドベースで動作し Worker#perform を実行する方) の 2 つで構成されています。

Server Side の方はそれ自身がプロセスとして動作しており、そのプロセス内でさらにバックグラウンド処理を行っている拡張がいくつか存在します。

私が知るものの多くは「定期実行」と「cron のようなスケジュール」に関するものです。

ここでは、それらの拡張を見ていきながら、どういう手法でバックグラウンド処理を実施しているのかを見ていきます。

cron 処理に関するもの

Sidekiq 本家としては

Ent Periodic Jobs - sidekiq wiki

上記の Enterprise 版で機能を提供しています。
また、OSS で構成する例として以下の clockwork を用いたサンプルを上げています。

sidekiq/examples/clockwork.rb - sidekiq

以下は抜粋です。

module Clockwork
  # Kick off a bunch of jobs early in the morning
  every 1.day, 'my_worker.late_night_work', :at => '4:30 am' do
    MyWorker.late_night_work # この中で perform_async を呼び出しているだけ
  end

  every 1.hour do
    HourlyWorker.perform_async
  end
end

個人的には、ジョブスケジューラは何らか使っていると思うので、それに乗っかるぐらいでいいのではと思っています。

一方で、この手の拡張は多く作られています。

Recurring jobs | Related Projects - sidekiq wiki

sidekiq-cron

上記のリストの中では一番人気のあった endofunky/sidetiq が Sidekiq の Celluloid 脱却についていけなくなってしまったことで、現状はその次に人気のあったこちらが使われているのかもしれません。

ではどのように cron 処理を実現しているかを見ていきます

Sidekiq のポーリング処理をそのまま使う

ある意味一番 Sidekiq に乗っかった実装をしています。

lib/sidekiq/cron/poller.rb
module Sidekiq
  module Cron
    # The Poller checks Redis every N seconds for sheduled cron jobs
    class Poller < Sidekiq::Scheduled::Poller
      def enqueue
        Sidekiq::Cron::Job.all.each do |job|
          enqueue_job(job)
        end
      rescue => ex
        # ...
      end

上記の通り、 Sidekiq::Scheduled::Poller という Sidekiq 本体のスケジューラ実装を継承しています。

enqueue_job の内容をざっくり説明すると、渡された job が cron の情報自身なので、現在時刻と比較して起動させるべきな場合は Worker#perform_async を実行するというものです。

続いて、この Poller の呼び出しは以下のようになっています。
https://github.com/ondrejbartas/sidekiq-cron/blob/01ec3dc55b889cd9cba7e18d4ac5c676480a19a6/lib/sidekiq/cron/launcher.rb#L22-L22

lib/sidekiq/cron/launcher.rb
module Sidekiq
  class Launcher
    attr_reader :cron_poller

    alias_method :old_initialize, :initialize
    def initialize(options)
      @cron_poller = Sidekiq::Cron::Poller.new
      old_initialize options
    end

    alias_method :old_run, :run
    def run
      old_run
      cron_poller.start
    end

上記は Sidekiq の Launcher 自身にパッチを当てており、少し行儀が悪くなっているのですが、やりたいこととしては Sidekiq の Server side プロセスが立ち上がる時に、一緒に cron の poller も起動させる というものです。

この方法のメリット

Sidekiq::Scheduled::Poller 自身を継承することで、 Sidekiq がスケジュールジョブを実行するために、 定期的なポーリングを行っている処理 をそのまま活用できるため、自身でポーリング処理を書く必要がありません。

この方法のデメリット

デメリットを説明する前に、 Sidekiq 自身の Poller を見てみましょう。

必要最低限に抜粋したつもりですが、それでもコメントが多くて長いです :sweat_smile:

lib/sidekiq/scheduled.rb
module Sidekiq
  module Scheduled
    class Poller
      include Util

      INITIAL_WAIT = 10

      def start
        @thread ||= safe_thread("scheduler") do
          initial_wait

          while !@done
            enqueue
            wait
          end
        end
      end

      def enqueue
        begin
          @enq.enqueue_jobs
        rescue => ex
          # ...
        end
      end

      private

      def wait
        @sleeper.pop(random_poll_interval)
      rescue Timeout::Error
        # expected
      rescue => ex
        # ...
      end

      # Calculates a random interval that is ±50% the desired average.
      def random_poll_interval
        poll_interval_average * rand + poll_interval_average.to_f / 2
      end

      # We do our best to tune the poll interval to the size of the active Sidekiq
      # cluster.  If you have 30 processes and poll every 15 seconds, that means one
      # Sidekiq is checking Redis every 0.5 seconds - way too often for most people
      # and really bad if the retry or scheduled sets are large.
      #
      # Instead try to avoid polling more than once every 15 seconds.  If you have
      # 30 Sidekiq processes, we'll poll every 30 * 15 or 450 seconds.
      # To keep things statistically random, we'll sleep a random amount between
      # 225 and 675 seconds for each poll or 450 seconds on average.  Otherwise restarting
      # all your Sidekiq processes at the same time will lead to them all polling at
      # the same time: the thundering herd problem.
      #
      # We only do this if poll_interval_average is unset (the default).
      def poll_interval_average
        Sidekiq.options[:poll_interval_average] ||= scaled_poll_interval
      end

      # Calculates an average poll interval based on the number of known Sidekiq processes.
      # This minimizes a single point of failure by dispersing check-ins but without taxing
      # Redis if you run many Sidekiq processes.
      def scaled_poll_interval
        pcount = Sidekiq::ProcessSet.new.size
        pcount = 1 if pcount == 0
        pcount * Sidekiq.options[:average_scheduled_poll_interval]
      end
    end
  end
end

まず、Poller 自身は start された後は @done が false の限り続くループに入ります。
その中で enqueuewait を繰り返す訳ですが、ポーリング間隔は事実上 wait がどれぐらいになるかにかかってきます。

そして waitrandom_poll_interval で決定されるのですが、その内容は以下の要素から計算されて決まります。

  • Sidekiq の Server プロセスの数 (Server プロセスは 1 つ以上起動することができる)
  • poll_interval_average オプションの数値 (デフォルトは nil )
    • この数値が与えられていれば、この数値の +-50% で決まる。
  • average_scheduled_poll_interval オプションの数値 (デフォルトは 15)

例えば、オプションがデフォルト値でプロセスが 1 の場合は 15 秒の +-50% がポーリング間隔となります。

sidekiq-cron は最小の間隔単位が 1 分なのですが、それであっても ポーリング時の直前の cron の一致した時刻と 60 秒以上ずれていないか? でしか判断ができていません。
sidekiq-cron を使うようなケースで問題が起こるような課題ではないと思いますが、留意しておいても良いと思います。

加えて、対象のオプションは sidekiq-cron だけで使うオプションではなく、万が一 Sidekiq のスケジューラの設定を見直すつもりでオプションを変更してしまうと、その影響をもろに受けてしまいます :bomb: :scream:

デメリットをまとめると、以下になりそうです。

  • Sidekiq 自身のスケジューラのオプションを変更するときは sidekiq-cron の事も考慮する必要がある
  • ポーリング間隔に注意しないと、狙った時刻に起動できない可能性がある

sidekiq-scheduler

sidekiq-cron に続いて人気のありそうな拡張です。

こちらはどのように cron 処理を行っているのでしょうか。

rufus-scheduler をそのまま使う

sidekiq-cron もそうでしたが、 https://github.com/jmettraux/rufus-scheduler という gem を利用しています。ただし、sidekiq-cron は単に cron 記法のパーサーとして利用しているだけでしたが、こちらではスケジュール処理自体もその gem に任せています。

lib/sidekiq/scheduler.rb
# interval_type は cron, every, at, in, interval のいずれか
def new_job(name, interval_type, config, args)
  opts = { :job => true, :tags => [name] }

  rufus_scheduler.send(interval_type, *args, opts) do |job, time|
    idempotent_job_enqueue(name, time, sanitize_job_config(config)) if job_enabled?(name)
  end
end

ここだけ見ても分かりにくいのですが、指定された cron のスケジュールの job を登録する処理において、rufus_scheduler のスケジュール登録を行っています。

この方法のメリット・デメリット

先ほどの sidekiq-cron とは異なり、 Sidekiq とは独立した rufus_scheduler というスケジューラに依存していることがメリットにもデメリットにもなりえそうです。

rufus_scheduler 自身もスレッドで動作しているため、もし Sidekiq との相性が悪いような状況にならないとも限りません。一方で、Sidekiq の内部の設定等にも依存しないため安心して cron の設定をすることができそうです。

rufus_scheduler を読めていないので、意味のあるコメントができていませんね。。

cron 以外の定期実行に関するもの

さまざまあると思います。監視や拡張固有のキューのクリーンアップなど、用途は拡張によってそれぞれですが、ここでは、上記の 「Sidekiq::Scheduled::Poller 利用」「rufus_scheduler 利用」以外の例を 2 つ紹介します。

sidekiq-limit_fetch

Sidekiq の Server プロセスにおいて、特定のキューの並列数を制限できる拡張です。( Sidekiq 自体は、全体の並列数の指定のみで、キュー単位ではできない)

この拡張がバックグラウンドで行うのは以下です。

  • 複数の Server プロセスにおける並列数の制限もサポートしているため、 Server プロセスが現在どれぐらい存在しているかの監視と古いプロセス情報の削除
  • 対象となるキューを動的に追加する処理

スレッドを立ち上げて無限ループ

Sidekiq 本体が、Celluloid から Ruby 自身のスレッド処理に乗り換えたのはまだ記憶に新しいですが、sidekiq-limit-fetch も Server プロセスの開始時にスレッドを立ち上げて、後は無限ループをしているだけの実装です。

lib/sidekiq/limit_fetch/global/monitor.rb
module Sidekiq::LimitFetch::Global
  module Monitor
    extend self

    HEARTBEAT_PREFIX = 'limit:heartbeat:'
    PROCESS_SET = 'limit:processes'
    HEARTBEAT_TTL = 20
    REFRESH_TIMEOUT = 5

    def start!(ttl=HEARTBEAT_TTL, timeout=REFRESH_TIMEOUT)
      Thread.new do
        loop do
          Sidekiq::LimitFetch.redis_retryable do
            add_dynamic_queues
            update_heartbeat ttl
            invalidate_old_processes
          end

          sleep timeout
        end
      end
    end

呼び出しはこちらも少しお行儀が悪いですが、Sidekiq 自身の Manager にパッチを当てています。

lib/sidekiq/extensions/manager.rb
class Sidekiq::Manager
  module InitLimitFetch
    def start
      Sidekiq::LimitFetch::Queues.start options
      Sidekiq::LimitFetch::Global::Monitor.start!
      super
    end
  end

  prepend InitLimitFetch
end

この方法のメリット・デメリット

これは、この方法だからという訳ではなく、 sidekiq-limit-fetch の課題ではありますが、ここで上がったスレッドの内部で例外があがったとしたらどうなるでしょうか?

おそらく、その Server プロセスでのバックグラウンド処理は行われなくなります。

Sidekiq 自身もスレッドで処理を行っていますが、Sidekiq はスレッドが例外で死んだ場合には、Manager 側に処理を戻し、再度起こし直しています。

lib/sidekiq/processor.rb
module Sidekiq
  # Thread として実行されるクラス。limit-fetch の方と同様に Manager から実行される。
  class Processor
    def run
      begin
        while !@done
          process_one
        end
        @mgr.processor_stopped(self)
      rescue Sidekiq::Shutdown
        @mgr.processor_stopped(self)
      rescue Exception => ex
        # 例外が発生されると、親の Manager を呼び出す
        @mgr.processor_died(self, ex)
      end
    end

lib/sidekiq/manager.rb
module Sidekiq
  class Manager
    def processor_died(processor, reason)
      @plock.synchronize do
        @workers.delete(processor)
        unless @done
          p = Processor.new(self)
          @workers << p
          p.start
        end
      end
    end

sidekiq-limit-fetch が行っている Monitor の処理はそうそう失敗するものではないですが、この点は注意が必要ですし、この方法を使う場合には上手く対処してあげる必要がでてきます。

attentive_sidekiq

消失してしまった Sidekiq job を管理するための拡張です。

この拡張がバックグラウンドで行うのは、 server middleware で溜め続けている実行 job の記録から、消失したと疑われる job を定期的に探し、消失リストに加えることです。

concurrent-ruby の Concurrent::TimerTask を使う

https://github.com/ruby-concurrency/concurrent-ruby
http://www.rubydoc.info/github/ruby-concurrency/concurrent-ruby/Concurrent/TimerTask

私自身が Concurrent::TimerTask に対して不勉強なので説明はできないのですが、 concurrent-ruby 製の定期実行なのかなと思います。処理に対して Observer を付けられるのが面白いですね。

この拡張も特殊な使い方はしておらず、以下のように普通にインターバルを与えて実行しています。

module AttentiveSidekiq
  class Manager
    def start!
      task = Concurrent::TimerTask.new(options) do
        AttentiveSidekiq::Manager.instance.update_disappeared_jobs
      end
      task.add_observer(AttentiveSidekiq::UpdaterObserver.new)
      task.execute
    end

    private

    def options
      { 
        execution_interval: AttentiveSidekiq.execution_interval,
        timeout_interval: AttentiveSidekiq.timeout_interval
      }
    end

この方法のメリット・デメリット

おそらくですが、この拡張で欲しい定期実行であれば、最初の Sidekiq::Scheduled::Poller を使った方法でも実現はできました。(デフォルトのインターバル値を見ると、 Poller のデフォルトの 15 sec では間隔短すぎかもですが)

そういう意味では、この拡張は sidekiq-schedulerrufus_scheduler という「cron 作るのによく使われている方法」を採用したのと同様に、「定期実行を実現するのによく使われる方法」を採用したというだけなのかもしれません。

sidekiq-limit-fetch のようにスレッドを利用する方法に対しては、TimerTask が内部で Concurrent::SafeTaskExecutor を利用しているため、Observer にエラーがあったことは通知されますが、スレッドが落ちて以降の処理が行われないということは無さそうです。

まとめ

ここまで以下の 4 つの拡張とそれが採用する方法を見てきました。

  • sidekiq-cron
    • Sidekiq::Scheduled::Poller を継承したクラスを使い、 Poller 相当のインターバル実行を行う
  • sidekiq-scheduler
    • rufus_scheduler を使い、 cron 実行を素直に実装
  • sidekiq-limit-fetch
    • スレッドを立ち上げて、内部で無限ループ & sleep を繰り返しインターバル実行を行う
  • attentive_sidekiq
    • Concurrent::TimerTask を使ってインターバル実行を素直に実装

それぞれメリット・デメリットがあり、他にもやり方はあると思いますが、実現したいケースに応じて適切に手段を選べると良さそうです。

Appendix

Server プロセスのバックグラウンド処理を開始するのはどこがいいか

「お行儀が悪い」と何度か書きましたが、この手の Server プロセスでのバックグラウンド処理を起動するのには一応用意された場所があります。

Events | Deployment - sidekiq wiki

ここにあるように startup, quiet (停止前に新たな非同期ジョブを生成しなくなる状態), shutdown (実行中だった非同期ジョブを全部実行しおわり、Server プロセス自体が停止する) の 3 つに加えて heartbeat の 4 つがイベントとして用意されています。

この startup で行うのが本来的には良さそうです。

sidekiq-cron は本家からツッコまれている

sidekiq-scheculer はそのとおりにやってる

13
7
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
13
7