はじめに
DBの更新処理が完了したら、非同期ジョブを実行するようにしていたが、更新前の状態で非同期ジョブが実行されていた、、
ここでは例として、下記要件があるとする
- メッセージが送られたら、ユーザーアプリへ非同期ジョブにてプッシュ通知を送る
- 全てのメッセージ送信に対して、ログテーブルで履歴を残す
- 特定のメッセージの送信が完了したら、ポイントがつく
動作環境
- ActiveJob
- Sidekiq
- Redis
更新処理の実装
class Message < ApplicationRecord
enum status: {
prepare: 0, # 下書き
complete: 1 # 送信完了
}
def complete_message!
transaction do
complete!
update_message_log! # メッセージログテーブルを更新する処理
end
MessagePushJob.perform_later(self) # 非同期ジョブにキューする
end
def send_message_with_point!
transaction do
message_complete!
give_point! # 送信者にポイントを与える
end
end
end
(give_point!もcomplete_message!メソッドに書いてないのは、ポイントがつかないメッセージ送信にcomplete_message!が使われているからとする)
ジョブの実装
class MessagePushJob < ApplicationJob
queue_as :message_push_notice
def perform(message)
# messageのステータスがcompleteの状態だったら
if message.complete?
# 受信者にpush通知を送る
end
end
end
ここでポイントなのが、引数のmessageのstatusの値が何になるかということ!
想定では、completeステータスになっていることを期待していたが、なんとprepareステータスとなっていた、、
なので、後述の処理が実行されてなかった!!😢
引数でcompleteステータスのインスタンスを渡しているのになんで!?
ジョブの引数を変換している処理が下記である
今回の場合は、argumentsにMessageのインスタンスが格納されている
# arguments = [ Messageのインスタンス ]
def serialize
{
"job_class" => self.class.name,
"job_id" => job_id,
"queue_name" => queue_name,
"priority" => priority,
"arguments" => serialize_arguments(arguments),
"executions" => executions,
"locale" => I18n.locale.to_s
}
end
serialize_argumentsの中を追っていくと
def serialize(arguments)
arguments.map { |argument| serialize_argument(argument) }
end
def serialize_argument(argument)
case argument
when *PERMITTED_TYPES
argument
when GlobalID::Identification
convert_to_global_id_hash(argument)
when Array
argument.map { |arg| serialize_argument(arg) }
when ActiveSupport::HashWithIndifferentAccess
serialize_indifferent_hash(argument)
when Hash
symbol_keys = argument.each_key.grep(Symbol).map!(&:to_s)
aj_hash_key = if Hash.ruby2_keywords_hash?(argument)
RUBY2_KEYWORDS_KEY
else
SYMBOL_KEYS_KEY
end
result = serialize_hash(argument)
result[aj_hash_key] = symbol_keys
result
when -> (arg) { arg.respond_to?(:permitted?) }
serialize_indifferent_hash(argument.to_h)
else
Serializers.serialize(argument)
end
end
ActiveRecordのインスタンスの場合は、GlobalID::Identificationの分岐に入り、出力内容はGlobalIDのインスタンスが返却される(詳しくは後述)
よく見ると、テーブル名とIDしか渡されていない!💡
そもそもGlobalIDとは
モデルのインスタンスを一意に識別するためのURI
デフォルトでActiveRecordにインクルードされている
今回の場合は下記のような値となる
> Message.find(1).to_global_id
#<GlobalID:0x00007f19ccb9d178 @uri=#<URI::GID gid://test/Message/1>>
ジョブの実行時はGlobalIDインスタンスをどうしている?
ジョブが実行する前に、デシリアライズされている
def deserialize_argument(argument)
case argument
when String
argument
when *PERMITTED_TYPES
argument
when Array
argument.map { |arg| deserialize_argument(arg) }
when Hash
if serialized_global_id?(argument)
deserialize_global_id argument
elsif custom_serialized?(argument)
Serializers.deserialize(argument)
else
deserialize_hash(argument)
end
else
raise ArgumentError, "Can only deserialize primitive arguments: #{argument.inspect}"
end
end
def deserialize_global_id(hash)
GlobalID::Locator.locate hash[GLOBALID_KEY]
end
def locate(gid, options = {})
if gid = GlobalID.parse(gid)
locator_for(gid).locate gid if find_allowed?(gid.model_class, options[:only])
end
end
ちょっと省略するが、結局下記のコードにいきついて、findしている
def locate(gid)
gid.model_class.find gid.model_id
end
修正を検討
問題を振り返る
問題のコードを再掲
def complete_message!
transaction do
complete!
update_message_log! # メッセージログテーブルを更新する処理
end
MessagePushJob.perform_later(self) # 非同期ジョブにキューする
end
def send_message!
transaction do
message_complete!
give_point! # 送信者にポイントを与える
end
end
- send_message!を実行
- complete!メソッドでmessageステータスをcompleteステータスに変更(この時点ではDBに変更が反映されていない)
- update_message_log!でログテーブルを更新
- MessagePushJob.perform_laterで非同期ジョブをキューする
- 非同期ジョブが実行される(この時点でも、まだDBに変更が反映されていない)
- give_point!で送信者のポイントを更新
- トランザクションが完了したので、ここでDBに変更が反映される
ということで、DBの更新前に非同期ジョブが実行されてしまっていることが問題となっている
修正案 2案
案1 send_message!のトランザクションを外す
仕様次第でばあるが、ポイントの更新が完了しないとメッセージが送れないというのはよくないので、トランザクションをはずす
(ポイントの処理に問題が出れば、ログテーブルから再付与すればいいだろう)
ちなみに、実際はステータス更新ともっと関係ない処理にトランザクションが貼られていたので、こちらの案1を採用した
def complete_message!
transaction do
complete!
update_message_log! # メッセージログテーブルを更新する処理
end
MessagePushJob.perform_later(self) # 非同期ジョブにキューする
end
def send_message!
message_complete!
give_point! # 送信者にポイントを与える
end
案2 send_message!に処理をまとめる
ポイントの更新も完了して、DBに変更が更新されてから、非同期ジョブをキューするようにする
ちなみに、complete_message!は他の箇所で使用しているので消せないので、重複したコードになってしまう
def complete_message!
transaction do
complete!
update_message_log! # メッセージログテーブルを更新する処理
end
MessagePushJob.perform_later(self) # 非同期ジョブにキューする
end
def send_message!
transaction do
complete!
update_message_log! # メッセージログテーブルを更新する処理
give_point! # 送信者にポイントを与える
end
MessagePushJob.perform_later(self) # 非同期ジョブにキューする
end
まとめ
インスタンスを渡しているはずが、インスタンスのIDだけ渡しているっていうのが、暗黙的な仕様で初見殺しだなと思った(当たり前の人にとっては当たり前なのかな?)
ジョブ実行時に明示的にfindする必要はあるけど、明示的に引数にはインスタンスではなく、IDを渡すように変更してもいいのかもしれない