ここで扱う「バックグラウンドジョブ」は ActiveJob
の perform_later
のような、一度ジョブキューに入れて、別プロセスが実際の処理を行うようなものの事を指しています。
以下では基本的にバックグラウンドジョブは Sidekiq を例に扱っています。
バックグラウンドジョブと transaction の注意点
class UserMailWorker
include Sidekiq::Worker
def perform(user_id)
user = User.find(user_id)
# 送信処理
end
end
User.transaction do
user = User.create!(param)
UserMailWorker.perform_async(user.id)
# 他の transaction 内でするべき処理など
end
このコードの問題点は、User.transaction
が COMMIT される前に UserMailWorker
がジョブを実行してしまうと対象の User のデータはまだ永続化されていないため取得ができない事です。
注意点に関する基本となる解決方法
愚直な方法
User.transaction do
user = User.create!(param)
# 他の transaction 内でするべき処理など
end
UserMailWorker.perform_async(user.id)
コードの配置場所として違和感がなければ上記が最も単純な解決方法です。
after_commit を利用する方法
上記の場合は User の作成に紐付いているので以下でも解決できます。
class User < ApplicationRecord
after_create_commit :send_mail
def send_mail
UserMailWorker.perform_async(id)
end
これらはいずれも Sidekiq 側から提示されている
Why am I seeing a lot of "Can't find ModelName with ID=12345" errors with Sidekiq?
Your client is creating the Model instance within a transaction and pushing a job to Sidekiq. Sidekiq is trying to execute your job before the transaction has actually committed. Use Rails's after_commit :on => :create hook or move the job creation outside of the transaction block.
ほとんどのケースではこの方法で解消していくことになると思います。
以降では、少し面倒なケースを見ていきます。
status のような複数回更新があるようなケースはどうするか
上記では after_create_commit
だったので、これは作成時以外には呼び出されません。一方で status
のような enum で宣言されそうな属性に応じてバックグラウンドジョブを呼び分けたいケースではどうするでしょうか?
ActiveModel::Dirty の previsous_changes を利用する
class Conversation < ActiveRecord::Base
enum status: [:inactive :active, :archived]
after_update_commit :notify_active
def notify_active
if previous_changes.key?(:status) && previous_changes[:status].first.to_s == 'active'
NotifyWorker.perform_async(id)
end
end
end
正直この例だと active
の更新するメソッドを作って、その中で呼べばいいじゃん感もありますが、ここでは __transaction をどの範囲で利用されるか分からない__という状況を想定しているので、こうする必要があるケースもあるかなと。
AASM を使う時は注意
status のような属性を扱うと AASM のような gem を使いたくなることがあります。
AASM は after_commit
フックを event (status の変更が発生するイベントのこと)単位で after_commit
を作成することができ、今回のようなケースでは使いやすいように見えます。
class Conversation < ActiveRecord::Base
enum status: [:inactive :active, :archived]
aasm column: :status, enum: true do
state :inactive, initial: true
state :active
state :archived
event :activate, after_commit: :notify_active do
transitions from: :inactive, to: :active
end
event :archive do
transitions from: %i(inactive active), to: :archive
end
end
def notify_active
# activate イベントでしか実行されないので、ここに条件は不要
NotifyWorker.perform_async(id)
end
end
ただ、AASM の after_commit
は Rails の after_commit
とは異なっており transaction とは連動していません。
module AASM
module Persistence
module ORM
def aasm_fire_event(state_machine_name, name, options, *args, &block)
if aasm_supports_transactions? && options[:persist]
event = self.class.aasm(state_machine_name).state_machine.events[name]
event.fire_callbacks(:before_transaction, self, *args)
event.fire_global_callbacks(:before_all_transactions, self, *args)
begin
# 🏁 ここの super で lifecycle が実行される
# https://github.com/aasm/aasm#lifecycle
success = if options[:persist] && use_transactions?(state_machine_name)
aasm_transaction(requires_new?(state_machine_name), requires_lock?(state_machine_name)) do
super
end
else
super
end
# ☠️ after_commit は単に lifecycle が終了した時に呼ばれるだけで、transaction とは関係が無い
if success
event.fire_callbacks(:after_commit, self, *args)
event.fire_global_callbacks(:after_all_commits, self, *args)
end
success
ensure
event.fire_callbacks(:after_transaction, self, *args)
event.fire_global_callbacks(:after_all_transactions, self, *args)
end
else
super
end
end
インラインでコメントしている通りですが、AASM の after_commit は AASM の lifecycle (もちろん実際のイベント更新含む)を正常に終えた時に呼ばれるだけであり、transaction には関係しません。
以下の条件を満たす場合には、期待通りに動作します。
- AASM のイベント更新メソッドは永続化させるものを使う (conversation.activate!)
- 上記が transaction で囲まれていない
after_commit を利用せずに transaction の終了後に実行されるようにするケース
この動機はおそらくいくつかあって、「after_commit
での分岐が辛い」「transaction が開かれている場所から遠い」などありそうです。
gem を利用する
いくつかありますが、ここでは二つ紹介します。
grosser/ar_after_transaction
class User
after_create :do_stuff, :oops
def do_stuff
after_transaction do
send_an_email # cannot be rolled back
end
comments.create(...) # will be rolled back
end
def oops
raise "do the rolback!"
end
end
上記のREADMEの例が分かりやすいですが、after_transaction
というメソッドを提供してくれます。
これは after_transaction
がよびだされた際に、その時の transaction の open 状態を確認し、0 でなければコネクションに生やした配列に逃し、transaction の終了の度に open 状態を確認し、0 になれば実行するというものです。
ActiveRecord::Base
を直接拡張しているため、switch_point
のような複数の接続先につなぐライブラリを使っていると上手く動かないかもしれません。
以下のブログで作成経緯等にも触れられています。
When Create Does Not Create in Ruby on Rails, Wait for Transaction! - netguru
instructure/after_transaction_commit
こちらも上記に似ています。(最近更新されて、最新は Rails5 系のみサポートになっています)
ActiveRecord::Base.transaction do
ActiveRecord::Base.after_transaction_commit { run_some_background_job }
# run_some_background_job has not run yet
end
# now, it has run
# this one runs immediately, since we are outside a transaction
ActiveRecord::Base.after_transaction_commit { some_other_task }
こちらもREADMEの例の通りです。
こちらは先程の gem とは拡張の仕方が異なります。
先程の gem は ActiveRecord::Base.transaction
(実際には Transactions module にある) に alias_method
を使ってメソッドの置き換えをして拡張していましたが、この gem はそのさらに内部の ActiveRecord::ConnectionAdapters::Transaction
の commit_records
を override しています。
https://github.com/instructure/after_transaction_commit/blob/master/lib/after_transaction_commit/database_statements.rb
このコミットで一番外側の transaction を取得しているのですが transaction に joinable というオプションがあることに初めて気づきました。
gem を利用しない
最後に紹介した gem で ActiveRecord::ConnectionAdapters::Transaction
の commit_records
が出てきましたが、これを利用できます。
module ActiveRecord
module Transactions
def destroy #:nodoc:
with_transaction_returning_status { super }
end
def save(*) #:nodoc:
rollback_active_record_state! do
with_transaction_returning_status { super }
end
end
def save!(*) #:nodoc:
with_transaction_returning_status { super }
end
def touch(*) #:nodoc:
with_transaction_returning_status { super }
end
def with_transaction_returning_status
status = nil
self.class.transaction do
add_to_transaction
begin
status = yield
rescue ActiveRecord::Rollback
clear_transaction_record_state
status = nil
end
raise ActiveRecord::Rollback unless status
end
status
ensure
if @transaction_state && @transaction_state.committed?
clear_transaction_record_state
end
end
def add_to_transaction
if has_transactional_callbacks?
self.class.connection.add_transaction_record(self)
else
sync_with_transaction_state
set_transaction_state(self.class.connection.transaction_state)
end
remember_transaction_record_state
end
private
def has_transactional_callbacks?
!_rollback_callbacks.empty? || !_commit_callbacks.empty? || !_before_commit_callbacks.empty?
end
上記のコードを見てもらうとなんとなくわかりますが、save のようなタイミングで #with_transaction_returning_status
が呼ばれ、そこで #add_to_transaction
が呼ばれます。
これは #has_transactional_callbacks?
が true だった場合に self.class.connection.add_transaction_record(self)
を呼び出すのですが、この #has_transactional_callbacks?
は after_commit
のような実際の DB transaction に関連する callback が設定されている時に true となります。
module ActiveRecord
module ConnectionAdapters # :nodoc:
module DatabaseStatements
def add_transaction_record(record)
current_transaction.add_record(record)
end
module ActiveRecord
module ConnectionAdapters
class Transaction #:nodoc:
attr_reader :connection, :state, :records, :savepoint_name
attr_writer :joinable
def initialize(connection, options, run_commit_callbacks: false)
@connection = connection
@state = TransactionState.new
@records = []
@joinable = options.fetch(:joinable, true)
@run_commit_callbacks = run_commit_callbacks
end
def add_record(record)
records << record
end
def commit_records
ite = records.uniq
while record = ite.shift
if @run_commit_callbacks
record.committed!
else
# if not running callbacks, only adds the record to the parent transaction
record.add_to_transaction
end
end
ensure
ite.each { |i| i.committed!(should_run_callbacks: false) }
end
二つのファイルのコードを抜粋しましたが、最終的に Transaction
の @records
に追加されていっているのが分かります。
そして、ここで最後に紹介した gem が override していた #commit_records
が登場するのが分かります。
module ActiveRecord
module ConnectionAdapters
class TransactionManager #:nodoc:
def initialize(connection)
@stack = []
@connection = connection
end
def begin_transaction(options = {})
# 略
end
def commit_transaction
@connection.lock.synchronize do
transaction = @stack.last
begin
transaction.before_commit_records
ensure
@stack.pop
end
transaction.commit
# 🏁 ここで commit_records
transaction.commit_records
end
end
def rollback_transaction(transaction = nil)
@connection.lock.synchronize do
transaction ||= @stack.pop
transaction.rollback
transaction.rollback_records
end
end
def within_new_transaction(options = {})
@connection.lock.synchronize do
begin
transaction = begin_transaction options
yield
rescue Exception => error
if transaction
rollback_transaction
after_failure_actions(transaction, error)
end
raise
ensure
unless error
if Thread.current.status == "aborting"
rollback_transaction if transaction
else
begin
# 🏁 ここで commit_transaction
commit_transaction
rescue Exception
rollback_transaction(transaction) unless transaction.state.completed?
raise
end
end
end
end
end
end
最後にざくっと TransactionManager
のコードを貼ってしまうのですが、transaction が正常に終了した場合に #commit_records
が呼び出されていることが分かります。
ダミーの record を登録し commit_records に呼び出させる
class AsyncRecord
def initialize(*args)
@args = args
end
def has_transactional_callbacks?
true
end
def committed!(*_, **__)
do_after_commit
end
def rolledback!(*_, **__)
# after_rollback 相当の処理
end
def do_after_commit
# after_commit の処理
end
end
def perform_after_transaction(*args)
async_record = AsyncRecord.new(*args)
if ActiveRecord::Base.connection.transaction_open?
ActiveRecord::Base.
connection.
current_transaction.
add_record(async_record)
else
async_record.do_after_commit
end
end
record が反応するべきメソッドだけ作っておき、後はそれを transaction に直接 add_record
するだけです。
これは、拡張はしていないものの、内部の実装に依存していますので gem を使うのとどちらがいいとは一概には言えません。
- Run it in a background job, after a commit, from a service object
after_commit
callback? - cerebris/jsonapi-resources- 2 years after the first event - The Saga Pattern
最後に
今回はバックグラウンドジョブと DB transaction の間での注意点について書きました。
自分たちだけかもしれませんが、Rails を使っていると transaction の境界をどこにするかが「ここ!」と決めにくいです。(Service 層等は使っていないので)
加えて、Sidekiq は非常に高速にジョブを持っていくため、容易に問題が発生します。複雑にならない程度に今回紹介したような transaction の外で実行が保証される方法を使ってみるのも良いかもしれません。
参考
-
ActiveRecordのCallbackを利用して、Sidekiqにenqueueしてハマった - Qiita
- after_commit を利用する例です
-
Complicated workflows using active job - rebased
- AASM を利用する例です
-
A Couple of Callback Gotchas (And a Rails 5 Fix) - Justin Weiss
- after_commit は Rails の test が transaction を rollback して実行する時に辛い問題。(でも Rails5 で解消)
- 今回紹介していない gem