Sidekiqが提供しているAPI
最終更新 2019/10/19 編集者 brian p o'rourke
Sidekiqには、ワーカー、キュー、およびジョブに関するリアルタイム情報へのアクセスを許可するパブリックAPIがあります。
RDocについては sidekiq/api をご覧ください。
下記のコードを追加するだけでAPIにアクセスすることができます。
require 'sidekiq/api'
Web UIはこのAPIを使って構築されています。つまり、UIでできることはすべて、APIでスクリプト化できます。
キュー
すべてのキューを取得する。
Sidekiq::Queue.all
一つのキューを取得する。
Sidekiq::Queue.new # the "default" queue
Sidekiq::Queue.new("mailer")
あるキューの中に入っているジョブの数を取得する。
Sidekiq::Queue.new.size # => 4
キューを削除して、キュー内のすべてのジョブを削除する。
Sidekiq::Queue.new.clear
mailerキューから jid
abcdef1234567890
を持つジョブを削除する。
queue = Sidekiq::Queue.new("mailer")
queue.each do |job|
job.klass # => 'MyWorker'
job.args # => [1, 2, 3]
job.delete if job.jid == 'abcdef1234567890'
end
注意:キューからジョブの一覧を取得するプロセスは競合状態になりやすいです。例えば、Sidekiq::Queue.new.each { |job| puts job.id }
は、他のプロセスが同時にキューを更新している場合、ジョブをスキップしたり、同じジョブを複数回表示したりする場合があります。これは、ロックなしで共有データ構造(Redis内のキュー)を変更する場合の通常通りの動作です。
キューのレイテンシー(秒単位)を計算する。(現在時刻 - 最も古いジョブがキューに入れられた時刻)
> Sidekiq::Queue.new.latency
14.5
名前付きキュー
時間指定ジョブのキュー(Scheduled)
Scheduled Sorted Set(時間指定されたジョブのための順序付きセット型データ構造)には、すべての時間指定されたジョブが時系列順に並べられます。
詳細については sidekiq/api
のコードを直接参照してください。
ss = Sidekiq::ScheduledSet.new
ss.size
ss.clear
このAPIを使うことでSidekiqで時間指定されたジョブの一覧を取得できます。取得したジョブの一覧を使うことで、ジョブを検索/フィルタリングできます。
特定の種類のすべてのジョブを取得し、入っていたキューから削除するコード例は次の通りです。(注意:このコードは非効率です)
ss = Sidekiq::ScheduledSet.new
# `scan` を使用することでRedis内の要素を高速にフィルターしています。
# このコードは "*SomeWorker*" にマッチするペイロードを持つジョブをすべて返します。
jobs = ss.scan("SomeWorker").select {|retri| retri.klass == 'SomeWorker' }
# Rubyで同じことをするコードです。上記の例に比べて低速です。
jobs = ss.select {|retri| retri.klass == 'SomeWorker' }
jobs.each(&:delete)
再試行ジョブのキュー(Retries)
ジョブで例外が発生した場合、SidekiqはそのジョブをRetrySetに入れて、後で自動再試行できるようにします。
ジョブは、次に再試行する時刻に基づいてソートされます。
rs = Sidekiq::RetrySet.new
rs.size
rs.clear
このAPIを使うことでSidekiqでRetrySetに入れられたジョブの一覧を取得できます。取得したジョブの一覧を使うことで、ジョブを検索/フィルタリングできます。
特定の種類のすべてのジョブを取得し、入っていたキューから削除するコード例は次の通りです。(注意:このコードは非効率です)
query = Sidekiq::RetrySet.new
query.select do |job|
job.klass == 'Sidekiq::Extensions::DelayedClass' &&
# For Sidekiq::Extensions (e.g., Foo.delay.bar(*args)),
# YAMLにシリアライズされているため、引数を取り出すためにはデシリアライズする必要があります。
((klass, method, args) = YAML.load(job.args[0])) &&
klass == User &&
method == :setup_new_subscriber
end.map(&:delete)
# より効率的なアプローチです。Sidekiq 6.0から導入された `scan` をプリフィルターとして使用しています。
query.scan("setup_new_subscriber").select do |job|
job.klass == 'Sidekiq::Extensions::DelayedClass' &&
# For Sidekiq::Extensions (e.g., Foo.delay.bar(*args)),
# YAMLにシリアライズされているため、引数を取り出すためにはデシリアライズする必要があります。
((klass, method, args) = YAML.load(job.args[0])) &&
klass == User &&
method == :setup_new_subscriber
end.map(&:delete)
デッドジョブのキュー(Dead)
RetrySetやScheduledSetと同様に、DeadSetはSidekiqが失敗して終了したとみなしたすべてのジョブを、失敗した時刻順に並べます。他のSetと同じ基本操作をサポートします。
ds = Sidekiq::DeadSet.new
ds.size
ds.clear
Scan
Sidekiq 6.0では、globパターンによる名前付きセットのスキャンのサポートが追加されました。Rubyで実行するよりもはるかに高速に、Redis内で無関係なジョブを除外できます。
スキャンは生のJSONペイロードで動作するため、正確なパターンを使用していることを確認する必要があります。
ss = Sidekiq::ScheduledSet.new
# 生のJSONに対してclass名でスキャンしている。
ss.scan("\"class\":\"HardWorker\"") { |job| ... }
# より簡潔な書き方ではあるものの、"HardWorker" という文字列がペイロードに偶然あった場合、それも含んでしまう。
ss.scan("HardWorker") { |job| ... }
ブロックを渡さない場合、スキャンはEnumeratorを返します。
globパターンマッチングの詳細については、Redisのドキュメントを参照してください。完全な正規表現はサポートされていません。
便宜上、Sidekiqは、*
を含んでいなかった場合、どちらかの側に感嘆符(*
)を付けてパターンをラップします。
この振る舞いをより細かく制御したい場合は、必要に応じて文字列を自分でラップする必要があります。
処理中のジョブのキュー(Processes)
Sidekiq::ProcessSet
を使うと、実行中のジョブに関するほぼリアルタイムの(5秒ごとに更新)情報にアクセスできます。
さらに、プロセスをリモートで制御することもできます。
ps = Sidekiq::ProcessSet.new
ps.size # => 2
ps.each do |process|
p process['busy'] # => 3
p process['hostname'] # => 'myhost.local'
p process['pid'] # => 16131
end
ps.each(&:quiet!) # TSTP シグナルと同じ (USR1 for version < 5)
ps.each(&:stop!) # TERM シグナルと同じ
上記のようなRubyコードを通したリモート制御は、Windows、JRuby、JVMまたはHerokuなどのシグナルがサポートされていない状況で役立ちます。
ワーカースレッド(Workers)
すべてのSidekiqプロセスの現在アクティブなワーカーセットにRubyコードからアクセスすることができます。
「ワーカー」とは、現在ジョブを処理しているスレッドのことです。
注:このデータは非同期に更新されます。(5秒ごと)ミリ秒単位ではありません。
workers = Sidekiq::Workers.new
workers.size # => 2
workers.each do |process_id, thread_id, work|
# process_id はSidekiqプロセスごとのユニークなIDです。
# thread_id はワーカースレッドごとのユニークなIDです。
# work は下記のような構造の Hash です。
# { 'queue' => name, 'run_at' => timestamp, 'payload' => msg }
# run_at はエポック秒です。
# payload は下記のような構造のHashです。
# { 'retry' => true,
# 'queue' => 'default',
# 'class' => 'Redacted',
# 'args' => [1, 2, 'foo'],
# 'jid' => '80b1e7e46381a20c0c567285',
# 'enqueued_at' => 1427811033.2067106 }
end
統計情報(Stats)
Sidekiqに関する様々な統計情報です。
stats = Sidekiq::Stats.new
stats.processed # => 100
stats.failed # => 3
stats.queues # => { "default" => 1001, "email" => 50 }
すべてのキューにエンキューされたジョブの数を取得する。(再試行および時間指定されたジョブは含まれません)
stats.enqueued # => 5
統計情報の履歴(Stats History)
すべての日付はUTCにフォーマットされており、統計情報の履歴は5年間保持されます。
failed/processed に関する統計情報を取得する。
s = Sidekiq::Stats::History.new(2) # 今日から数えて何日間前までのデータを取得するのかを指定する。
s.failed # => { "2012-12-05" => 120, "2012-12-04" => 234 }
s.processed # => { "2012-12-05" => 1010, "2012-12-04" => 1500 }
起点となる日付を指定する。
s = Sidekiq::Stats::History.new( 3, Date.parse("2012-12-3") )
s.failed # => { "2012-12-03" => 10, "2012-12-02" => 24, "2012-12-01" => 4 }
s.processed # => { "2012-12-03" => 124, "2012-12-02" => 345, "2012-12-01" => 355 }
Sidekiq APIに関する注意事項
-
いくつかのAPIはスケーラブルでは ありません。そのため、自動化して大量にアクセスすることは避けるべきです。
-
Sidekiq APIのコア部分は、Redis内の共有可変データ構造を ロック無し で利用しています。これはつまり、このデータ構造に対するループ処理はベストエフォートであり、結果の一貫性が保証されないということを意味します。例えば、
Sidekiq::Queue#find_job(jid)
を呼び出す場合、ループ処理の途中でキューが変更されると、正しくジョブを見つけられない可能性があります。 -
ほとんどの小規模なシステムではこれらの競合状態に気づかないかもしれませんが、大規模で稼働率の高いシステムではおそらく気づくでしょう。何か問題が発生し、データを手動で修復する避けられない事態にならない限り、queues/sets を自分でスキャンしてジョブを削除することはしないでください。