Sidekiqのミドルウェアを書いてSidekiqを拡張したい人に向けた記事です。
今回の目標
Sidekiqを使ったことがある方は既にご存知だと思いますが、SidekiqはGUIの管理画面が標準で搭載されており、そこから随時Jobの情報を閲覧することができます。(ここでいう情報というのは、例えば、Jobが作られた時間、リトライ回数等々)これはSidekiqがJobに対してHashで管理されている付随情報を持っているからです。
しかしながら、Worker実行中は、そのHashに対してのアクセスは限られます。今回、Jobが持っているHashの情報を利用するために、middlewareを使って機能拡張をする必要がありました。その時の検証を記事化しておきます。
検証環境
- ubuntu 18.10
- Ruby 2.5.3
- sidekiq 5.2.5
Sidekiqのmiddlewareは、Client用とServer向けがあります。
その説明については、公式ページの情報が充実しているのでそちらをご覧いただけると良いと思います。
本記事では公式ページの内容を元にして実際にSidekiqのmiddlewareを書いて機能を拡張していきます。
https://github.com/mperham/sidekiq/wiki/Middleware
念の為リンク先を参照してない方のために、Sidekiqのmiddlewareがなにかというのを一言で説明しておきます。
Client middleware
JobがRedisにpushされる前に、そのJobに対して操作をすることができるようになります。
Server middleware
Jobが実行される前後でそのJobに対して操作をすることができるようになります。
検証環境の準備
今回は検証の為、Railsは使わずplainなRubyファイルを使います。
Redisが必要となるため、もしインストールしてなければインストールを完了し起動しておいてください。
source "https://rubygems.org"
gem "sidekiq"
Sidekiq gemを指定してbundle installします。
require "sidekiq"
#カスタムmiddlewareのクラスを作成し、callメソッドの中でyieldを定義する
class CustomAttribute
def call(worker, job_hash, queue)
yield
end
end
#Client用のConfigure
Sidekiq.configure_client do |config|
config.redis = { url: 'redis:127.0.0.1:6379' }
end
#Server用のConfigure
Sidekiq.configure_server do |config|
config.redis = { url: 'redis:127.0.0.1:6379' }
config.server_middleware do |chain|
#カスタムmiddlewareをchainにaddすることでmiddlewareが実行される
chain.add CustomAttribute
end
end
#ユーザーが実行したいWorkerクラスを定義する
class MyWorker
include Sidekiq::Worker
def perform(lifting)
case lifting
when "heavy"
sleep 10
puts "重労働を実行しました"
when "middle"
sleep 5
puts "普通の作業を実行しました"
else
sleep 1
puts "軽い作業を実行しました"
end
end
end
MyWorker自体には難しいところはなく、見た目通りです。
引数で受け取った内容によって労働の種類が変わります。
早速実行してみましょう。
Sidekiqを立ち上げます。
sidekiq -r ./worker.rb
もう1つのターミナルでirb立ち上げて、
irb -r ./worker.rb
キューにJobを3つ登録していきます。
MyWorker.perform_async("middle")
MyWorker.perform_async("middle")
MyWorker.perform_async("middle")
期待通り?普通にSidekiqを実行したときと同じ内容の出力結果を得ることが確認できました。
普通の作業を実行しました
2019-02-16T14:06:34.563Z 8795 TID-gss2qrifr MyWorker JID-baab186447c6ea89f4871a4c INFO: done: 5.0 sec
普通の作業を実行しました
2019-02-16T14:06:35.039Z 8795 TID-gss2rx16r MyWorker JID-8022ef54e720e0e66f91a39d INFO: done: 5.0 sec
普通の作業を実行しました
2019-02-16T14:06:35.446Z 8795 TID-gss2qs2lv MyWorker JID-ef7be5605c4f7225769cc074 INFO: done: 5.0 sec
そして、ここからMiddleWareの話です。
上記のCustomAttribute
クラスのcallメソッドのyieldではexecute_job
、
つまりWorkerのperform
が実行されます。
ここら辺りの処理はSidekiq::Processor
クラスに定義されています。
コードを貼って置くので、確認してみてください。
def process(work)
jobstr = work.job
queue = work.queue_name
ack = false
begin
# Treat malformed JSON as a special case: job goes straight to the morgue.
job_hash = nil
begin
job_hash = Sidekiq.load_json(jobstr)
rescue => ex
handle_exception(ex, { :context => "Invalid JSON for job", :jobstr => jobstr })
# we can't notify because the job isn't a valid hash payload.
DeadSet.new.kill(jobstr, notify_failure: false)
ack = true
raise
end
ack = true
dispatch(job_hash, queue) do |worker|
Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
execute_job(worker, cloned(job_hash['args']))
end
end
rescue Sidekiq::Shutdown
# Had to force kill this job because it didn't finish
# within the timeout. Don't acknowledge the work since
# we didn't properly finish it.
ack = false
rescue Exception => ex
e = ex.is_a?(::Sidekiq::JobRetry::Skip) && ex.cause ? ex.cause : ex
handle_exception(e, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
raise e
ensure
work.acknowledge if ack
end
end
def execute_job(worker, cloned_args)
worker.perform(*cloned_args)
end
上記コード中でexecute_job
に引数で渡されている、job_hash['args']
について確認します。
Redisへのenqueからdequeそして、Jobのretryと、Sidekiqでは、このhashの情報を使いまわしているのですが、基本的にWorker.perform
の際にはjob_hash
の値はユーザーがJobを登録した時に渡した引数のみが渡されているようです。
さて、今回の目的としては、Worker内でjob_hash
の値を参照することにあるので、middlewareを使ってそれを実現してみます。
require "sidekiq"
#カスタムmiddlewareのクラスを作成し、callメソッドの中でyieldを定義する
class CustomAttribute
def call(worker, job_hash, queue)
job_hash["args"] << job_hash["retry_count"]
yield
end
end
#Client用のconfigure
Sidekiq.configure_client do |config|
config.redis = { url: 'redis:127.0.0.1:6379' }
end
#Worker用のConfigure
Sidekiq.configure_server do |config|
config.redis = { url: 'redis:127.0.0.1:6379' }
config.server_middleware do |chain|
#カスタムmiddlewareをchainにaddすることでmiddlewareが実行される
chain.add CustomAttribute
end
end
#ユーザーが実行したいWorkerクラスを定義する
class MyWorker
include Sidekiq::Worker
def perform(lifting, *args)
case lifting
when "heavy"
sleep 10
puts "重労働を実行しました"
p args
raise "error"
when "middle"
sleep 5
puts "普通の作業を実行しました"
else
sleep 1
puts "軽い作業を実行しました"
end
end
end
重労働を実行してみてください。
このケースではjob_hash["args"]
に対して再試行回数を追加することによって、
初回はnilですが、以降の再試行では数値が規定回数までカウントアップされていくretry_countの値を確認できます。
副作用は?
middlewareを使うことで、参照だけではなく、job_hash
の値に対する書き換えの可能性もあり、副作用が気になる所です。
以下は、Sidekiq::Processor
クラスのdispatchメソッドです。ここではJobのretry時に使われる@retrier.global
にSidekiqのJob情報を渡していますが、一旦、pristine
という変数に代入される時に、Marshalで深いコピーがされているのがわかります。これにより、再試行時は編集の可能性がないjob_hash
の値が使われています。
def dispatch(job_hash, queue)
# since middleware can mutate the job hash
# we clone here so we report the original
# job structure to the Web UI
pristine = cloned(job_hash)
Sidekiq::Logging.with_job_hash_context(job_hash) do
@retrier.global(pristine, queue) do
@logging.call(job_hash, queue) do
stats(pristine, queue) do
# Rails 5 requires a Reloader to wrap code execution. In order to
# constantize the worker and instantiate an instance, we have to call
# the Reloader. It handles code loading, db connection management, etc.
# Effectively this block denotes a "unit of work" to Rails.
@reloader.call do
klass = constantize(job_hash['class'])
worker = klass.new
worker.jid = job_hash['jid']
@retrier.local(worker, pristine, queue) do
yield worker
end
end
end
end
end
end
end
middlewareを考える時の注意点
今回はmiddlewareによってSidekiqの機能拡張実験を試みました。middlewareにより簡単に処理を追加できて素晴らしいと感じる反面、SidekiqはJobの管理をするというその性格上、実際にSidekiq本体のコードを慎重に追いながら機能追加の判断していくことが必要だと強く感じました。