Sidekiq の拡張ポイント
Sidekiq の機能拡張を考える時に、真っ先に検討されるのが Sidekiq middleware です。
Middleware - mperham/sidekiq Wiki
middleware は Client Side (#perform_asyn
を呼び出す方) と Server Side (スレッドベースで動作する Worker の方) の両方を Rack や Faraday のように #call
のインターフェースを持ったクラスを登録するだけで、その挙動を拡張できます。
ただし、middleware は Decorator パターンのようなものなので、Sidekiq の内部の処理に大きく介入できる訳ではありません。
Redis からの Job 取得処理に用意された拡張ポイント
おそらく本家として Pro や Enterprise 版の拡張をやりやすくするために用意しているのだと思いますが(Wiki で文書化されているようには見えなかったので)、Sidekiq には Redis からの Job 取得処理を置き換えるポイントが用意されています。
module Sidekiq
class Processor
include Util
attr_reader :thread
attr_reader :job
def initialize(mgr)
@mgr = mgr
@down = false
@done = false
@job = nil
@thread = nil
# options[:fetch] の方を優先しているのが分かります。
@strategy = (mgr.options[:fetch] || Sidekiq::BasicFetch).new(mgr.options)
@reloader = Sidekiq.options[:reloader]
@executor = Sidekiq.options[:executor]
end
上記は Processor
というクラスになりますが、Sidekiq 内においては、各スレッド毎の処理を担っているものです。
この options の設定は簡単です。以下のように middleware を登録する時のように設定時に渡せば良いだけです。
Sidekiq.configure_server do |config|
config.options[:fetch] = HogeFetcher
end
以降では、この Fetcher の拡張を詳しく見ていきます。
Fetcher をどのように拡張するか?
それを確かめるために、先ほどの Processor
を更に詳しく見ていきます。
Fetch 部分を見る
module Sidekiq
class Processor
# 省略
def run
begin
while !@done
process_one
end
@mgr.processor_stopped(self)
rescue Sidekiq::Shutdown
@mgr.processor_stopped(self)
rescue Exception => ex
@mgr.processor_died(self, ex)
end
end
def process_one
@job = fetch
process(@job) if @job
@job = nil
end
def fetch
j = get_one
if j && @done
j.requeue
nil
else
j
end
end
def get_one
begin
work = @strategy.retrieve_work
(logger.info { "Redis is online, #{Time.now - @down} sec downtime" }; @down = nil) if @down
work
rescue Sidekiq::Shutdown
rescue => ex
handle_fetch_exception(ex)
end
end
先ほどの #initialize
で Fetcher
が @strategy
というインスタンス変数に入れられていたのを思い出してください。
Processor
は #run
の内部でループが実行され、これは Sidekiq に停止命令が出るまで続きます。
そして #process_one
-> #fetch
-> #get_one
とメソッドが呼び出され、 #get_one
の中でようやく @starategy.retrieve_work
が呼び出されているのが分かります。
そして、そこで取得された work
を #process
メソッドに渡して、おそらく Sidekiq::Worker
の処理が実行されます。(ここは後で見ていきます。)
BasicFetch#retrieve_work を読む
では、ここで Fetcher
のデフォルト実装である BasicFetch
を見てみます。
module Sidekiq
class BasicFetch
# We want the fetch operation to timeout every few seconds so the thread
# can check if the process is shutting down.
TIMEOUT = 2
UnitOfWork = Struct.new(:queue, :job) do
def acknowledge
# nothing to do
end
def queue_name
queue.sub(/.*queue:/, ''.freeze)
end
def requeue
Sidekiq.redis do |conn|
conn.rpush("queue:#{queue_name}", job)
end
end
end
def retrieve_work
work = Sidekiq.redis { |conn| conn.brpop(*queues_cmd) }
UnitOfWork.new(*work) if work
end
ここまでで必要なメソッドだけに限定して転載しています。 queues_cmd
はここでは、 Sidekiq に設定した queue の一覧の名前が返ってきていると思ってください。(特に何も設定していなければ default
キューだけが返ってくるはずです)
#retrieve_work
では Redis の BRPOP
を使ってブロッキングしつつ、リストから一つ Job を取得するという単純な処理をしています。
そして、そこで返ってきた Job (コード上は work
) を UnitOfWork
で包んで返しています。
Processor#process と UnitOfWork の役割を見る
では、再び Processor
に戻って、 取得された work
が #process
に渡されて以降の内容を見てみます。
module Sidekiq
class Processor
# 省略
def process(work)
jobstr = work.job
queue = work.queue_name
ack = false
begin
# perform_async を call した時の Worker クラス名や引数の情報を読み出す
job_hash = Sidekiq.load_json(jobstr)
@reloader.call do
klass = job_hash['class'.freeze].constantize
worker = klass.new
worker.jid = job_hash['jid'.freeze]
stats(worker, job_hash, queue) do
# execute_job の前後を middleware の実行で囲むことで middleware の call 処理を実現している
# コメントにある通りだが、middleware の実行でお仕事が失われる可能性があるので、
# ack 変数でコントロールしている。
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
@executor.call do
# Only ack if we either attempted to start this job or
# successfully completed it. This prevents us from
# losing jobs if a middleware raises an exception before yielding
ack = true
execute_job(worker, cloned(job_hash['args'.freeze]))
end
end
end
ack = true
end
rescue Sidekiq::Shutdown
# Had to force kill this job because it didn't finish
# within the timeout. Don't acknowledge the work since
# we didn't properly finish it.
ack = false
rescue Exception => ex
handle_exception(ex, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
raise
ensure
# ack が true の場合のみ UnitOfWork の #acknowledge が呼び出される
work.acknowledge if ack
end
end
def execute_job(worker, cloned_args)
# これは皆さんが実装する Sidekiq::Worker が include されたクラスです
worker.perform(*cloned_args)
end
細かい部分は日本語のコメントを書き足していますが、 #process
が
- 実行する Worker クラスや引数情報を読み出し
- server_middleware を実行しつつ
-
Worker#perform
を実行する
という様子が分かります。
さて、コメントには ack
変数に関してのコメントを記述しましたが、これにはこの時点で 2 つの意味があります。
-
BasicFetch::UnitOfWork
の#acknowledge
の実装は空なので、実質何もしない。- 拡張ポイントとして用意されているといってよさそう
-
server_middleware
によってお仕事が失われる可能性を考慮してack
変数での管理がされているが、この実装を見る限りそこのケアはされていない- OSS 版の Sidekiq は server_middleware の挙動でお仕事が失われる可能性がある
- これは Pro 版でいくつかの手法でサポートされます。
- https://github.com/mperham/sidekiq/wiki/Pro-Reliability-Server
よって、 Fetcher と共に、その出力である UnitOfWork
も拡張ポイントとして利用できることが分かりました。
どのように拡張するかのまとめ
Processor
と、Fetcher のデフォルト実装である BasicFetch
、そして UnitOfWork
のデフォルト実装である BasicFetch::UnitOfWork
を見ることによって、一つの Job が Redis から取り出されてから、処理完了にいたるまでの間で、どのような処理に使われているのかを確認できました。
それらを持った上で、拡張ポイントはパブリックなインターフェースだけを見ると大きくは以下になりそうです。
- Fetcher の
#retrieve_work
の変更- そもそも
Processor
内で Strategy と呼ばれているので、様々変更が考えられそう
- そもそも
- UnitOfWork の
#acknowledge
の呼び出しを考慮したもの- 「処理が終了した」時に呼び出されるということで、単体でも後処理に使えますし、
UnitOfWork#initialize
の処理と組み合わせれば前処理との連携も考えられます。
- 「処理が終了した」時に呼び出されるということで、単体でも後処理に使えますし、
では、この投稿の締めくくりとして、この拡張を使った gem をいくつか紹介して終わりにします。
Fetcher を拡張している gem
というか、Sidekiq 関連の gem ではしごく当たり前の拡張ポイントのようで、かなり多く見つかります。ここでは、実装の説明ができそうなものだけピックアップします。
sidekiq-reliable-fetch
TEA-ebook/sidekiq-reliable-fetch
これは、先ほどの説明時に問題としてあげた、「server_middleware
によって、お仕事が失われる可能性」に対しての一つの解であり、Sidekiq Pro で提供される reliable_fetch の文書情報を参考にしているのだと思います。
RPOPLPUSH という「キュー A から1件リストから取得したら、同時にその 1 件をキュー B に入れてから返す」という Redis の機能を利用しています。
ちょうど以下の記事で説明されている BRPOPLPUSH
のブロッキングでは無い版ですね。
Redisアプリケーションパターン - おそらくはそれさえも平凡な日々
BRPOPLPUSHはジョブキューのようなものを作る上で有用です。リストにジョブが投入されるまで待ち受けて、ジョブが投入されたら、ジョブを取得しつつ、別のリストに実行中のジョブとして投入することができるからです。ジョブの実行が終わったら、LREMコマンドを利用して、実行中ジョブのリストから当該ジョブを削除します。
上記記事からの引用ですが、ほぼこの通りの実装です。
module Sidekiq
class ReliableFetcher
WORKING_QUEUE = 'working'
def retrieve_work
clean_working_queues! if @cleaning_interval != -1 && @nb_fetched_jobs >= @cleaning_interval
@queues_size.times do
queue = @queues_iterator.next
# ここでは rpoplpush
work = Sidekiq.redis { |conn| conn.rpoplpush(queue, "#{queue}:#{WORKING_QUEUE}") }
if work
@nb_fetched_jobs += 1
return UnitOfWork.new(queue, work)
end
end
# We didn't find a job in any of the configured queues. Let's sleep a bit
# to avoid uselessly burning too much CPU
sleep(IDLE_TIMEOUT)
nil
end
UnitOfWork = Struct.new(:queue, :message) do
def acknowledge
# ここで lrem
Sidekiq.redis { |conn| conn.lrem("#{queue}:#{WORKING_QUEUE}", 1, message) }
end
def queue_name
queue.gsub(/.*queue:/, '')
end
def requeue
Sidekiq.redis do |conn|
conn.pipelined do
conn.lpush(queue, message)
conn.lrem("#{queue}:#{WORKING_QUEUE}", 1, message)
end
end
end
end
例によって、関係しそうな個所のみ抜粋してます。
上記記事の内容通りですが、 #retrieve_work
つまりジョブの取得部分では rpoplpush
を使って working キューに取得したジョブを入れており、UnitOfWork#acknowledge
つまり処理完了時では lem
で working キューから取り除いています。
Sidekiq Pro の説明にもスケールしづらい方法と書いていますが、実際このクラスの実装を見てもなかなかしんどそうですね。。
BRPOP
と違って複数のキューから 1 コマンドで取得できないからループも発生するし、クリーニング処理を #retrieve_work
の先頭でやるのもなんとも。。
sidekiq-limit_fetch
キュー毎に同時実行数の制御を可能にするものですね。コネクション数を多く使えるものとあまり使えないものが共存するアプリで使ったりしました。
そもそもデフォルトの Sidekiq でキュー毎の同時実行数制御ができないのはなぜ?
Sidekiq の設定では concurrency
の一つの設定しかできず、各キューは重みしか決められません。
実は Sidekiq は各キューというものにほぼ無関心であり、重みも以下のように単純な実装です。
module Sidekiq
class CLI
# 省略
def parse_config(cfile)
opts = {}
if File.exist?(cfile)
opts = YAML.load(ERB.new(IO.read(cfile)).result) || opts
opts = opts.merge(opts.delete(environment) || {})
parse_queues(opts, opts.delete(:queues) || [])
else
# 省略
end
def parse_queues(opts, queues_and_weights)
queues_and_weights.each { |queue_and_weight| parse_queue(opts, *queue_and_weight) }
end
# [default, 30] みたいな形で渡されるので、 q にキュー名、weight に数値が入る
def parse_queue(opts, q, weight=nil)
[weight.to_i, 1].max.times do
(opts[:queues] ||= []) << q
end
opts[:strict] = false if weight.to_i > 0
end
見ての通りで、queues
に指定された配列形式のフォーマットを元に weight に応じて queue 名を増やした配列に変えているだけです。
:queues:
- ["foo", 1]
- ["bar", 2]
- ["xyzzy", 3]
# parse 後の配列
opts[:queues]
#=> ['foo', 'bar', 'bar', 'xyzzy', 'xyzzy', 'xyzzy']
これを #retrieve_work
で shuffle して取得しているだけなのが重みの実態です。
少し話がそれましたが、OSS 版の Sidekiq は個別のキューには関心が無く、また、現在実行中のお仕事数すら把握はしていません。(動作している worker 数は分かるので、それで把握できていますが、どのキューの処理かまでは分かりません)
どうやってキュー毎の同時実行制御をするか?
考え方自体は簡単です。
- 個別のキューの現在実行数を把握できるようにする
- Redis からの取得時に実行数が上限に達していないキューから取得するようにする
では、それを sidekiq-limit_fetch
がどうやって実現しているかを見ていきます。
module Sidekiq::LimitFetch
def retrieve_work
queue, job = redis_brpop(Queues.acquire)
Queues.release_except(queue)
UnitOfWork.new(queue, job) if job
end
module Sidekiq
class LimitFetch::UnitOfWork < BasicFetch::UnitOfWork
def initialize(queue, job)
super
redis_retryable { Queue[queue_name].increase_busy }
end
def acknowledge
redis_retryable { Queue[queue_name].decrease_busy }
redis_retryable { Queue[queue_name].release }
end
def requeue
super
acknowledge
end
end
end
さて、特筆すべきは UnitOfWork
の方です。
#initialize
つまり、Fetcher でジョブを Redis から取得し生成されたタイミングで Queue[queue_name].increase_busy
おそらくは取得したキュー名の busy
数 (Sidekiq は実行中数を busy で表す) を増やしています。
そして #acknowledge
つまり取得したジョブの処理完了時に Queue[queue_name].decrease_busy
して busy
数を減らしています。
これで、先ほど示した「個別のキューの現在実行数を把握できるようにする」が実現できているわけですね。
sidekiq-limit_fetch
は他にも多くの事をやっているので、ここではこれ以上は踏み込みませんが、Fetcher を置き換えてやっていることを確認しました。
全体のまとめ
Sidekiq は場合によってはそのアプリケーションの中核を担う事も出てくるため、安易にプラグインを導入して事故になると大惨事になりかねません。
今回紹介した Custom の Fetcher を使った拡張や middleware を使った拡張方法を頭に入れておくと、導入しようとするプラグインの内容を把握するのに役立つことがあると思います。
加えて、Redis の機能についても頭に入れておくともっと捗るかもしれませんね。