1
0

More than 3 years have passed since last update.

DBのトランザクションがコミットされる前に非同期ジョブを実行してしまっていた、、、

Posted at

はじめに

DBの更新処理が完了したら、非同期ジョブを実行するようにしていたが、更新前の状態で非同期ジョブが実行されていた、、

ここでは例として、下記要件があるとする

  • メッセージが送られたら、ユーザーアプリへ非同期ジョブにてプッシュ通知を送る
  • 全てのメッセージ送信に対して、ログテーブルで履歴を残す
  • 特定のメッセージの送信が完了したら、ポイントがつく

動作環境

  • ActiveJob
  • Sidekiq
  • Redis

更新処理の実装

class Message < ApplicationRecord
  enum status: {
    prepare: 0, # 下書き
    complete: 1 # 送信完了
  }

  def complete_message!
    transaction do
      complete!
      update_message_log! # メッセージログテーブルを更新する処理
    end
    MessagePushJob.perform_later(self) # 非同期ジョブにキューする
  end

  def send_message_with_point!
    transaction do
      message_complete!
      give_point! # 送信者にポイントを与える
    end
  end
end

(give_point!もcomplete_message!メソッドに書いてないのは、ポイントがつかないメッセージ送信にcomplete_message!が使われているからとする)

ジョブの実装

class MessagePushJob < ApplicationJob
  queue_as :message_push_notice

  def perform(message)
    # messageのステータスがcompleteの状態だったら
    if message.complete?
      # 受信者にpush通知を送る
    end
  end
end

ここでポイントなのが、引数のmessageのstatusの値が何になるかということ!
想定では、completeステータスになっていることを期待していたが、なんとprepareステータスとなっていた、、
なので、後述の処理が実行されてなかった!!😢

引数でcompleteステータスのインスタンスを渡しているのになんで!?

ジョブの引数を変換している処理が下記である
今回の場合は、argumentsにMessageのインスタンスが格納されている

# arguments = [ Messageのインスタンス ]
def serialize
  {
    "job_class"  => self.class.name,
    "job_id"     => job_id,
    "queue_name" => queue_name,
    "priority"   => priority,
    "arguments"  => serialize_arguments(arguments),
    "executions" => executions,
    "locale"     => I18n.locale.to_s
  }
end

serialize_argumentsの中を追っていくと

def serialize(arguments)
  arguments.map { |argument| serialize_argument(argument) }
end

def serialize_argument(argument)
  case argument
  when *PERMITTED_TYPES
    argument
  when GlobalID::Identification
    convert_to_global_id_hash(argument)
  when Array
    argument.map { |arg| serialize_argument(arg) }
  when ActiveSupport::HashWithIndifferentAccess
    serialize_indifferent_hash(argument)
  when Hash
    symbol_keys = argument.each_key.grep(Symbol).map!(&:to_s)
    aj_hash_key = if Hash.ruby2_keywords_hash?(argument)
      RUBY2_KEYWORDS_KEY
    else
      SYMBOL_KEYS_KEY
    end
    result = serialize_hash(argument)
    result[aj_hash_key] = symbol_keys
    result
  when -> (arg) { arg.respond_to?(:permitted?) }
    serialize_indifferent_hash(argument.to_h)
  else
    Serializers.serialize(argument)
  end
end

ActiveRecordのインスタンスの場合は、GlobalID::Identificationの分岐に入り、出力内容はGlobalIDのインスタンスが返却される(詳しくは後述)
よく見ると、テーブル名とIDしか渡されていない!💡

そもそもGlobalIDとは

モデルのインスタンスを一意に識別するためのURI
デフォルトでActiveRecordにインクルードされている

今回の場合は下記のような値となる

> Message.find(1).to_global_id
#<GlobalID:0x00007f19ccb9d178 @uri=#<URI::GID gid://test/Message/1>>

ジョブの実行時はGlobalIDインスタンスをどうしている?

ジョブが実行する前に、デシリアライズされている

def deserialize_argument(argument)
  case argument
  when String
    argument
  when *PERMITTED_TYPES
    argument
  when Array
    argument.map { |arg| deserialize_argument(arg) }
  when Hash
    if serialized_global_id?(argument)
      deserialize_global_id argument
    elsif custom_serialized?(argument)
      Serializers.deserialize(argument)
    else
      deserialize_hash(argument)
    end
  else
    raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
  end
end

def deserialize_global_id(hash)
  GlobalID::Locator.locate hash[GLOBALID_KEY]
end

def locate(gid, options = {})
  if gid = GlobalID.parse(gid)
    locator_for(gid).locate gid if find_allowed?(gid.model_class, options[:only])
  end
end

ちょっと省略するが、結局下記のコードにいきついて、findしている

def locate(gid)
  gid.model_class.find gid.model_id
end

修正を検討

問題を振り返る

問題のコードを再掲

def complete_message!
  transaction do
    complete!
    update_message_log! # メッセージログテーブルを更新する処理
  end
  MessagePushJob.perform_later(self) # 非同期ジョブにキューする
end

def send_message!
  transaction do
    message_complete!
    give_point! # 送信者にポイントを与える
  end
end
  1. send_message!を実行
  2. complete!メソッドでmessageステータスをcompleteステータスに変更(この時点ではDBに変更が反映されていない)
  3. update_message_log!でログテーブルを更新
  4. MessagePushJob.perform_laterで非同期ジョブをキューする
  5. 非同期ジョブが実行される(この時点でも、まだDBに変更が反映されていない
  6. give_point!で送信者のポイントを更新
  7. トランザクションが完了したので、ここでDBに変更が反映される

ということで、DBの更新前に非同期ジョブが実行されてしまっていることが問題となっている

修正案 2案

案1 send_message!のトランザクションを外す

仕様次第でばあるが、ポイントの更新が完了しないとメッセージが送れないというのはよくないので、トランザクションをはずす
(ポイントの処理に問題が出れば、ログテーブルから再付与すればいいだろう)
ちなみに、実際はステータス更新ともっと関係ない処理にトランザクションが貼られていたので、こちらの案1を採用した

def complete_message!
  transaction do
    complete!
    update_message_log! # メッセージログテーブルを更新する処理
  end
  MessagePushJob.perform_later(self) # 非同期ジョブにキューする
end

def send_message!
  message_complete!
  give_point! # 送信者にポイントを与える
end
案2 send_message!に処理をまとめる

ポイントの更新も完了して、DBに変更が更新されてから、非同期ジョブをキューするようにする
ちなみに、complete_message!は他の箇所で使用しているので消せないので、重複したコードになってしまう

def complete_message!
  transaction do
    complete!
    update_message_log! # メッセージログテーブルを更新する処理
  end
  MessagePushJob.perform_later(self) # 非同期ジョブにキューする
end

def send_message!
  transaction do
    complete!
    update_message_log! # メッセージログテーブルを更新する処理
    give_point! # 送信者にポイントを与える
  end
  MessagePushJob.perform_later(self) # 非同期ジョブにキューする
end

まとめ

インスタンスを渡しているはずが、インスタンスのIDだけ渡しているっていうのが、暗黙的な仕様で初見殺しだなと思った(当たり前の人にとっては当たり前なのかな?)
ジョブ実行時に明示的にfindする必要はあるけど、明示的に引数にはインスタンスではなく、IDを渡すように変更してもいいのかもしれない

1
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
1
0