Ruby
Rails

Sidekiqで実行中・待機状態・再試行のジョブを操作する

参考

操作方法

実行中ジョブ

実行中ジョブを操作するには、Sidekiqプロセス内で実行中のスレッドを管理するSidekiq::Workersクラスを使用します。
ソースコードには以下のように書かれています。

A worker is a thread that is currently processing a job.
ワーカーは、現在ジョブを処理しているスレッドです。

スレッド数取得

Sidekiq::Workers.new.size

イテレータ

Sidekiq::Workers.new.each do |process_id, thread_id, job|
    puts "#{process_id}, #{thread_id}, #{job}"
end

jobには、各スレッドのjob情報が以下のようにhashで入ります。

{"queue"=>"default", "payload"=>{"class"=>"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", "wrapped"=>"ジョブクラス名", "queue"=>"default", "args"=>[{"job_class"=>"ジョブクラス名", "job_id"=>"7a7e0556-5f87-475e-b66a-21358f53646a", "queue_name"=>"default", "arguments"=>[1, 1, 1], "locale"=>"ja"}], "retry"=>true, "jid"=>"8ecd5d2cde045235484aee72", "created_at"=>1509599520.8792522, "enqueued_at"=>1509599520.8793135}, "run_at"=>1509599520}

待機状態ジョブ

待機状態ジョブを操作するには、ジョブキューを管理するSidekiq::Queueクラスを使用します。
ソースコードには以下のように書かれています。

Encapsulates a queue within Sidekiq.
# Allows enumeration of all jobs within the queue
# and deletion of jobs.

Sidekiq内のキューをカプセル化します。
キュー内のすべてのジョブの列挙と削除を許可します。

全ジョブ取得

Sidekiq::Queue.all

イテレータ

Sidekiq::Queue.new.each do|job|
    puts "#{job}"
end

ジョブ数の取得

Sidekiq::Queue.new.size

余談:RedisのList型はリスト長の情報をキャッシュするので、QueueがList型の場合はO(1)の操作になります。

全ジョブの削除

Sidekiq::Queue.new.clear

再試行ジョブ

再試行ジョブを操作するには、エラージョブを管理するSidekiq::RetrySetクラスについて、ソースコードには以下のように書かれています。

Allows enumeration of retries within Sidekiq.
# Based on this, you can search/filter for jobs. Here's an
# example where I'm selecting all jobs of a certain type
# and deleting them from the retry queue.

Sidekiq内での再試行の列挙を許可します。
これに基づいて、ジョブを検索/フィルタリングすることができます。 ここでは、特定のタイプのすべてのジョブを選択し、再試行キューから削除する例を示します。

ジョブ数の取得

Sidekiq::RetrySet.new.size

全ジョブの削除

Sidekiq::RetrySet.new.clear

イテレータ

Sidekiq::RetrySet.new.each do |job|
    puts "#{job}"
end

【余談】ジョブ管理方法

sidekiqは各ジョブをどのように管理しているのでしょうか?
Redisのクライアントツールであるredis-cliを使用して、各keyの中身を見ていきます。
*今回はperform_laterメソッドでenqueueした場合のみ紹介します。

$ redis-cli
> keys *
1)"546c114bf85a:1:8c91b75fe690:workers" // 実行中ジョブ
2)"queue:default" // 待機中ジョブ
3)"retry" // 再試行ジョブ
4)...
5)...

> type "546c114bf85a:1:8c91b75fe690:workers"
hash
> type "queue:default"
list
> type "retry"
zset


> lindex "queue:default" 0
"{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"ジョブクラス名\",\"queue\":\"default\",\"args\":[{\"job_class\":\"ジョブクラス名\",\"job_id\":\"40ecbc5c-64c2-4e49-9e3f-2d48a6b0fb98\",\"queue_name\":\"default\",\"arguments\":[1,1,1],\"locale\":\"ja\"}],\"retry\":true,\"jid\":\"d217606aec10d4e9eeff009b\",\"created_at\":1509601789.5275505,\"enqueued_at\":1509601789.5276136}"
> zrange "retry" 0 1
1) "{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"ジョブクラス名\",\"queue\":\"default\",\"args\":[{\"job_class\":\"ジョブクラス名\",\"job_id\":\"a3e7f5d4-33e7-4e05-9494-3f6eb2e1ac89\",\"queue_name\":\"default\",\"arguments\":[1,1,1],\"locale\":\"ja\"}],\"retry\":true,\"jid\":\"044c9290a96e5dc64f355a41\",\"created_at\":1509610228.816872,\"enqueued_at\":1509610228.8169212,\"error_message\":\"\xe3\x83\x86\xe3\x82\xb9\xe3\x83\x88\",\"error_class\":\"RuntimeError\",\"failed_at\":1509610228.8240032,\"retry_count\":0}"

再試行ジョブ

再試行ジョブはソート済みset型として管理されていました。
型をソート済みsetにしてscoreにリトライ回数を設定します。こうすることで同じジョブが複数回リトライする場合でも、scoreをインクリメントするだけでOKになり、書き込み時間が短縮されます。

待機中ジョブ

待機中ジョブはlist型として管理されていました。(待機中ジョブの型はenqueueの仕方によって変わります)
list型にすることで、O(1)でのADD/DELETEが可能になります。

実行中ジョブ

実行中ジョブは、オブジェクトを表現するのに適したhash型でした。