LoginSignup
26
22

More than 5 years have passed since last update.

Rails でのバックグラウンドジョブの実行と transaction 利用について

Posted at

ここで扱う「バックグラウンドジョブ」は ActiveJobperform_later のような、一度ジョブキューに入れて、別プロセスが実際の処理を行うようなものの事を指しています。

以下では基本的にバックグラウンドジョブは Sidekiq を例に扱っています。

バックグラウンドジョブと transaction の注意点

class UserMailWorker
  include Sidekiq::Worker

  def perform(user_id)
    user = User.find(user_id)

    # 送信処理
  end
end

User.transaction do
  user = User.create!(param)

  UserMailWorker.perform_async(user.id)

  # 他の transaction 内でするべき処理など
end

このコードの問題点は、User.transaction が COMMIT される前に UserMailWorker がジョブを実行してしまうと対象の User のデータはまだ永続化されていないため取得ができない事です。

注意点に関する基本となる解決方法

愚直な方法

User.transaction do
  user = User.create!(param)

  # 他の transaction 内でするべき処理など
end

UserMailWorker.perform_async(user.id)

コードの配置場所として違和感がなければ上記が最も単純な解決方法です。

after_commit を利用する方法

上記の場合は User の作成に紐付いているので以下でも解決できます。

user.rb
class User < ApplicationRecord
  after_create_commit :send_mail

  def send_mail
    UserMailWorker.perform_async(id)
  end

これらはいずれも Sidekiq 側から提示されている

FAQ - Sidekiq Wiki

Why am I seeing a lot of "Can't find ModelName with ID=12345" errors with Sidekiq?

Your client is creating the Model instance within a transaction and pushing a job to Sidekiq. Sidekiq is trying to execute your job before the transaction has actually committed. Use Rails's after_commit :on => :create hook or move the job creation outside of the transaction block.

ほとんどのケースではこの方法で解消していくことになると思います。

以降では、少し面倒なケースを見ていきます。

status のような複数回更新があるようなケースはどうするか

上記では after_create_commit だったので、これは作成時以外には呼び出されません。一方で status のような enum で宣言されそうな属性に応じてバックグラウンドジョブを呼び分けたいケースではどうするでしょうか?

ActiveModel::Dirty の previsous_changes を利用する

class Conversation < ActiveRecord::Base
  enum status: [:inactive :active, :archived]

  after_update_commit :notify_active

  def notify_active
    if previous_changes.key?(:status) && previous_changes[:status].first.to_s == 'active'
      NotifyWorker.perform_async(id)
    end
  end
end

正直この例だと active の更新するメソッドを作って、その中で呼べばいいじゃん感もありますが、ここでは transaction をどの範囲で利用されるか分からないという状況を想定しているので、こうする必要があるケースもあるかなと。

AASM を使う時は注意

status のような属性を扱うと AASM のような gem を使いたくなることがあります。

AASM は after_commit フックを event (status の変更が発生するイベントのこと)単位で after_commit を作成することができ、今回のようなケースでは使いやすいように見えます。

class Conversation < ActiveRecord::Base
  enum status: [:inactive :active, :archived]

  aasm column: :status, enum: true do
    state :inactive, initial: true
    state :active
    state :archived

    event :activate, after_commit: :notify_active do
      transitions from: :inactive, to: :active
    end

    event :archive do
      transitions from: %i(inactive active), to: :archive
    end
  end

  def notify_active
    # activate イベントでしか実行されないので、ここに条件は不要
    NotifyWorker.perform_async(id)
  end
end

ただ、AASM の after_commit は Rails の after_commit とは異なっており transaction とは連動していません。

lib/aasm/persistence/orm.rb
module AASM
  module Persistence
    module ORM

      def aasm_fire_event(state_machine_name, name, options, *args, &block)
        if aasm_supports_transactions? && options[:persist]
          event = self.class.aasm(state_machine_name).state_machine.events[name]
          event.fire_callbacks(:before_transaction, self, *args)
          event.fire_global_callbacks(:before_all_transactions, self, *args)

          begin
            # 🏁 ここの super で lifecycle が実行される
            # https://github.com/aasm/aasm#lifecycle
            success = if options[:persist] && use_transactions?(state_machine_name)
              aasm_transaction(requires_new?(state_machine_name), requires_lock?(state_machine_name)) do
                super
              end
            else
              super
            end
            # ☠️ after_commit は単に lifecycle が終了した時に呼ばれるだけで、transaction とは関係が無い
            if success
              event.fire_callbacks(:after_commit, self, *args)
              event.fire_global_callbacks(:after_all_commits, self, *args)
            end

            success
          ensure
            event.fire_callbacks(:after_transaction, self, *args)
            event.fire_global_callbacks(:after_all_transactions, self, *args)
          end
        else
          super
        end
      end

インラインでコメントしている通りですが、AASM の after_commit は AASM の lifecycle (もちろん実際のイベント更新含む)を正常に終えた時に呼ばれるだけであり、transaction には関係しません。

以下の条件を満たす場合には、期待通りに動作します。

  • AASM のイベント更新メソッドは永続化させるものを使う (conversation.activate!)
  • 上記が transaction で囲まれていない

after_commit を利用せずに transaction の終了後に実行されるようにするケース

この動機はおそらくいくつかあって、「after_commit での分岐が辛い」「transaction が開かれている場所から遠い」などありそうです。

gem を利用する

いくつかありますが、ここでは二つ紹介します。

grosser/ar_after_transaction

READMEの例
class User
  after_create :do_stuff, :oops

  def do_stuff
    after_transaction do
      send_an_email # cannot be rolled back
    end
    comments.create(...) # will be rolled back
  end

  def oops
    raise "do the rolback!"
  end
end

上記のREADMEの例が分かりやすいですが、after_transaction というメソッドを提供してくれます。

これは after_transaction がよびだされた際に、その時の transaction の open 状態を確認し、0 でなければコネクションに生やした配列に逃し、transaction の終了の度に open 状態を確認し、0 になれば実行するというものです。

ActiveRecord::Base を直接拡張しているため、switch_point のような複数の接続先につなぐライブラリを使っていると上手く動かないかもしれません。

以下のブログで作成経緯等にも触れられています。
When Create Does Not Create in Ruby on Rails, Wait for Transaction! - netguru

instructure/after_transaction_commit

こちらも上記に似ています。(最近更新されて、最新は Rails5 系のみサポートになっています)

READMEの例
ActiveRecord::Base.transaction do
  ActiveRecord::Base.after_transaction_commit { run_some_background_job }
  # run_some_background_job has not run yet
end
# now, it has run

# this one runs immediately, since we are outside a transaction
ActiveRecord::Base.after_transaction_commit { some_other_task }

こちらもREADMEの例の通りです。

こちらは先程の gem とは拡張の仕方が異なります。
先程の gem は ActiveRecord::Base.transaction (実際には Transactions module にある) に alias_method を使ってメソッドの置き換えをして拡張していましたが、この gem はそのさらに内部の ActiveRecord::ConnectionAdapters::Transactioncommit_records を override しています。

https://github.com/instructure/after_transaction_commit/blob/master/lib/after_transaction_commit/database_statements.rb
このコミットで一番外側の transaction を取得しているのですが transaction に joinable というオプションがあることに初めて気づきました。

gem を利用しない

最後に紹介した gem で ActiveRecord::ConnectionAdapters::Transactioncommit_records が出てきましたが、これを利用できます。

コード

module ActiveRecord
  module Transactions

    def destroy #:nodoc:
      with_transaction_returning_status { super }
    end

    def save(*) #:nodoc:
      rollback_active_record_state! do
        with_transaction_returning_status { super }
      end
    end

    def save!(*) #:nodoc:
      with_transaction_returning_status { super }
    end

    def touch(*) #:nodoc:
      with_transaction_returning_status { super }
    end

    def with_transaction_returning_status
      status = nil
      self.class.transaction do
        add_to_transaction
        begin
          status = yield
        rescue ActiveRecord::Rollback
          clear_transaction_record_state
          status = nil
        end

        raise ActiveRecord::Rollback unless status
      end
      status
    ensure
      if @transaction_state && @transaction_state.committed?
        clear_transaction_record_state
      end
    end

    def add_to_transaction
      if has_transactional_callbacks?
        self.class.connection.add_transaction_record(self)
      else
        sync_with_transaction_state
        set_transaction_state(self.class.connection.transaction_state)
      end
      remember_transaction_record_state
    end

    private

      def has_transactional_callbacks?
        !_rollback_callbacks.empty? || !_commit_callbacks.empty? || !_before_commit_callbacks.empty?
      end

上記のコードを見てもらうとなんとなくわかりますが、save のようなタイミングで #with_transaction_returning_status が呼ばれ、そこで #add_to_transaction が呼ばれます。
これは #has_transactional_callbacks? が true だった場合に self.class.connection.add_transaction_record(self) を呼び出すのですが、この #has_transactional_callbacks?after_commit のような実際の DB transaction に関連する callback が設定されている時に true となります。

コード

module ActiveRecord
  module ConnectionAdapters # :nodoc:
    module DatabaseStatements

      def add_transaction_record(record)
        current_transaction.add_record(record)
      end

コード

module ActiveRecord
  module ConnectionAdapters

    class Transaction #:nodoc:
      attr_reader :connection, :state, :records, :savepoint_name
      attr_writer :joinable

      def initialize(connection, options, run_commit_callbacks: false)
        @connection = connection
        @state = TransactionState.new
        @records = []
        @joinable = options.fetch(:joinable, true)
        @run_commit_callbacks = run_commit_callbacks
      end

      def add_record(record)
        records << record
      end

      def commit_records
        ite = records.uniq
        while record = ite.shift
          if @run_commit_callbacks
            record.committed!
          else
            # if not running callbacks, only adds the record to the parent transaction
            record.add_to_transaction
          end
        end
      ensure
        ite.each { |i| i.committed!(should_run_callbacks: false) }
      end

二つのファイルのコードを抜粋しましたが、最終的に Transaction@records に追加されていっているのが分かります。
そして、ここで最後に紹介した gem が override していた #commit_records が登場するのが分かります。

コード

module ActiveRecord
  module ConnectionAdapters
    class TransactionManager #:nodoc:
      def initialize(connection)
        @stack = []
        @connection = connection
      end


      def begin_transaction(options = {})
        # 略
      end


      def commit_transaction
        @connection.lock.synchronize do
          transaction = @stack.last

          begin
            transaction.before_commit_records
          ensure
            @stack.pop
          end

          transaction.commit

          # 🏁 ここで commit_records
          transaction.commit_records
        end
      end


      def rollback_transaction(transaction = nil)
        @connection.lock.synchronize do
          transaction ||= @stack.pop
          transaction.rollback
          transaction.rollback_records
        end
      end


      def within_new_transaction(options = {})
        @connection.lock.synchronize do
          begin
            transaction = begin_transaction options
            yield
          rescue Exception => error
            if transaction
              rollback_transaction
              after_failure_actions(transaction, error)
            end
            raise
          ensure
            unless error
              if Thread.current.status == "aborting"
                rollback_transaction if transaction
              else
                begin
                  # 🏁 ここで commit_transaction
                  commit_transaction
                rescue Exception
                  rollback_transaction(transaction) unless transaction.state.completed?
                  raise
                end
              end
            end
          end
        end
      end

最後にざくっと TransactionManager のコードを貼ってしまうのですが、transaction が正常に終了した場合に #commit_records が呼び出されていることが分かります。

ダミーの record を登録し commit_records に呼び出させる

class AsyncRecord
  def initialize(*args)
    @args = args
  end

  def has_transactional_callbacks?
    true
  end

  def committed!(*_, **__)
    do_after_commit
  end

  def rolledback!(*_, **__)
    # after_rollback 相当の処理
  end

  def do_after_commit
    # after_commit の処理
  end
end
def perform_after_transaction(*args)
  async_record = AsyncRecord.new(*args)
  if ActiveRecord::Base.connection.transaction_open?
    ActiveRecord::Base.
      connection.
      current_transaction.
      add_record(async_record)
  else
    async_record.do_after_commit
  end
end

record が反応するべきメソッドだけ作っておき、後はそれを transaction に直接 add_record するだけです。

これは、拡張はしていないものの、内部の実装に依存していますので gem を使うのとどちらがいいとは一概には言えません。

最後に

今回はバックグラウンドジョブと DB transaction の間での注意点について書きました。
自分たちだけかもしれませんが、Rails を使っていると transaction の境界をどこにするかが「ここ!」と決めにくいです。(Service 層等は使っていないので)

加えて、Sidekiq は非常に高速にジョブを持っていくため、容易に問題が発生します。複雑にならない程度に今回紹介したような transaction の外で実行が保証される方法を使ってみるのも良いかもしれません。

参考

26
22
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
26
22