なぜ書くのか?
ユーザー登録時にslackに通知する機能を実装した際に、他の実装を真似てApplicationJob
を継承した。実装自体はできたが、Active Job
の役割は非同期処理をするくらいの認識で、その仕組みについてしらないことだらけ。なので、Railsガイドに参考に分からない単語を調べたり、実際に実装してみて確かめながら理解を深めていくことにする。
Active Jobの目的
Active Jobの主要な目的は、あらゆるRailsアプリケーションにジョブ管理インフラを配置することです。これにより、Delayed JobとResqueなどのように、さまざまなジョブ実行機能のAPIの違いを気にせずにジョブフレームワーク機能やその他のgemを搭載することができるようになります。バックエンドでのキューイング作業では、操作方法以外のことを気にせずに済みます。さらに、ジョブ管理フレームワークを切り替える際にジョブを書き直さずに済みます。
railsガイドより引用
色々難しいことを書いているが、ここでは一旦Active Job
は非同期処理を実装するための機能であり、複数のgemと連携して使うこともできると理解しておく。
まずは言葉の理解から
Active jobの理解の前に頻繁にでてくる単語、Job(ジョブ)とqueue(キュー)について理解する。技術ブログ ActiveJobとはを参考
jobとは
ジョブは現実世界で例えるなら、買い物にいく、料理をする、片付けをするといったような一つ一つのタスクのようなもの。
queueとは
このようにタスクを順番にqueueに登録していき、jobを順番に実行していく。このフローのことをジョブキューと呼ばれ、先に登録されたものから先に実行される。(先入れ先出し)
実装例
例えば、ユーザー登録時にメール送信とslackに通知する場合
キューを利用することで、メール送信とslack通知を待たずに、次の処理に入ることできる。またキューで登録された処理が仮に失敗しても、次の処理を行うことも可能にする。
jobとqueueについて理解できたので、下記では実際にジョブを作成しキューに登録、実行までの流れを確認していく。
簡単なJobを作成して実行して見る
- User modelを作成し、UserJobを作成
$ rails new active_job_sample
$ cd active_job_sample
$ bin/rails db:create
$ bin/rails g model user name
$ bin/rails db:migrate
#自動で下記のクラスを作成する
$ bin/rails generate job user
class UserJob < ApplicationJob
queue_as :default
def perform(name="hoge")
# 下記は追記した
User.create!(title: name)
end
end
performメソッドの種類について確認
- 上記のperformは2種類の呼び出し方がある
メソッド名 | 概要 |
---|---|
perform_now | 同期処理: キューに入ることなく即座に実行される |
perform_later | 非同期処理: ジョブをキューに入れ、キューが空き次第ジョブを実行する |
rails c
でjobの動作確認したいときなどに、perform_now
を使うとよい!
Active Jobはキューイングのバックエンドが必要 ?
Rails自身が提供するのは、ジョブをメモリに保持するインプロセスのキューイングシステムだけ。 プロセスがクラッシュしたりコンピュータをリセットしたりすると、デフォルトの非同期バックエンドの振る舞いによって主要なジョブが失われてしまう。なので、開発環境や小規模アプリケーションの場合には、ActiveJobのみでも問題なさそうだが、本番環境でアプリケーションを運用していく場合は、キューイングのバックエンドが必要になる。
キューイングライブラリ
キューイングライブラリとして有名なのは、Delayed Job
やSidekiq
、Resque
など。これらは、Active Job
が導入される以前から、非同期処理を行うgemとして使われていたらしい。
Active Job
はどのgemに対しても、キューイングバックエンドに接続できるアダプタがビルトインで用意されている(優秀)。また、Active Job
を使用せずに上記のgemのみを使用することも可能である。今回は上記のgemの中でも最もdefaultなSidekiq
を使用してみる。
Sidekiqって?
sidekiqの特長↓
項目 | |
---|---|
DB | Redis |
スレッド | マルチスレッド |
リトライ処理 | あり |
- Redisとは
- Redisは、ネットワーク接続された永続化可能なインメモリデータベース。ジョブの情報を保存している。
- スレッドとはプログラムの一連の処理のまとまりのこと。マルチスレッドのプログラムは、複数の処理を並行して実行させることが可能。
- Job(Worker)の中でraiseをキャッチして、自動でリトライする。
- デフォルトの設定では21日間に25回のリトライを行う。リトライは、m2秒後、4秒後、8秒後・・・というように指数関数的に間隔をあけ、行われる。
sidekiqを利用してみる
gem 'sidekiq'
gem 'sinatra', require: false # ダッシュボードを利用するため
bundle install
module ActiveJobSample
class Application < Rails::Application
...
config.active_job.queue_adapter = :sidekiq
...
end
end
Sidekiq.configure_server do |config|
config.Redis = { url: 'Redis://localhost:6379'}
end
Sidekiq.configure_client do |config|
config.Redis = { url: 'Redis://localhost:6379'}
end
:verbose: false # 対話モード
:concurrency: 10 # worker process 数
:queues: # 処理するキュー名
- default
- 大きなプロジェクトではqueueを複数に分けて、queueごとにsidekiqのプロセス(インスタンス)を分けるが、今回は動作確認程度なので
default
にする。
$ bundle exec sidekiq -C config/sidekiq.yml
-
http://localhost:3000/sidekiq
にアクセスしてダッシュボードを確認
これでようやく、sidekiqのキューにジョブを登録する準備が整った
コンソールからジョブを実行してみる
$ rails c
irb(main):001:0> UserJob.perform_later
Enqueued UserJob (Job ID: 02002483-9077-4832-963b-1058ef6534fe) to Sidekiq(default)
=> #<UserJob:0x00007ffa08f36f38 @arguments=[], @job_id="02002483-9077-4832-963b-1058ef6534fe", @queue_name="default", @priority=nil, @executions=0, @exception_executions={}, @timezone="UTC", @provider_job_id="26bcc5664da36768f743b851">
完了が1になった!いい感じにsidekiq
が機能し、非同期で実装できていそう!
この他にも、set
メソッドを使いジョブの実行をスケジュールすることも可能
内部実装を見ていく
Activejobかキューイングライブラリsidekiqと連携するまで
module ActiveJob # :nodoc:
...
class Base
include Enqueuing
...
end
...
end
ActiveJob::Base
はActiveJob::Enqueuing
をincludeしており、ActiveJob::Enqueuing
にクラスメソッドとしてperform_later
メソッドが定義されている
module ActiveJob
module Enqueuing
extend ActiveSupport::Concern
...
module ClassMethods
def perform_later(...)
job = job_or_instantiate(...)
enqueue_result = job.enqueue
yield job if block_given?
enqueue_result
end
private
def job_or_instantiate(*args) # :doc:
args.first.is_a?(self) ? args.first : new(*args)
end
...
end
...
def enqueue(options = {})
self.scheduled_at = options[:wait].seconds.from_now.to_f if options[:wait]
self.scheduled_at = options[:wait_until].to_f if options[:wait_until]
self.queue_name = self.class.queue_name_from_part(options[:queue]) if options[:queue]
self.priority = options[:priority].to_i if options[:priority]
self.successfully_enqueued = false
run_callbacks :enqueue do
if scheduled_at
queue_adapter.enqueue_at self, scheduled_at
else
queue_adapter.enqueue self
end
self.successfully_enqueued = true
rescue EnqueueError => e
self.enqueue_error = e
end
...
end
end
perform_laterの引数に何も指定しなければ定義したActiveJobがインスタンス化され、enqueue
メソッドが呼び出される。enqueue
メソッドで、ジョブがscheduledされていないか?やどのキューに登録するかを確認する。queue_adapter
はconfig.active_job.queue_adapter
で指定した名前から生成されるアダプタクラスになる。つまり、今回では:sidekiq
を指定しているので、下記のqueue_adapter
メソッドを呼びActiveJob::QueueAdapters::SidekiqAdapter
が生成されることになる。
def queue_adapter=(name_or_adapter)
case name_or_adapter
when Symbol, String
queue_adapter = ActiveJob::QueueAdapters.lookup(name_or_adapter).new
assign_adapter(name_or_adapter.to_s, queue_adapter)
else
if queue_adapter?(name_or_adapter)
adapter_name = "#{name_or_adapter.class.name.demodulize.remove('Adapter').underscore}"
assign_adapter(adapter_name, name_or_adapter)
else
raise ArgumentError
end
end
end
その後、ActiveJob::QueueAdapters::SidekiqAdapter
内で、ジョブのクラス名や引数を Sidekiq::Client
に投げている。
def enqueue(job) # :nodoc:
# Sidekiq::Client does not support symbols as keys
job.provider_job_id = Sidekiq::Client.push \
"class" => JobWrapper,
"wrapped" => job.class,
"queue" => job.queue_name,
"args" => [ job.serialize ]
end
Sidekiqによるエンキューとデキュー
コードが多くなりすぎるので、テキストのみ掲載
Redisのキューにジョブの情報を登録
-
Sidekiq.client.push
処理でsidekiq内の実装で、受け取ったジョブ情報をJsonに直し、ジョブIDの情報を追加して、 Redisのqueues:#{queue_name}
キーに追加
Redisのキューから登録したジョブを実行する
-
bundle exec sidekiq
を実行し、Sidekiq::Manager
がworker作成。その後、workerはそれぞれのスレッドを起動 -
Sidekiq::Processor
がメインの処理で、queueから取り出したjobを実行する-
process_one
メソッド内で、Redisからjobの情報をdequeue
する -
process
メソッドで、取得したjobから各種情報を引き出しロードし、dispatch
メソッド内でjobのインスタンスを作成して、jobの処理を実行している。
-
感想
社内のソースコードを真似すればいい感じに実装されるといったことが多々あり、あたかも自分が簡単に複雑な機能を実装したと勘違いしてしまいやすい。今回のslack通知を実装した際も、なんか非同期になってる、なんかRedisにジョブの情報を保存してるといった曖昧な理解しかできなかった。なんとなくでその場を乗り切ったとしても、自分の実力は上がらないし知識も増えない。だから、今後も日々の業務で理解しきれなかったことを今回のようにミニマムで実際に動かしてみたり、コードリーディングを通して仕組みを理解するようにする。
参考記事