はじめに
弊社では、平均500rpm、高い時間帯だと1000rpm程度のジョブをDelayedJobを利用して処理しているサービスがあります。
最近、Amazon Aurora バージョン2(MySQL5.7互換)から3(MySQL8.0互換)にあげてから、パフォーマンスが一部劣化してしまい、その対応策について検討していました。
その際、MySQL8.0系から利用できるようになった"Skip Locked"の機能を利用できればパフォーマンス改善につながるかと思い、いろいろ調べていましたが、DelayedJobではそのような実装はないようで、代替ライブラリを探していたらSolidQueueに辿り着きました。
Sidekiqは揮発性のメモリを利用するため重要なジョブを処理するのに少し抵抗感がありましたが、SolidQueueはRDBを利用しながら高速に動作するとのことて、その仕組みが気になり調べてみたので記載していこうと思います。
SolidQueue概要
https://github.com/rails/solid_queue?tab=readme-ov-file
ChatGPTさんに翻訳してもらったもの
Solid Queueは、Active Job向けに設計されたシンプルかつ高性能なDBベースのキューイングバックエンドです。
通常のジョブのエンキューおよび処理に加えて、Solid Queueは以下の機能をサポートしています:遅延ジョブ、同時実行制御、定期ジョブ、キューの一時停止、ジョブごとの数値優先度、キュー順による優先度設定、バルクエンキュー(Active Jobのperform_all_laterに対応したenqueue_all)。
Solid Queueは、MySQL、PostgreSQL、SQLiteなどのSQLデータベースで使用可能です。また、利用可能な場合には、FOR UPDATE SKIP LOCKED句を活用し、ジョブポーリング時にロックのブロックや待機を回避します。リトライ、破棄、エラー処理、シリアライゼーション、遅延についてはActive Jobに依存しており、Ruby on Railsのマルチスレッド機能とも互換性があります。
Rails8からこのSolidQueueはデフォルトで採用されているようですし、SKIP LOCKED句を利用しているということで高速に動作しそうな気がします。
もう少しDelayedJobとの実装の違いを細かく見ていきましょう。
DelayedJobとの比較
1. 複数のアクターが存在する
DelayedJobはジョブを処理するワーカが1種類あるだけでしたが、SolidQueueには複数の役割が存在していました。
-
ワーカー (Workers)
実行可能なジョブをキューから取り出し、処理を行う。
solid_queue_ready_executions テーブルを基に動作する。 -
ディスパッチャー (Dispatchers)
実行時刻が到来したジョブを選択し、それを solid_queue_scheduled_executions テーブルから solid_queue_ready_executions テーブルに移動します。これにより、ワーカーがジョブを取得可能になる。
並行性制御に関連するメンテナンス作業を行う。
・ブロックされたジョブのアンブロック
・セマフォのクリーンアップ
・ハートビートが失効したプロセスのジョブを解放 -
スケジューラー (Scheduler)
定期タスクを管理し、実行時刻にジョブをエンキューする。 -
スーパーバイザー (Supervisor)
ワーカーやディスパッチャーを設定に従って起動・停止させる。
ハートビート(状態確認)を監視し、プロセスの再起動などを行う。
各ワーカー、ディスパッチャー、スケジューラーごとに独立したプロセスをフォークする。
2. 利用するテーブルが多い
DelayedJobはジョブの実行に必要な情報やジョブの状態、エラー情報を一つのテーブルですべて持っていましたが、SolidQueueは11個のテーブルが存在していました。
定期実行ジョブの管理に利用するテーブル等もあるため、11個すべてが直接ジョブの実行処理に直接関与しているわけではありませんが、ジョブの状態をカラムでなくテーブルで管理するという大きな違いがありました。
-
solid_queue_semaphores
- 用途: 同時実行制御(Concurrency Control)のためのセマフォを管理するテーブル
- 機能: 特定のジョブやジョブグループに対して同時に実行可能なジョブの数を制御
セマフォが解放されるまで新しいジョブの実行を待機(ブロック)させる
-
solid_queue_scheduled_executions
- 用途: 将来実行予定のジョブをスケジュール管理するテーブル
- 機能:実行時刻が到来していないジョブを格納する
スケジューラー(Dispatcher)によってsolid_queue_ready_executionsに移動されることで、ワーカーが取得可能になる
-
solid_queue_recurring_tasks
- 用途: 定期的なタスク(Recurring Tasks)の設定を管理するテーブル
- 機能: 定期実行タスクのキー、ジョブクラス、スケジュール、引数などを保存する
例: 毎日8時に実行するタスクの設定
-
solid_queue_recurring_executions
- 用途: 定期タスクの実行履歴を管理するテーブル
- 機能: 各定期タスクの実行時刻を記録し、タスクが重複して実行されるのを防ぐ
重複防止には、task_keyとrun_atの一意制約が使用される
-
solid_queue_ready_executions
- 用途: 実行可能な状態のジョブを管理するテーブル
- 機能: 実行時刻が到来し、ワーカーが取得可能なジョブを保存する
ワーカーはこのテーブルをポーリングしてジョブを取得し、実行する
-
solid_queue_processes
- 用途: ワーカーやスケジューラーなどのプロセスを監視するためのテーブル
- 機能: 各プロセスのハートビート(生存確認)情報を記録する
ハートビートが一定時間更新されない場合、そのプロセスが停止したと見なされる
-
solid_queue_pauses
- 用途: キューやタスクの一時停止状態を管理するテーブル
- 機能: 特定のキューやジョブの実行を一時停止する情報を保存する
管理者がタスクを手動で停止・再開できるようにする
-
solid_queue_jobs
- 用途: 全ジョブのメタデータを管理するメインテーブル
- 機能: 各ジョブのID、キュー名、優先度、ステータスなどを記録する
実行中、待機中、完了したジョブが含まれる
-
solid_queue_failed_executions
- 用途: 失敗したジョブの履歴を管理するテーブル
- 機能: 実行に失敗したジョブの詳細(エラーメッセージ、スタックトレースなど)を記録する
管理者が失敗したジョブを再エンキューまたは破棄するために使用する
-
solid_queue_blocked_executions
- 用途: ブロックされたジョブ(実行待ちの状態)を管理するテーブル
- 機能: セマフォや並行実行制御によってブロックされているジョブを保存する
ジョブがアンブロックされると、solid_queue_ready_executionsに移動する
-
solid_queue_claimed_executions
- 用途: ワーカーによって「取得済み(Claimed)」のジョブを管理するテーブル
- 機能: ワーカーが処理中のジョブを記録する
ワーカーが異常終了した場合、この情報を元にジョブの再実行が可能
テーブルの関係と流れは下記のようになります。
- 即時実行ジョブを実行した時のディスパッチャーの動き(
Job.perform_later
)- solid_queue_jobsにジョブ情報が登録される
INSERT INTO `solid_queue_jobs` (`queue_name`, `class_name`, `arguments`, `priority`, `active_job_id`, `scheduled_at`, `finished_at`, `concurrency_key`, `created_at`, `updated_at`) VALUES ('default', 'ExampleJob', '{\"job_class\":\"ExampleJob\",\"job_id\":\"9beb19bf-87e6-4f2e-9fc4-c62555616926\",\"provider_job_id\":null,\"queue_name\":\"default\",\"priority\":null,\"arguments\":[\"Hello\",\"World\"],\"executions\":0,\"exception_executions\":{},\"locale\":\"en\",\"timezone\":\"UTC\",\"enqueued_at\":\"2024-12-26T02:33:58.419603000Z\",\"scheduled_at\":\"2024-12-26T02:33:58.414628000Z\"}', 0, '9beb19bf-87e6-4f2e-9fc4-c62555616926', '2024-12-26 02:33:58.414628', NULL, NULL, '2024-12-26 02:33:58.531770', '2024-12-26 02:33:58.531770')
- 登録したjob_idの取得
SELECT `solid_queue_jobs`.* FROM `solid_queue_jobs` WHERE `solid_queue_jobs`.`id` = 1 LIMIT 1
- ワーカーが取れるようにsolid_queue_ready_executionsにデータ登録
INSERT INTO `solid_queue_ready_executions` (`job_id`, `queue_name`, `priority`, `created_at`) VALUES (1, 'default', 0, '2024-12-26 02:33:58.613826')
- solid_queue_jobsにジョブ情報が登録される
- 遅延実行ジョブを実行した時のディスパッチャーの動き(
Job.set(wait: 5.minutes).perform_later
)- scheduled_executionsに登録
INSERT INTO `solid_queue_scheduled_executions` (`job_id`, `queue_name`, `priority`, `scheduled_at`, `created_at`) VALUES (5, 'default', 0, '2024-12-27 01:35:25.352280', '2024-12-27 01:30:25.427500')
- 実行すべきジョブがないかscheduled_executionsテーブルを定期的に確認
SELECT `solid_queue_scheduled_executions`.`job_id` FROM `solid_queue_scheduled_executions` WHERE `solid_queue_scheduled_executions`.`scheduled_at` <= '2024-12-27 01:35:55.242173' ORDER BY `solid_queue_scheduled_executions`.`scheduled_at` ASC, `solid_queue_scheduled_executions`.`priority` ASC, `solid_queue_scheduled_executions`.`job_id` ASC LIMIT 500 FOR UPDATE SKIP LOCKED
- あれば、ready_executionsへ移動
INSERT INTO `solid_queue_ready_executions` (`queue_name`,`priority`,`job_id`,`created_at`) VALUES ('default', 0, 5, CURRENT_TIMESTAMP(6)) AS `solid_queue_ready_executions_values` ON DUPLICATE KEY UPDATE `queue_name`=`solid_queue_ready_executions`.`queue_name`
- scheduled_executionsテーブルからは削除
DELETE FROM `solid_queue_scheduled_executions` WHERE `solid_queue_scheduled_executions`.`id` = 1
- scheduled_executionsに登録
- ワーカーがジョブを取得して処理する
- readyテーブルにレコードがあるかどうかの存在確認
SELECT 1 AS one FROM `solid_queue_ready_executions` LIMIT 1 FOR UPDATE SKIP LOCKED
- 存在していれば、指定したスレッド分取得する(LIMIT句で指定されている部分)
SELECT `solid_queue_ready_executions`.`id`, `solid_queue_ready_executions`.`job_id` FROM `solid_queue_ready_executions` ORDER BY `solid_queue_ready_executions`.`priority` ASC, `solid_queue_ready_executions`.`job_id` ASC LIMIT 3 FOR UPDATE SKIP LOCKED
- 処理中を表すclaimedテーブルにインサート
INTO `solid_queue_claimed_executions` (`job_id`,`process_id`,`created_at`) VALUES (3, 33, CURRENT_TIMESTAMP(6)) AS `solid_queue_claimed_executions_values`
- 取得したジョブをreadyから削除
DELETE FROM `solid_queue_ready_executions` WHERE `solid_queue_ready_executions`.`id` = 3
- ジョブが正常に終了するとsolid_queue_jobsの終了日時を更新
※ジョブの処理が失敗した場合、この操作は行われず、失敗したジョブを記録するfailed_executionsテーブルにインサートする処理が行われます
UPDATE `solid_queue_jobs` SET `solid_queue_jobs`.`updated_at` = '2024-12-26 08:58:41.584331', `solid_queue_jobs`.`finished_at` = '2024-12-26 08:58:41.584331' WHERE `solid_queue_jobs`.`id` = 3
INSERT INTO `solid_queue_failed_executions` (`job_id`, `error`, `created_at`) VALUES (4, '{\"exception_class\":\"StandardError\",\"message\":...backtrace長いので割愛
- claimedテーブルから削除
DELETE FROM `solid_queue_claimed_executions` WHERE `solid_queue_claimed_executions`.`id` = 3
- readyテーブルにレコードがあるかどうかの存在確認
総括
実はDelayedJobを利用している中で、パフォーマンス以外でもいくつかの解決できていない課題も存在していました。それらを含めてSolidQueueだと一挙に解決できそうな気がしています。
- DeadLockが発生するとワーカーが勝手に停止する → 現在は、数分おきにワーカーの死活監視を行い、停止していたら再度立ち上げを行うスクリプトで対処している
- ジョブが異常終了してロックしっぱなしで終了してしまったものが他のワーカーの処理対象にならない → 現在は、定期的にロックしっぱなしのものがないかチェックし、ロック解除を行うスクリプトで対処している)
実際に現実での業務効率化と同じように、狭い場所で複数人数で作業する環境(DelayedJob)を、タスクをうまく分割しながら作業する場所も分け、一人一人がストレスなく作業できる環境(SolidQueueu)にして効率化を行なっている点が面白いなぁと思いました。
今回、実装上メリットが大きくありそうなことは理解できたので、次回は実際に動かしてみて、DelayedJobやSidekiqとのパフォーマンス比較もしてみたいと思います。