はじめに
Railsで処理を何らかの理由で遅延させた場合や非同期に処理を行いたいときに多くの人がActive Jobを使用していると思います。
とても便利で良いやつなのですがキューに積んだジョブを削除しようとするとたちまち暗雲が立ち込めます。
前提
アダプタは記事のタイトルにあるようにSidekiqを利用しています。
Sidekiq API使う
> Sidekiq::ScheduledSet.new.find_job(job_id).delete
これだとジョブを全件取得してeachで回してデータをパースしてjid(ジョブのid)が一致するか見るので遅いです。
他にもdelete系のメソッドがあったのですが、引数の意味が分からず一先ずSidekiqでRedisがどのように使われているのかを見ることにしました。
Redis
SidekiqではRedisにジョブを保存するのにzset型というデータ型を使っています。
zset型
zsetは保存されてる値にscoreを持たせてscoreでソートしてるデータ型です。
http://redis.shibu.jp/commandreference/sortedsets.html
Sidekiqでのscore
scoreは単純に実行する日時のunix時間になってます。
zsetで使えるコマンド見てみる
https://www.rubydoc.info/github/ezmobius/redis-rb/Redis
どうやらzremで値そのものを指定すれば削除できるらしいです。
https://www.rubydoc.info/github/ezmobius/redis-rb/Redis:zrem
$ docker-compose exec redis redis-cli
などでredis-cliを起動して
下記のように試してみると削除できることを確認できました。
redis> select 5
redis> zrange schedule 0 5 withscores
# 5件取得
redis> zrangebyscore schedule 1597600801 1697600801
# 1597600801から1697600801までのscoreのジョブを取得
redis> zrem schedule "ジョブの中身"
# 削除
redisを直接叩いて削除を試みる
保存されるジョブの情報どこにあるの問題
実際にredisに保存されているジョブは以下のようになっています。
{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"TestJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"TestJob\",\"job_id\":\"482fb871-cdec-44ec-89e9-36ccccco9839\",\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"test_id\":1,\"_aj_symbol_keys\":[\"test_id\",]}],\"executions\":0,\"locale\":\"ja\"}],\"retry\":true,\"jid\":\"ef2f98642ac6560fea66b999\",\"created_at\":1597812804.077493}"
ですが、この情報をどのフローで見れるか分からない。
sidekiqのドキュメント一通り読んでみる
clientのMiddlewareを使えばキューに積むタイミングで色々できるらしい。
https://github.com/mperham/sidekiq/wiki/Middleware
ここでジョブの中身見てみたら求めていたjsonだったのでここで保存することにしました。
Middleware見れるジョブとredisで保存されるジョブのjson形式が若干違う問題
空白有無、nilとnullやhashの:と->などのフォーマットが異なっていました。
辛いけど job.except('at').inspect.delete(' ').gsub(/nil/, 'null').gsub(/=>/, ':')
で手を加えることにしました。
結果のコード
Middleware
ここでジョブをRDBに保存します。
module ClientMiddleware
class TestJobAttribute
def call(_worker_class, job, _queue, _redis_pool)
# 削除したいジョブのときだけ
if job['args'].first['job_class'] == 'TestJob'
test = Test.find(job['args'].first['arguments'].first)
# Testモデルのjobカラムにジョブのデータをそのまま入れる
return false unless test.update(job: adjust_for_redis(job))
end
yield
end
private
def adjust_for_redis(job)
job.except('at').inspect.delete(' ').gsub(/nil/, 'null').gsub(/=>/, ':')
end
end
end
# 省略
Sidekiq.configure_client do |config|
config.redis = { url: url }
config.client_middleware do |chain|
chain.add ClientMiddleware::TestJobAttribute
end
end
ジョブ削除するクライアント
class Sidekiq::RedisClient
def remove_jobs!(jobs)
return if jobs.blank?
Sidekiq.redis do |conn|
fixnum = conn.zrem('schedule', jobs)
raise SidekiqRedisClientError.new(jobs.length, fixnum) if jobs.length != fixnum
end
end
end
class SidekiqRedisClientError < StandardError
def initialize(len, fixnum)
super("The jobs could not be deleted. jobs.length is #{len}. fixnum is #{fixnum}.")
end
end
Sidekiq::RedisClient.new.remove_jobs!([Test.first.job])
でジョブを削除できるはずです。