Help us understand the problem. What is going on with this article?

Sidekiq における Redis からの job の取得を、 Custom の Fetcher を使って拡張する

More than 3 years have passed since last update.

Sidekiq の拡張ポイント

Sidekiq の機能拡張を考える時に、真っ先に検討されるのが Sidekiq middleware です。

Middleware - mperham/sidekiq Wiki

middleware は Client Side (#perform_asyn を呼び出す方) と Server Side (スレッドベースで動作する Worker の方) の両方を Rack や Faraday のように #call のインターフェースを持ったクラスを登録するだけで、その挙動を拡張できます。

ただし、middleware は Decorator パターンのようなものなので、Sidekiq の内部の処理に大きく介入できる訳ではありません。

Redis からの Job 取得処理に用意された拡張ポイント

おそらく本家として Pro や Enterprise 版の拡張をやりやすくするために用意しているのだと思いますが(Wiki で文書化されているようには見えなかったので)、Sidekiq には Redis からの Job 取得処理を置き換えるポイントが用意されています。

https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/processor.rb#L38

sidekiq/lib/sidekiq/processor.rb
module Sidekiq
  class Processor

    include Util

    attr_reader :thread
    attr_reader :job

    def initialize(mgr)
      @mgr = mgr
      @down = false
      @done = false
      @job = nil
      @thread = nil

      # options[:fetch] の方を優先しているのが分かります。
      @strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)

      @reloader = Sidekiq.options[:reloader]
      @executor = Sidekiq.options[:executor]
    end

上記は Processor というクラスになりますが、Sidekiq 内においては、各スレッド毎の処理を担っているものです。

この options の設定は簡単です。以下のように middleware を登録する時のように設定時に渡せば良いだけです。

Sidekiq.configure_server do |config|
  config.options[:fetch] = HogeFetcher
end

以降では、この Fetcher の拡張を詳しく見ていきます。

Fetcher をどのように拡張するか?

それを確かめるために、先ほどの Processor を更に詳しく見ていきます。

Fetch 部分を見る

module Sidekiq
  class Processor
    # 省略

    def run
      begin
        while !@done
          process_one
        end
        @mgr.processor_stopped(self)
      rescue Sidekiq::Shutdown
        @mgr.processor_stopped(self)
      rescue Exception => ex
        @mgr.processor_died(self, ex)
      end
    end

    def process_one
      @job = fetch
      process(@job) if @job
      @job = nil
    end

    def fetch
      j = get_one
      if j && @done
        j.requeue
        nil
      else
        j
      end
    end

    def get_one
      begin
        work = @strategy.retrieve_work
        (logger.info { "Redis is online, #{Time.now - @down} sec downtime" }; @down = nil) if @down
        work
      rescue Sidekiq::Shutdown
      rescue => ex
        handle_fetch_exception(ex)
      end
    end

先ほどの #initializeFetcher@strategy というインスタンス変数に入れられていたのを思い出してください。

Processor#run の内部でループが実行され、これは Sidekiq に停止命令が出るまで続きます。

そして #process_one -> #fetch -> #get_one とメソッドが呼び出され、 #get_one の中でようやく @starategy.retrieve_work が呼び出されているのが分かります。

そして、そこで取得された work#process メソッドに渡して、おそらく Sidekiq::Worker の処理が実行されます。(ここは後で見ていきます。)

BasicFetch#retrieve_work を読む

では、ここで Fetcher のデフォルト実装である BasicFetch を見てみます。

https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/fetch.rb

sidekiq/lib/sidekiq/fetch.rb
module Sidekiq
  class BasicFetch
    # We want the fetch operation to timeout every few seconds so the thread
    # can check if the process is shutting down.
    TIMEOUT = 2

    UnitOfWork = Struct.new(:queue, :job) do
      def acknowledge
        # nothing to do
      end

      def queue_name
        queue.sub(/.*queue:/, ''.freeze)
      end

      def requeue
        Sidekiq.redis do |conn|
          conn.rpush("queue:#{queue_name}", job)
        end
      end
    end

    def retrieve_work
      work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
      UnitOfWork.new(*work) if work
    end

ここまでで必要なメソッドだけに限定して転載しています。 queues_cmd はここでは、 Sidekiq に設定した queue の一覧の名前が返ってきていると思ってください。(特に何も設定していなければ default キューだけが返ってくるはずです)

#retrieve_work では Redis の BRPOP を使ってブロッキングしつつ、リストから一つ Job を取得するという単純な処理をしています。

そして、そこで返ってきた Job (コード上は work) を UnitOfWork で包んで返しています。

Processor#process と UnitOfWork の役割を見る

では、再び Processor に戻って、 取得された work#process に渡されて以降の内容を見てみます。

module Sidekiq
  class Processor
    # 省略

    def process(work)
      jobstr = work.job
      queue = work.queue_name

      ack = false
      begin

        # perform_async を call した時の Worker クラス名や引数の情報を読み出す
        job_hash = Sidekiq.load_json(jobstr)
        @reloader.call do
          klass  = job_hash['class'.freeze].constantize
          worker = klass.new
          worker.jid = job_hash['jid'.freeze]

          stats(worker, job_hash, queue) do

            # execute_job の前後を middleware の実行で囲むことで middleware の call 処理を実現している
            # コメントにある通りだが、middleware の実行でお仕事が失われる可能性があるので、
            # ack 変数でコントロールしている。
            Sidekiq.server_middleware.invoke(worker, job_hash, queue) do

              @executor.call do
                # Only ack if we either attempted to start this job or
                # successfully completed it. This prevents us from
                # losing jobs if a middleware raises an exception before yielding
                ack = true
                execute_job(worker, cloned(job_hash['args'.freeze]))
              end
            end
          end
          ack = true
        end
      rescue Sidekiq::Shutdown
        # Had to force kill this job because it didn't finish
        # within the timeout.  Don't acknowledge the work since
        # we didn't properly finish it.
        ack = false
      rescue Exception => ex
        handle_exception(ex, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
        raise
      ensure

        # ack が true の場合のみ UnitOfWork の #acknowledge が呼び出される
        work.acknowledge if ack
      end
    end

    def execute_job(worker, cloned_args)

      # これは皆さんが実装する Sidekiq::Worker が include されたクラスです
      worker.perform(*cloned_args)
    end

細かい部分は日本語のコメントを書き足していますが、 #process

  1. 実行する Worker クラスや引数情報を読み出し
  2. server_middleware を実行しつつ
  3. Worker#perform を実行する

という様子が分かります。

さて、コメントには ack 変数に関してのコメントを記述しましたが、これにはこの時点で 2 つの意味があります。

  • BasicFetch::UnitOfWork#acknowledge の実装は空なので、実質何もしない。
    • 拡張ポイントとして用意されているといってよさそう
  • server_middleware によってお仕事が失われる可能性を考慮して ack 変数での管理がされているが、この実装を見る限りそこのケアはされていない

よって、 Fetcher と共に、その出力である UnitOfWork も拡張ポイントとして利用できることが分かりました。

どのように拡張するかのまとめ

Processor と、Fetcher のデフォルト実装である BasicFetch、そして UnitOfWork のデフォルト実装である BasicFetch::UnitOfWork を見ることによって、一つの Job が Redis から取り出されてから、処理完了にいたるまでの間で、どのような処理に使われているのかを確認できました。

それらを持った上で、拡張ポイントはパブリックなインターフェースだけを見ると大きくは以下になりそうです。

  • Fetcher の #retrieve_work の変更
    • そもそも Processor 内で Strategy と呼ばれているので、様々変更が考えられそう
  • UnitOfWork の #acknowledge の呼び出しを考慮したもの
    • 「処理が終了した」時に呼び出されるということで、単体でも後処理に使えますし、UnitOfWork#initialize の処理と組み合わせれば前処理との連携も考えられます。

では、この投稿の締めくくりとして、この拡張を使った gem をいくつか紹介して終わりにします。

Fetcher を拡張している gem

というか、Sidekiq 関連の gem ではしごく当たり前の拡張ポイントのようで、かなり多く見つかります。ここでは、実装の説明ができそうなものだけピックアップします。

sidekiq-reliable-fetch

TEA-ebook/sidekiq-reliable-fetch

これは、先ほどの説明時に問題としてあげた、「server_middleware によって、お仕事が失われる可能性」に対しての一つの解であり、Sidekiq Pro で提供される reliable_fetch の文書情報を参考にしているのだと思います。

RPOPLPUSH という「キュー A から1件リストから取得したら、同時にその 1 件をキュー B に入れてから返す」という Redis の機能を利用しています。

ちょうど以下の記事で説明されている BRPOPLPUSH のブロッキングでは無い版ですね。

Redisアプリケーションパターン - おそらくはそれさえも平凡な日々

BRPOPLPUSHはジョブキューのようなものを作る上で有用です。リストにジョブが投入されるまで待ち受けて、ジョブが投入されたら、ジョブを取得しつつ、別のリストに実行中のジョブとして投入することができるからです。ジョブの実行が終わったら、LREMコマンドを利用して、実行中ジョブのリストから当該ジョブを削除します。

上記記事からの引用ですが、ほぼこの通りの実装です。

https://github.com/TEA-ebook/sidekiq-reliable-fetch/blob/master/lib/sidekiq/reliable_fetcher.rb

sidekiq-reliable-fetch/lib/sidekiq/reliable_fetcher.rb
module Sidekiq
  class ReliableFetcher
    WORKING_QUEUE = 'working'

    def retrieve_work
      clean_working_queues! if @cleaning_interval != -1 && @nb_fetched_jobs >= @cleaning_interval

      @queues_size.times do
        queue = @queues_iterator.next

        # ここでは rpoplpush
        work = Sidekiq.redis { |conn| conn.rpoplpush(queue, "#{queue}:#{WORKING_QUEUE}") }

        if work
          @nb_fetched_jobs += 1
          return UnitOfWork.new(queue, work)
        end
      end

      # We didn't find a job in any of the configured queues. Let's sleep a bit
      # to avoid uselessly burning too much CPU
      sleep(IDLE_TIMEOUT)

      nil
    end

    UnitOfWork = Struct.new(:queue, :message) do
      def acknowledge

        # ここで lrem
        Sidekiq.redis { |conn| conn.lrem("#{queue}:#{WORKING_QUEUE}", 1, message) }
      end

      def queue_name
        queue.gsub(/.*queue:/, '')
      end

      def requeue
        Sidekiq.redis do |conn|
          conn.pipelined do
            conn.lpush(queue, message)
            conn.lrem("#{queue}:#{WORKING_QUEUE}", 1, message)
          end
        end
      end
    end

例によって、関係しそうな個所のみ抜粋してます。

上記記事の内容通りですが、 #retrieve_work つまりジョブの取得部分では rpoplpush を使って working キューに取得したジョブを入れており、UnitOfWork#acknowledge つまり処理完了時では lem で working キューから取り除いています。

Sidekiq Pro の説明にもスケールしづらい方法と書いていますが、実際このクラスの実装を見てもなかなかしんどそうですね。。
BRPOP と違って複数のキューから 1 コマンドで取得できないからループも発生するし、クリーニング処理を #retrieve_work の先頭でやるのもなんとも。。

sidekiq-limit_fetch

brainopia/sidekiq-limit_fetch

キュー毎に同時実行数の制御を可能にするものですね。コネクション数を多く使えるものとあまり使えないものが共存するアプリで使ったりしました。

そもそもデフォルトの Sidekiq でキュー毎の同時実行数制御ができないのはなぜ?

Sidekiq の設定では concurrency の一つの設定しかできず、各キューは重みしか決められません。

実は Sidekiq は各キューというものにほぼ無関心であり、重みも以下のように単純な実装です。

https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/cli.rb#L395-L400

module Sidekiq
  class CLI
    # 省略
    def parse_config(cfile)
      opts = {}
      if File.exist?(cfile)
        opts = YAML.load(ERB.new(IO.read(cfile)).result) || opts
        opts = opts.merge(opts.delete(environment) || {})

        parse_queues(opts, opts.delete(:queues) || [])
      else
        # 省略
    end

    def parse_queues(opts, queues_and_weights)
      queues_and_weights.each { |queue_and_weight| parse_queue(opts, *queue_and_weight) }
    end

    # [default, 30] みたいな形で渡されるので、 q にキュー名、weight に数値が入る
    def parse_queue(opts, q, weight=nil)
      [weight.to_i, 1].max.times do
       (opts[:queues] ||= []) << q
      end
      opts[:strict] = false if weight.to_i > 0
    end

見ての通りで、queues に指定された配列形式のフォーマットを元に weight に応じて queue 名を増やした配列に変えているだけです。

sidekiq.yml
:queues:
  - ["foo", 1]
  - ["bar", 2]
  - ["xyzzy", 3]
# parse 後の配列
opts[:queues]
 #=> ['foo', 'bar', 'bar', 'xyzzy', 'xyzzy', 'xyzzy']

これを #retrieve_work で shuffle して取得しているだけなのが重みの実態です。

少し話がそれましたが、OSS 版の Sidekiq は個別のキューには関心が無く、また、現在実行中のお仕事数すら把握はしていません。(動作している worker 数は分かるので、それで把握できていますが、どのキューの処理かまでは分かりません)

どうやってキュー毎の同時実行制御をするか?

考え方自体は簡単です。

  • 個別のキューの現在実行数を把握できるようにする
  • Redis からの取得時に実行数が上限に達していないキューから取得するようにする

では、それを sidekiq-limit_fetch がどうやって実現しているかを見ていきます。

https://github.com/brainopia/sidekiq-limit_fetch/blob/master/lib/sidekiq/limit_fetch.rb

sidekiq-limit_fetch/lib/sidekiq/limit_fetch.rb
module Sidekiq::LimitFetch
  def retrieve_work
    queue, job = redis_brpop(Queues.acquire)
    Queues.release_except(queue)
    UnitOfWork.new(queue, job) if job
  end

https://github.com/brainopia/sidekiq-limit_fetch/blob/master/lib/sidekiq/limit_fetch/unit_of_work.rb

sidekiq-limit_fetch/lib/sidekiq/limit_fetch/unit_of_work.rb
module Sidekiq
  class LimitFetch::UnitOfWork < BasicFetch::UnitOfWork
    def initialize(queue, job)
      super
      redis_retryable { Queue[queue_name].increase_busy }
    end

    def acknowledge
      redis_retryable { Queue[queue_name].decrease_busy }
      redis_retryable { Queue[queue_name].release }
    end

    def requeue
      super
      acknowledge
    end
  end
end

さて、特筆すべきは UnitOfWork の方です。
#initialize つまり、Fetcher でジョブを Redis から取得し生成されたタイミングで Queue[queue_name].increase_busy おそらくは取得したキュー名の busy 数 (Sidekiq は実行中数を busy で表す) を増やしています。
そして #acknowledge つまり取得したジョブの処理完了時に Queue[queue_name].decrease_busy して busy 数を減らしています。

これで、先ほど示した「個別のキューの現在実行数を把握できるようにする」が実現できているわけですね。

sidekiq-limit_fetch は他にも多くの事をやっているので、ここではこれ以上は踏み込みませんが、Fetcher を置き換えてやっていることを確認しました。

全体のまとめ

Sidekiq は場合によってはそのアプリケーションの中核を担う事も出てくるため、安易にプラグインを導入して事故になると大惨事になりかねません。
今回紹介した Custom の Fetcher を使った拡張や middleware を使った拡張方法を頭に入れておくと、導入しようとするプラグインの内容を把握するのに役立つことがあると思います。

加えて、Redis の機能についても頭に入れておくともっと捗るかもしれませんね。

dany1468
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
Comments
No comments
Sign up for free and join this conversation.
If you already have a Qiita account
Why do not you register as a user and use Qiita more conveniently?
You need to log in to use this function. Qiita can be used more conveniently after logging in.
You seem to be reading articles frequently this month. Qiita can be used more conveniently after logging in.
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
ユーザーは見つかりませんでした