同時に処理が走ってほしくないメソッド
def foo
AggregateTablesService.new.call # 数十万行のレコードの集計を行い、別のテーブルに集計を保存するサービス
@something = CreateGraph.new.call # 集計結果からグラフを生み出しキャッシュに保存するサービス。キャッシュがあればそれを読み込む
end
def exec
AggregateTablesService.new.call
CreateGraph.new.call
end
大量かつ複雑なレコードを扱うにあたって、1)一旦それらを扱いやすいように&処理を高速化するために集計テーブルにまとめる、2)その集計データを使ってグラフを作るという処理を前提におきます。
まず、何度もメソッドが走ることに対しては、「レコードやキャッシュがあればreturnで処理を抜ける」という方法で対策することができるでしょう。
class AggregateTablesService
def call
return if SummaryTable.where().present? # もしすでに集計されていたら処理を抜ける
# ~~~具体的な処理〜〜〜
end
end
class CreateGraph
def call
cache = Rails.cache.read(some_key)
return cache if cache.present?
# ~~~具体的な処理〜〜〜
graph # 最後にグラフオブジェクトを返す
end
end
ただ、この状態では同時にリクエストが送られてきたときに1)最初の集計が重複して保存されたり、2)最初の集計が中途半端な状態でグラフを作ってしまったりといった問題が発生します。
そこで、コントローラーとバッチ両者で重複して処理が走らないようにガードをかけてみようというのがこの記事の趣旨です。
状態を管理するオブジェクトを作る
方針として、「処理が実行中であれば何らかの対策をする」ということなので、今この瞬間にどこかの誰か(バッチ含めて)がプログラムを実行している最中なのかを管理するオブジェクトを用意してみます。
class ProgressState
def start
redis.write('in_progress', '')
end
def finish
redis.delete('in_progress')
end
def running?
redis.exists?('in_progress')
end
private
def redis
Rails.cache
end
これで、処理をスタートするときにredisに適当なキャッシュを保存し、毎回処理を実行する前にredisにキャッシュがあるか確認すれば、処理が同時に走ることを防げそうです。コントローラーとバッチに実装してみます。
補足:
Stateと銘打ってますが、デザインパターンにおけるStateパターンとは違うもので、役割的にはほぼリポジトリのようなことをやっています。ただ、このオブジェクトの役目は「データの読み書き」ではなくあくまでプログラムの実行状態を管理することなので、ProgressStateとしてみました(他に良い命名案があればコメントください。)
バッチ:処理を抜ける
def exec
progress_state = ProgressState.new
return if progress_state.running?
progress_state.start
AggregateTablesService.new.call
CreateGraph.new.call
progress_state.finish
end
バッチでは、「別の処理が実行中である」ということはもはや集計やグラフのキャッシュ作成が行われているということなので、単純に処理を抜けてあげます。
コントローラー:処理をスリープさせる
def foo
progress_state = ProgressState.new
SleepProgressService.new(progress_state).call
progress_state.start
AggregateTablesService.new.call
@something = CreateGraph.new.call
progress_state.finish
end
コントローラー側では処理を抜けたいのではなくて、重複処理を防ぎながらも最終的にグラフを画面に渡したいため、「他で処理中の場合は待つ」という方法を取ってみます。
class SleepProgressService
SLEEP_SEC = 10
RETRY_COUNT = 24
def initialize(progress_state)
@progress_state = progress_state
end
def call
count = 0
loop do
if @progress_state.running?
break if count > RETRY_COUNT
count += 1
Rails.logger.info("主要指標:別の集計処理が実行中のためスリープしました。スリープ回数:#{count}回目")
sleep SLEEP_SEC
else
break
end
end
end
end
問題:キャッシュを一度読み込むとredis側でデータを削除してもRailsが「キャッシュは存在する」と言い続ける…
上記のコードで実際に同時にプログラムを走らせてみましょう。コントローラーを2回同時に叩いてみます。すると、片方の画面では集計とグラフ作成が終了して画面が表示されており、redisからもin_progress
のキャッシュはfinishメソッドで確かに消されているにも関わらず、もう片方の画面はsleepしたままになってしまいます。
ログを取って確認してみると、redis側のキャッシュは消えているにも関わらず@progress_state.running?
が24回trueを返し続けるという現象が発生していました。
おそらくRails側のredis_cacheの設定の問題だと思うのですが、Rails.cacheを使わずに自前でRedis.newすることでこの問題を回避できました。
class ProgressState
def initialize
Rails.configuration.cache_store[1]
redis_host = config[:host]
redis_port = config[:port]
redis_db = config[:db]
@redis = Redis.new(host: redis_host, port: redis_port, db: redis_db)
end
def start
@redis.set('in_progress', '')
end
def finish
@redis.del('in_progress')
end
def running?
@redis.exists('in_progress') == 1 # exists()が0/1のintで返却されるため
end
特筆することはあまりないですが、あえて云えば最初からオブジェクトとしてProgressStateを定義していたことで、変更箇所がこのオブジェクトのみで済んだことでしょうか。
前述のSleepProgressServiceも、使いまわしたいからコントローラーから切り離したのではなく、コントローラーの責任領域をできる限り狭め、「処理が開始するのを待つ」ということについてはこのサービスの責任として隔離したかったという意図があります。
最後に
実際のコードでは日付や各種IDで別々に実行状態を管理したりともう少し複雑な実装になっていますが、根幹の機能は上記の内容で満たせているはずです。
それでは、最後までお付き合いいただきありがとうございました。