LoginSignup
8
1

More than 1 year has passed since last update.

Sidekiqのジョブをパフォーマンスを考えて削除する

Last updated at Posted at 2020-12-12

はじめに

Railsで処理を何らかの理由で遅延させた場合や非同期に処理を行いたいときに多くの人がActive Jobを使用していると思います。
とても便利で良いやつなのですがキューに積んだジョブを削除しようとするとたちまち暗雲が立ち込めます。

前提

アダプタは記事のタイトルにあるようにSidekiqを利用しています。

Sidekiq API使う

> Sidekiq::ScheduledSet.new.find_job(job_id).delete
これだとジョブを全件取得してeachで回してデータをパースしてjid(ジョブのid)が一致するか見るので遅いです。
他にもdelete系のメソッドがあったのですが、引数の意味が分からず一先ずSidekiqでRedisがどのように使われているのかを見ることにしました。

Redis

SidekiqではRedisにジョブを保存するのにzset型というデータ型を使っています。

zset型

zsetは保存されてる値にscoreを持たせてscoreでソートしてるデータ型です。
http://redis.shibu.jp/commandreference/sortedsets.html

Sidekiqでのscore

scoreは単純に実行する日時のunix時間になってます。

zsetで使えるコマンド見てみる

https://www.rubydoc.info/github/ezmobius/redis-rb/Redis
どうやらzremで値そのものを指定すれば削除できるらしいです。
https://www.rubydoc.info/github/ezmobius/redis-rb/Redis:zrem

$ docker-compose exec redis redis-cliなどでredis-cliを起動して
下記のように試してみると削除できることを確認できました。
redis> select 5
redis> zrange schedule 0 5 withscores # 5件取得
redis> zrangebyscore schedule 1597600801 1697600801 # 1597600801から1697600801までのscoreのジョブを取得
redis> zrem schedule "ジョブの中身" # 削除

redisを直接叩いて削除を試みる

保存されるジョブの情報どこにあるの問題

実際にredisに保存されているジョブは以下のようになっています。

{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"TestJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"TestJob\",\"job_id\":\"482fb871-cdec-44ec-89e9-36ccccco9839\",\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"test_id\":1,\"_aj_symbol_keys\":[\"test_id\",]}],\"executions\":0,\"locale\":\"ja\"}],\"retry\":true,\"jid\":\"ef2f98642ac6560fea66b999\",\"created_at\":1597812804.077493}"

ですが、この情報をどのフローで見れるか分からない。

sidekiqのドキュメント一通り読んでみる

clientのMiddlewareを使えばキューに積むタイミングで色々できるらしい。
https://github.com/mperham/sidekiq/wiki/Middleware
ここでジョブの中身見てみたら求めていたjsonだったのでここで保存することにしました。

Middleware見れるジョブとredisで保存されるジョブのjson形式が若干違う問題

空白有無、nilとnullやhashの:と->などのフォーマットが異なっていました。
辛いけど job.except('at').inspect.delete(' ').gsub(/nil/, 'null').gsub(/=>/, ':')で手を加えることにしました。

結果のコード

Middleware

ここでジョブをRDBに保存します。

module ClientMiddleware
  class TestJobAttribute
    def call(_worker_class, job, _queue, _redis_pool)
      # 削除したいジョブのときだけ
      if job['args'].first['job_class'] == 'TestJob'
        test = Test.find(job['args'].first['arguments'].first)
        # Testモデルのjobカラムにジョブのデータをそのまま入れる
        return false unless test.update(job: adjust_for_redis(job))
      end
      yield
    end

    private

    def adjust_for_redis(job)
      job.except('at').inspect.delete(' ').gsub(/nil/, 'null').gsub(/=>/, ':')
    end
  end
end

# 省略

Sidekiq.configure_client do |config|
  config.redis = { url: url }
  config.client_middleware do |chain|
    chain.add ClientMiddleware::TestJobAttribute
  end
end

ジョブ削除するクライアント

class Sidekiq::RedisClient
  def remove_jobs!(jobs)
    return if jobs.blank?
    Sidekiq.redis do |conn|
      fixnum = conn.zrem('schedule', jobs)
      raise SidekiqRedisClientError.new(jobs.length, fixnum) if jobs.length != fixnum
    end
  end
end

class SidekiqRedisClientError < StandardError
  def initialize(len, fixnum)
    super("The jobs could not be deleted. jobs.length is #{len}. fixnum is #{fixnum}.")
  end
end

Sidekiq::RedisClient.new.remove_jobs!([Test.first.job])
でジョブを削除できるはずです。

8
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
1