Qiita Teams that are logged in
You are not logged in to any team

Log in to Qiita Team
Community
OrganizationEventAdvent CalendarQiitadon (β)
Service
Qiita JobsQiita ZineQiita Blog
71
Help us understand the problem. What are the problem?

More than 1 year has passed since last update.

posted at

updated at

Organization

Ruby on Rails の Active Job と SideKiq でバックグラウンドジョブをキューイングして実行する

はじめに

本記事は Active Job とアダプタとして Sidekiq を使ってバックグラウンドジョブを実行する環境を整えた時の備忘録です。

Active Job とは

Active Job は Rails におけるバックグラウンドジョブを動かすための共通インタフェースです。
バックグラウンドジョブを動かす Sidekiq、Resque、Delayed Job をアダプタとして利用できます。

Rails で提供されるのはジョブをメモリに保持するインプロセスのキューイングシステムだけなので Rails を再起動するとジョブは全て失われます。(アダプタを指定しなかった場合のデフォルト動作)(参考)

Rails4.2 から Active Job が利用できるようになりました。(参考)

ジョブを作成する

Rails コマンドの generate コマンドを使って作成できます。Rails ガイドにも作り方と Active Job の開設があるので参考にしてみて下さい。(参考)

ActiveJobのジョブを作成する
$ bin/rails g job generate_ticket
Running via Spring preloader in process 13038
      invoke  test_unit
      create    test/jobs/generate_ticket_job_test.rb
      create  app/jobs/generate_ticket_job.rb

すると app/job/ 配下に application_job.rb と generate でオプション指定された名前の末尾に _job が付いたファイルが作成されます。(例では generate_ticket_job.rb )

ジョブが実行すべきタイミングになると perform メソッドが呼び出されるので、このメソッドにジョブで実行したい処理を記述することになります。

作成されたファイルには何もすることが書かれておらずコメント「# Do something later」があるのみなので、ひとまず Hello Active Job. と stdout に出力することにします。

app/jobs/application_job.rb
class ApplicationJob < ActiveJob::Base
end
app/jobs/generate_ticket_job.rb
class GenerateTicketJob < ApplicationJob
  queue_as :default

  def perform(*args)
    p "Hello Active Job."
  end
end

以上で Active Job を呼び出す準備は終わりです。(まだアダプタは利用しない)

次のようにジョブクラスを使って perform_later メソッドを呼び出すだけです。
呼び出す際に引数が必要であれば指定できます。(perform メソッドの仮引数 *args で受け取れる。

# キューが空き次第実行する(実行時引数は無し)
GenerateTicketJob.perform_later
# 実行時引数 some_arg を渡してキューが空き次第実行する
GenerateTicketJob.perform_later some_arg

また、5秒後に実行したいといった時間指定は set メソッドをメソッドチェーンでつなぐことで実現できます。

# 5秒後に実行する
GenerateTicketJob.set(wait: 5.second).perform_later

ここまでで Rails でジョブを使うことは出来ます。
しかし、全てメモリ内で管理されているため Rails が停止すると実行する前のジョブは全て消え去り、Rails を起動しても実行されることはありません。

次に、Sidekiq を Active Job のアダプタとして使うことで、Rails が停止してもジョブが失われないようにしていきます。

Active Job のアダプタとして Sidekiq を使う

Active Job アダプタとして Sidekiq を使うことが出来ます。

Sidekiq はバックグラウンドジョブを動作させるためのフレームワークです。
Ruby が動作する環境であれば AcitveJob(Rails) を使わずとも Sidekiq は使えます。

Sidekiq を使うためには Client, Redis, Server の 3 つが必要です。

Rails から Sidekiq を使う場合、Client は Rails アプリケーション自身を指します。
GenerateTicketJob.perform_later 等を実行する主体が Client です。

Server は Rails と別プロセスとして起動します。
bundle exec sidekiq で実行します。

Redis は Job をキューイングするために使います。
Client からアクセスできるようにする必要があります。

本記事では Client, Server, Redis は同一ホストにインストールすることにします。

Sidekiq と Redis をインストールして起動する (Ubuntu用)

Redisのインストール
$ sudo apt-get -y update && sudo apt-get install -y redis-server
Redisを起動する(必要に応じて自動設定を有効にする)
# Redis を起動する
$ sudo systemctl start redis-server

# (必要に応じて) 自動起動設定を確認する
$ sudo systemctl status redis-server

# (必要に応じて) 自動起動設定を有効にする
$ sudo systemctl enable redis-server
Sidekiqをインストールする(Gemfileに記載してbundleを使ってinstallする場合)
# Sidekiq をインストールする
$ cat app/Gemfile
  : <snip>
# use sidekiq (https://github.com/mperham/sidekiq)
gem 'sidekiq'
$ bundle install

# Sidekiq を起動する
$ bundle exec sidekiq

Rails アプリケーションを設定する

config.active_job.queue_adapter にて :sidekiq を指定します。(参考)

config/application.rb
  : <snip>
module Rails52SampleApp
  class Application < Rails::Application
      : <snip>
    # use sidekiq
    config.active_job.queue_adapter = :sidekiq
  end
end

Sidekiq が動作することを確認する

以上で Rails を Client として Sidekiq を動作させるための設定は完了です。

Sidekiq が動作していること、Redis が動作していることを確認して Rails server を再起動しましょう。

Hello Active Job. が表示されたら成功です。

Sidekiq で動作したことを確認するためにも以下のように実行したジョブが増えていることを確認するとよいでしょう。

$ rails c --sandbox
irb(main):014:0> require 'sidekiq/api'
=> false
irb(main):014:0> Sidekiq::Stats.new.processed
=> 400
irb(main):014:0> Sidekiq::Stats.new.processed # 実行が完了すると数が増える
=> 401

Active Job に処理を記述する

perform メソッドに記述する時に出来る事・出来ないことなどを記述します。

ジョブ内でモデルを使う方法

Active Job 内では特別なことを行うことなく ActiveRecord が利用できます。(参考)

Each Sidekiq server process pulls jobs from the queue in Redis and processes them. Like your web processes, Sidekiq boots Rails so your jobs and workers have the full Rails API, including Active Record, available for use. The server will instantiate the worker and call perform with the given arguments. Everything else is up to your code.

例えば Ticket モデルがあり、そのモデルを作成したり特定の属性を表示させようとしたら次のように記述出来ます。

app/jobs/generate_ticket_job.rb
class GenerateTicketJob < ApplicationJob
  queue_as :default

  def perform
    ticket = Ticket.create!(name: '550e8400-e29b-41d4-a716-446655440000')
    p "Generating ticket #{ticket.name} ..."
  end
end

ジョブの引数に Rails のモデルを指定する

Sidekiq は perform_sync メソッドの引数に渡された値を JSON へ変換して Redis に保存します。
しかし複雑なオブジェクトの場合は JSON に変換されないことや、仮にキューがバックアップされて引用したオブジェクト側を変更した場合にどうなるか等を考えると、引数としてはインスタンスを渡さずに ID を渡す方がベストプラクティスのようです。

参考

Active Job の perform メソッドにモデルを引数として渡す場合はクラスと ID を指定する必要はなく、簡潔に書くことが出来るとのことです。参考

class TrashableCleanupJob  < ApplicationJob
  def perform(trashable, depth)
    trashable.cleanup(depth)
  end
end

Active Job を制御する

Rails の Controller 等から Active Job を制御する時の方法を記述します。

ジョブを同期的に実行する

ジョブをインスタンス化して、perform メソッドを直接呼び出せば同期的に実行できます。

GenerateTicketJob.new.perform

ジョブが終了するまでロックする

with_advisory_lock 等を利用してジョブを実行する前にロックをかけて、ジョブが終了したらロックを解除するとよいでしょう。

但し、ジョブが終了されずに強制削除される場合等を考慮して、ロックをいつでも解除できる仕組みは用意しておいた方がよいと思われます。

ジョブを Rails console で操作する

以下、共通で Rails console にて require 'sidekiq/api' を実行していることが前提となります。

'sidekiq/api' を読み込むことで Sidekiq::Queue, Sidekiq::RetrySet が実行できるようになります。

ジョブ実行結果の統計情報を表示する

irb> Sidekiq::Stats.new.to_json
irb(main):065:0> p Sidekiq::Stats.new.to_json
"{\"stats\":{\"processed\":401,\"failed\":334,\"scheduled_size\":0,\"retry_size\":0,\"dead_size\":0,\"processes_size\":1,\"default_queue_latency\":0,\"workers_size\":0,\"enqueued\":0}}"
項目 説明
processed 実行完了数
failed 実行失敗数
scheduled_size 予定キュー内ジョブ数
retry_size リトライキュー内ジョブ数
dead_size デッド状態(※1)のジョブ数
processes_size 実行中のジョブ数
default_queue_latency デフォルトキューの遅延時間(※2)
workers_size Worker の数
enqueued 全キュー内のジョブ数(リトライキューと予定キューは除く)

※1: ジョブ実行時に例外が発生するとリトライ数(デフォルトでは25回)だけリトライした後にデッド状態となる
※2: キュー内の最初のジョブがキューに入るまでの時間 (参考)

参考: https://github.com/mperham/sidekiq/wiki/API#stats, https://github.com/mperham/sidekiq/blob/master/lib/sidekiq/api.rb

リトライキューにあるジョブを確認・削除する

リトライキューは Sidekiq::RetrySet インスタンスをつかって操作できます。

ジョブを確認する

リトライキューにある全てのジョブを確認する
irb> Sidekiq::RetrySet.new.each {|job| puts "#{job.jid} #{job.klass} #{job.args}"}
リトライキューにある特定のジョブを確認する(実行例※見やすいよう改行してます)
irb(main):024:0> Sidekiq::RetrySet.new.find_job('59318b9a94f9da40e973a951')
=> #<Sidekiq::SortedEntry:0x00007f9e1c2cb250 @args=nil,
@value="{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",
\"wrapped\":\"GenerateTicketJob\",\"queue\":\"default\",
\"args\":[{\"job_class\":\"GenerateTicketJob\",
\"job_id\":\"dabb26ff-87fd-438c-b93f-f6964e624fec\",\"provider_job_id\":null,
\"queue_name\":\"default\",\"priority\":null,
\"arguments\":[{\"_aj_globalid\":\"gid://rails52-sample-app/Ticket/4\"}],
\"executions\":0,\"locale\":\"en\"}],\"retry\":true,
\"jid\":\"59318b9a94f9da40e973a951\",\"created_at\":1552203698.4819345,\"enqueued_at\":1552203762.003836,
\"error_message\":\"Validation failed: Diff summary can't be blank\",
\"error_class\":\"ActiveRecord::RecordInvalid\",\"failed_at\":1552203698.826664,
\"retry_count\":2,\"retried_at\":1552203762.1639638}", 
@item={"class"=>"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", 
"wrapped"=>"GenerateTicketJob", "queue"=>"default", 
"args"=>[{"job_class"=>"GenerateTicketJob", "job_id"=>"dabb26ff-87fd-438c-b93f-f6964e624fec", 
"provider_job_id"=>nil, "queue_name"=>"default", "priority"=>nil, 
"arguments"=>[{"_aj_globalid"=>"gid://rails52-sample-app/Ticket/4"}], 
"executions"=>0, "locale"=>"en"}], "retry"=>true, "jid"=>"59318b9a94f9da40e973a951", 
"created_at"=>1552203698.4819345, "enqueued_at"=>1552203762.003836, 
"error_message"=>"Validation failed: Diff summary can't be blank", 
"error_class"=>"ActiveRecord::RecordInvalid", "failed_at"=>1552203698.826664, 
"retry_count"=>2, "retried_at"=>1552203762.1639638}, @queue="default", 
@score=1552203835.1639743, @parent=#<Sidekiq::RetrySet:0x00007f9e1c2e9660 @name="retry", 
@_size=1>>
リトライキューを確認する(実行例※見やすいよう改行してます)
irb(main):007:0> Sidekiq::RetrySet.new.each {|job| p job }
#<Sidekiq::SortedEntry:0x00007f9e1c2963c0 @args=nil, 
@value="{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",
\"wrapped\":\"GenerateTicketJob\",\"queue\":\"default\",
\"args\":[{\"job_class\":\"GenerateTicketJob\",\"job_id\":\"674c8510-02fe-4183-864c-9ce038ead984\",
\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,
\"arguments\":[{\"_aj_globalid\":\"gid://rails52-sample-app/Ticket/7\"}],\
"executions\":0,\"locale\":\"en\"}],\"retry\":true,\"jid\":\"bb447a9176f436a1343043ac\",
\"created_at\":1552206799.7265983,\"enqueued_at\":1552206799.7266936,
\"error_message\":\"Validation failed: Diff summary can't be blank\",
\"error_class\":\"ActiveRecord::RecordInvalid\",\"failed_at\":1552206800.230454,\"retry_count\":0}", 
@item={"class"=>"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper", 
"wrapped"=>"GenerateTicketJob", "queue"=>"default", 
"args"=>[{
"job_class"=>"GenerateTicketJob", "job_id"=>"674c8510-02fe-4183-864c-9ce038ead984", 
"provider_job_id"=>nil, "queue_name"=>"default", "priority"=>nil, 
"arguments"=>[{
"_aj_globalid"=>"gid://rails52-sample-app/Ticket/7"}], "executions"=>0, "locale"=>"en"}], 
"retry"=>true, "jid"=>"bb447a9176f436a1343043ac", "created_at"=>1552206799.7265983, 
"enqueued_at"=>1552206799.7266936, "error_message"=>"Validation failed: Diff summary can't be blank", 
"error_class"=>"ActiveRecord::RecordInvalid", "failed_at"=>1552206800.230454, 
"retry_count"=>0}, @queue="default", @score=1552206832.2304695, 
@parent=#<Sidekiq::RetrySet:0x00007f9e1c297b30 @name="retry", @_size=1>>
=> nil

ジョブを削除する

リトライキューを1つ削除する
irb> Sidekiq::RetrySet.new.find_job(<JID>).delete
リトライキューを1つ削除する(実行例)
irb(main):026:0> Sidekiq::RetrySet.new.find_job('59318b9a94f9da40e973a951').delete
=> true
リトライキューを全て削除する
irb> Sidekiq::RetrySet.new.clear
リトライキューを全て削除する(実行例)
irb(main):008:0> Sidekiq::RetrySet.new.clear
=> 1

Sidekiq ダッシュボードを使ってジョブを確認・削除する

ダッシュボード用のルートをマウントすれば http://localhost:3000/sidekiq/ にアクセスすることで表示できます。

routes.rb
require 'sidekiq/web'
mount Sidekiq::Web => '/sidekiq'

image.png

実行中のジョブを確認・停止(Sidekiqプロセスの停止)する

image.png

  • 実行中のジョブを確認する
    • 「実行中」タブを選択する
  • 「すべて処理終了」ボタン
    • 全プロセスを、新規ジョブ実行を受け付けない(Quit)状態にする
  • 「すべて停止」ボタン
    • 全プロセスを停止させる(bundle exec sidekiq プロセスが停止する)
    • 実行中のジョブは Redis に戻される。Sidekiq を起動すると再度 Redis からジョブが読みだされて実行される)
「すべて停止」ボタン実行時のSidekiqServerログ例(実行中のジョブが残っていた場合)
2019-03-10T16:51:36.753Z 13783 TID-gq5pri3aj INFO: Shutting down
2019-03-10T16:51:36.753Z 13783 TID-gq5pri3aj INFO: Terminating quiet workers
2019-03-10T16:51:36.753Z 13783 TID-gq5qinqn7 INFO: Scheduler exiting...
2019-03-10T16:51:36.859Z 13783 TID-gq5pri3aj INFO: Pausing to allow workers to finish...
2019-03-10T16:51:44.754Z 13783 TID-gq5pri3aj WARN: Terminating 1 busy worker threads
2019-03-10T16:51:44.754Z 13783 TID-gq5pri3aj WARN: Work still in progress [#<struct Sidekiq::BasicFetch::UnitOfWork queue="queue:default", job="{\"class\":\"ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper\",\"wrapped\":\"GenerateTicketJob\",\"queue\":\"default\",\"args\":[{\"job_class\":\"GenerateTicketJob\",\"job_id\":\"5e6c6b86-3f62-4604-ab86-5431eabbfbaf\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[{\"_aj_globalid\":\"gid://rails52-sample-app/Ticket/13\"}],\"executions\":0,\"locale\":\"en\"}],\"retry\":true,\"jid\":\"7871b44f67cf4d9d995e7cd0\",\"created_at\":1552236631.6320188,\"enqueued_at\":1552236656.0809422}">]
2019-03-10T16:51:44.755Z 13783 TID-gq5pri3aj INFO: Pushed 1 jobs back to Redis
2019-03-10T16:51:44.810Z 13783 TID-gq5pri3aj INFO: Bye!
2019-03-10T16:51:44.812Z 13783 TID-gq5qinppv GenerateTicketJob JID-7871b44f67cf4d9d995e7cd0 INFO: fail: 48.731 sec

キューにあるジョブを確認・削除する

image.png

  • キューにあるジョブを確認する
    • 「予定」タブを選択する
  • キューにあるジョブを1つ削除する
    • 「予定」タブから削除したいジョブを選択して「削除」ボタンを押す

リトライキューにあるジョブを確認・削除する

image.png

  • リトライキューにあるジョブを確認する
    • 「再試行」タブを選択する
  • リトライキューにあるジョブを1つ削除する
    • 「再試行」タブから削除したいジョブを選択して「削除」ボタンを押す
  • リトライキューにあるジョブを全て削除する
    • 「再試行」タブから「全て削除」ボタンを押す
Why not register and get more from Qiita?
  1. We will deliver articles that match you
    By following users and tags, you can catch up information on technical fields that you are interested in as a whole
  2. you can read useful information later efficiently
    By "stocking" the articles you like, you can search right away
71
Help us understand the problem. What are the problem?