はじめに
本記事は Active Job とアダプタとして Sidekiq を使ってバックグラウンドジョブを実行する環境を整えた時の備忘録です。
Active Job とは
Active Job は Rails におけるバックグラウンドジョブを動かすための共通インタフェースです。
バックグラウンドジョブを動かす Sidekiq、Resque、Delayed Job をアダプタとして利用できます。
Rails で提供されるのはジョブをメモリに保持するインプロセスのキューイングシステムだけなので Rails を再起動するとジョブは全て失われます。(アダプタを指定しなかった場合のデフォルト動作)(参考)
Rails4.2 から Active Job が利用できるようになりました。(参考)
ジョブを作成する
Rails コマンドの generate コマンドを使って作成できます。Rails ガイドにも作り方と Active Job の開設があるので参考にしてみて下さい。(参考)
$ bin/rails g job generate_ticket
Running via Spring preloader in process 13038
invoke test_unit
create test/jobs/generate_ticket_job_test.rb
create app/jobs/generate_ticket_job.rb
すると app/job/ 配下に application_job.rb と generate でオプション指定された名前の末尾に _job
が付いたファイルが作成されます。(例では generate_ticket_job.rb
)
ジョブが実行すべきタイミングになると perform
メソッドが呼び出されるので、このメソッドにジョブで実行したい処理を記述することになります。
作成されたファイルには何もすることが書かれておらずコメント「# Do something later
」があるのみなので、ひとまず Hello Active Job.
と stdout に出力することにします。
class ApplicationJob < ActiveJob::Base
end
class GenerateTicketJob < ApplicationJob
queue_as :default
def perform(*args)
p "Hello Active Job."
end
end
以上で Active Job を呼び出す準備は終わりです。(まだアダプタは利用しない)
次のようにジョブクラスを使って perform_later メソッドを呼び出すだけです。
呼び出す際に引数が必要であれば指定できます。(perform メソッドの仮引数 *args
で受け取れる。
# キューが空き次第実行する(実行時引数は無し)
GenerateTicketJob.perform_later
# 実行時引数 some_arg を渡してキューが空き次第実行する
GenerateTicketJob.perform_later some_arg
また、5秒後に実行したいといった時間指定は set
メソッドをメソッドチェーンでつなぐことで実現できます。
# 5秒後に実行する
GenerateTicketJob.set(wait: 5.second).perform_later
ここまでで Rails でジョブを使うことは出来ます。
しかし、全てメモリ内で管理されているため Rails が停止すると実行する前のジョブは全て消え去り、Rails を起動しても実行されることはありません。
次に、Sidekiq を Active Job のアダプタとして使うことで、Rails が停止してもジョブが失われないようにしていきます。
Active Job のアダプタとして Sidekiq を使う
Active Job アダプタとして Sidekiq を使うことが出来ます。
Sidekiq はバックグラウンドジョブを動作させるためのフレームワークです。
Ruby が動作する環境であれば AcitveJob(Rails) を使わずとも Sidekiq は使えます。
Sidekiq を使うためには Client, Redis, Server の 3 つが必要です。
Rails から Sidekiq を使う場合、Client は Rails アプリケーション自身を指します。
GenerateTicketJob.perform_later
等を実行する主体が Client です。
Server は Rails と別プロセスとして起動します。
bundle exec sidekiq
で実行します。
Redis は Job をキューイングするために使います。
Client からアクセスできるようにする必要があります。
本記事では Client, Server, Redis は同一ホストにインストールすることにします。
Sidekiq と Redis をインストールして起動する (Ubuntu用)
$ sudo apt-get -y update && sudo apt-get install -y redis-server
# Redis を起動する
$ sudo systemctl start redis-server
# (必要に応じて) 自動起動設定を確認する
$ sudo systemctl status redis-server
# (必要に応じて) 自動起動設定を有効にする
$ sudo systemctl enable redis-server
# Sidekiq をインストールする
$ cat app/Gemfile
: <snip>
# use sidekiq (https://github.com/mperham/sidekiq)
gem 'sidekiq'
$ bundle install
# Sidekiq を起動する
$ bundle exec sidekiq
Rails アプリケーションを設定する
config.active_job.queue_adapter
にて :sidekiq
を指定します。(参考)
: <snip>
module Rails52SampleApp
class Application < Rails::Application
: <snip>
# use sidekiq
config.active_job.queue_adapter = :sidekiq
end
end
Sidekiq が動作することを確認する
以上で Rails を Client として Sidekiq を動作させるための設定は完了です。
Sidekiq が動作していること、Redis が動作していることを確認して Rails server を再起動しましょう。
Hello Active Job.
が表示されたら成功です。
Sidekiq で動作したことを確認するためにも以下のように実行したジョブが増えていることを確認するとよいでしょう。
$ rails c --sandbox
irb(main):014:0> require 'sidekiq/api'
=> false
irb(main):014:0> Sidekiq::Stats.new.processed
=> 400
irb(main):014:0> Sidekiq::Stats.new.processed # 実行が完了すると数が増える
=> 401
Active Job に処理を記述する
perform メソッドに記述する時に出来る事・出来ないことなどを記述します。
ジョブ内でモデルを使う方法
Active Job 内では特別なことを行うことなく ActiveRecord が利用できます。(参考)
Each Sidekiq server process pulls jobs from the queue in Redis and processes them. Like your web processes, Sidekiq boots Rails so your jobs and workers have the full Rails API, including Active Record, available for use. The server will instantiate the worker and call perform with the given arguments. Everything else is up to your code.
例えば Ticket モデルがあり、そのモデルを作成したり特定の属性を表示させようとしたら次のように記述出来ます。
class GenerateTicketJob < ApplicationJob
queue_as :default
def perform
ticket = Ticket.create!(name: '550e8400-e29b-41d4-a716-446655440000')
p "Generating ticket #{ticket.name} ..."
end
end
ジョブの引数に Rails のモデルを指定する
Sidekiq は perform_sync
メソッドの引数に渡された値を JSON へ変換して Redis に保存します。
しかし複雑なオブジェクトの場合は JSON に変換されないことや、仮にキューがバックアップされて引用したオブジェクト側を変更した場合にどうなるか等を考えると、引数としてはインスタンスを渡さずに ID を渡す方がベストプラクティスのようです。
Active Job の perform
メソッドにモデルを引数として渡す場合はクラスと ID を指定する必要はなく、簡潔に書くことが出来るとのことです。参考
class TrashableCleanupJob < ApplicationJob
def perform(trashable, depth)
trashable.cleanup(depth)
end
end
Active Job を制御する
Rails の Controller 等から Active Job を制御する時の方法を記述します。
ジョブを同期的に実行する
ジョブをインスタンス化して、perform メソッドを直接呼び出せば同期的に実行できます。
GenerateTicketJob.new.perform
ジョブが終了するまでロックする
with_advisory_lock 等を利用してジョブを実行する前にロックをかけて、ジョブが終了したらロックを解除するとよいでしょう。
但し、ジョブが終了されずに強制削除される場合等を考慮して、ロックをいつでも解除できる仕組みは用意しておいた方がよいと思われます。
ジョブを Rails console で操作する
以下、共通で Rails console にて require 'sidekiq/api'
を実行していることが前提となります。
'sidekiq/api' を読み込むことで Sidekiq::Queue
, Sidekiq::RetrySet
が実行できるようになります。
ジョブ実行結果の統計情報を表示する
irb> Sidekiq::Stats.new.to_json
irb(main):065:0> p Sidekiq::Stats.new.to_json
"{\"stats\":{\"processed\":401,\"failed\":334,\"scheduled_size\":0,\"retry_size\":0,\"dead_size\":0,\"processes_size\":1,\"default_queue_latency\":0,\"workers_size\":0,\"enqueued\":0}}"
項目 | 説明 |
---|---|
processed | 実行完了数 |
failed | 実行失敗数 |
scheduled_size | 予定キュー内ジョブ数 |
retry_size | リトライキュー内ジョブ数 |
dead_size | デッド状態(※1)のジョブ数 |
processes_size | 実行中のジョブ数 |
default_queue_latency | デフォルトキューの遅延時間(※2) |
workers_size | Worker の数 |
enqueued | 全キュー内のジョブ数(リトライキューと予定キューは除く) |
※1: ジョブ実行時に例外が発生するとリトライ数(デフォルトでは25回)だけリトライした後にデッド状態となる
※2: キュー内の最初のジョブがキューに入るまでの時間 (参考)
参考: https://github.com/mperham/sidekiq/wiki/API#stats, https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/api.rb
リトライキューにあるジョブを確認・削除する
リトライキューは Sidekiq::RetrySet
インスタンスをつかって操作できます。
ジョブを確認する
irb> Sidekiq::RetrySet.new.each {|job| puts "#{job.jid} #{job.klass} #{job.args}"}
irb(main):024:0> Sidekiq::RetrySet.new.find_job('59318b9a94f9da40e973a951')
=> #<Sidekiq::SortedEntry:0x00007f9e1c2cb250 @args=nil,
@value="{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",
\"wrapped\":\"GenerateTicketJob\",\"queue\":\"default\",
\"args\":[{\"job_class\":\"GenerateTicketJob\",
\"job_id\":\"dabb26ff-87fd-438c-b93f-f6964e624fec\",\"provider_job_id\":null,
\"queue_name\":\"default\",\"priority\":null,
\"arguments\":[{\"_aj_globalid\":\"gid://rails52-sample-app/Ticket/4\"}],
\"executions\":0,\"locale\":\"en\"}],\"retry\":true,
\"jid\":\"59318b9a94f9da40e973a951\",\"created_at\":1552203698.4819345,\"enqueued_at\":1552203762.003836,
\"error_message\":\"Validation failed: Diff summary can't be blank\",
\"error_class\":\"ActiveRecord::RecordInvalid\",\"failed_at\":1552203698.826664,
\"retry_count\":2,\"retried_at\":1552203762.1639638}",
@item={"class"=>"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
"wrapped"=>"GenerateTicketJob", "queue"=>"default",
"args"=>[{"job_class"=>"GenerateTicketJob", "job_id"=>"dabb26ff-87fd-438c-b93f-f6964e624fec",
"provider_job_id"=>nil, "queue_name"=>"default", "priority"=>nil,
"arguments"=>[{"_aj_globalid"=>"gid://rails52-sample-app/Ticket/4"}],
"executions"=>0, "locale"=>"en"}], "retry"=>true, "jid"=>"59318b9a94f9da40e973a951",
"created_at"=>1552203698.4819345, "enqueued_at"=>1552203762.003836,
"error_message"=>"Validation failed: Diff summary can't be blank",
"error_class"=>"ActiveRecord::RecordInvalid", "failed_at"=>1552203698.826664,
"retry_count"=>2, "retried_at"=>1552203762.1639638}, @queue="default",
@score=1552203835.1639743, @parent=#<Sidekiq::RetrySet:0x00007f9e1c2e9660 @name="retry",
@_size=1>>
irb(main):007:0> Sidekiq::RetrySet.new.each {|job| p job }
#<Sidekiq::SortedEntry:0x00007f9e1c2963c0 @args=nil,
@value="{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",
\"wrapped\":\"GenerateTicketJob\",\"queue\":\"default\",
\"args\":[{\"job_class\":\"GenerateTicketJob\",\"job_id\":\"674c8510-02fe-4183-864c-9ce038ead984\",
\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,
\"arguments\":[{\"_aj_globalid\":\"gid://rails52-sample-app/Ticket/7\"}],\
"executions\":0,\"locale\":\"en\"}],\"retry\":true,\"jid\":\"bb447a9176f436a1343043ac\",
\"created_at\":1552206799.7265983,\"enqueued_at\":1552206799.7266936,
\"error_message\":\"Validation failed: Diff summary can't be blank\",
\"error_class\":\"ActiveRecord::RecordInvalid\",\"failed_at\":1552206800.230454,\"retry_count\":0}",
@item={"class"=>"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper",
"wrapped"=>"GenerateTicketJob", "queue"=>"default",
"args"=>[{
"job_class"=>"GenerateTicketJob", "job_id"=>"674c8510-02fe-4183-864c-9ce038ead984",
"provider_job_id"=>nil, "queue_name"=>"default", "priority"=>nil,
"arguments"=>[{
"_aj_globalid"=>"gid://rails52-sample-app/Ticket/7"}], "executions"=>0, "locale"=>"en"}],
"retry"=>true, "jid"=>"bb447a9176f436a1343043ac", "created_at"=>1552206799.7265983,
"enqueued_at"=>1552206799.7266936, "error_message"=>"Validation failed: Diff summary can't be blank",
"error_class"=>"ActiveRecord::RecordInvalid", "failed_at"=>1552206800.230454,
"retry_count"=>0}, @queue="default", @score=1552206832.2304695,
@parent=#<Sidekiq::RetrySet:0x00007f9e1c297b30 @name="retry", @_size=1>>
=> nil
ジョブを削除する
irb> Sidekiq::RetrySet.new.find_job(<JID>).delete
irb(main):026:0> Sidekiq::RetrySet.new.find_job('59318b9a94f9da40e973a951').delete
=> true
irb> Sidekiq::RetrySet.new.clear
irb(main):008:0> Sidekiq::RetrySet.new.clear
=> 1
Sidekiq ダッシュボードを使ってジョブを確認・削除する
ダッシュボード用のルートをマウントすれば http://localhost:3000/sidekiq/
にアクセスすることで表示できます。
require 'sidekiq/web'
mount Sidekiq::Web => '/sidekiq'
実行中のジョブを確認・停止(Sidekiqプロセスの停止)する
- 実行中のジョブを確認する
- 「実行中」タブを選択する
- 「すべて処理終了」ボタン
- 全プロセスを、新規ジョブ実行を受け付けない(Quit)状態にする
- 「すべて停止」ボタン
- 全プロセスを停止させる(bundle exec sidekiq プロセスが停止する)
- 実行中のジョブは Redis に戻される。Sidekiq を起動すると再度 Redis からジョブが読みだされて実行される)
2019-03-10T16:51:36.753Z 13783 TID-gq5pri3aj INFO: Shutting down
2019-03-10T16:51:36.753Z 13783 TID-gq5pri3aj INFO: Terminating quiet workers
2019-03-10T16:51:36.753Z 13783 TID-gq5qinqn7 INFO: Scheduler exiting...
2019-03-10T16:51:36.859Z 13783 TID-gq5pri3aj INFO: Pausing to allow workers to finish...
2019-03-10T16:51:44.754Z 13783 TID-gq5pri3aj WARN: Terminating 1 busy worker threads
2019-03-10T16:51:44.754Z 13783 TID-gq5pri3aj WARN: Work still in progress [#<struct Sidekiq::BasicFetch::UnitOfWork queue="queue:default", job="{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"GenerateTicketJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"GenerateTicketJob\",\"job_id\":\"5e6c6b86-3f62-4604-ab86-5431eabbfbaf\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"_aj_globalid\":\"gid://rails52-sample-app/Ticket/13\"}],\"executions\":0,\"locale\":\"en\"}],\"retry\":true,\"jid\":\"7871b44f67cf4d9d995e7cd0\",\"created_at\":1552236631.6320188,\"enqueued_at\":1552236656.0809422}">]
2019-03-10T16:51:44.755Z 13783 TID-gq5pri3aj INFO: Pushed 1 jobs back to Redis
2019-03-10T16:51:44.810Z 13783 TID-gq5pri3aj INFO: Bye!
2019-03-10T16:51:44.812Z 13783 TID-gq5qinppv GenerateTicketJob JID-7871b44f67cf4d9d995e7cd0 INFO: fail: 48.731 sec
キューにあるジョブを確認・削除する
- キューにあるジョブを確認する
- 「予定」タブを選択する
- キューにあるジョブを1つ削除する
- 「予定」タブから削除したいジョブを選択して「削除」ボタンを押す
リトライキューにあるジョブを確認・削除する
- リトライキューにあるジョブを確認する
- 「再試行」タブを選択する
- リトライキューにあるジョブを1つ削除する
- 「再試行」タブから削除したいジョブを選択して「削除」ボタンを押す
- リトライキューにあるジョブを全て削除する
- 「再試行」タブから「全て削除」ボタンを押す