delayed_job

PostgreSQL バックエンドの delayed_job でワーカーが一度に複数のジョブを予約する問題

※ ほぼメンテが止まっている delayed_job を使っている人は、もう、あまりいない気がする。

問題

collectiveidea/delayed_job_active_record の GitHub Issue で報告されている問題について。

通常、delayed_job のワーカーは、一度に一つのジョブを予約し、そのジョブを実行し終えたら改めて一つのジョブを予約して、というループになっている。ところが、PostgreSQL バックエンドで実行される SQL によって、わりとよく一度に複数のジョブが予約される事があるという問題が確認されている。UPDATE 文でのサブクエリの使い方が問題で、PostgreSQL のプランナーが意図しない結果をもたらす、ということらしい。
(参考: sql - Postgres UPDATE ... LIMIT 1 - Database Administrators Stack Exchange)

影響

ごく短時間で完了できるジョブしか実行していないという場合はほぼ問題にならないが、数分や数十分かかるジョブを実行している場合に問題になりかねない。

複数のワーカーを動かしていても、このバグによって複数のジョブが一度に一つのワーカーに関連づけられてしまう事がある。重いジョブを含む複数のジョブが一つのワーカーに割り当てられた場合、空いているワーカーがあるにも関わらず、重いジョブが完了するまで他のジョブの開始が待たされてしまう。

対策(1) (2017/12/21 追記: この方法は別の問題を発生させる)

Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy というオプションのデフォルト値 :optimized_sql で実行される SQL が問題なので、これを使わずに済ませるという対策がひとつ。

Delayed::Backend::ActiveRecord.configuration.reserve_sql_strategy = :default_sql

ただし、この場合は効率の悪い SQL が実行されるという点に注意する必要がある(SELECT -> UPDATE -> SELECT)。

# https://github.com/collectiveidea/delayed_job_active_record/blob/master/lib/delayed/backend/active_record.rb#L132-L138
def self.reserve_with_scope_using_default_sql(ready_scope, worker, now)
  # This is our old fashion, tried and true, but slower lookup
  ready_scope.limit(worker.read_ahead).detect do |job|
    count = ready_scope.where(id: job.id).update_all(locked_at: now, locked_by: worker.name)
    count == 1 && job.reload
  end
end

追記: 2017/12/21

default_sql の場合、異なるワーカーで同一のジョブを重複して実行してしまう問題が発生する事に気がついた。

このストラテジによって作られる SQL は以下の様な構造の UPDATE 文になる。

UPDATE {table}
   SET {field} = {value}
 WHERE {ID} IN (
   SELECT {ID} FROM {table} WHERE {field} IS NOT NULL
 ) AND {ID} = {ID_VALUE}
;

この構造の SQL が(ほぼ)同時に実行される事を考える。UPDATE とサブクエリの SELECT は アトミックではない1 ので、複数のセッションで SELECT 部分を(ほぼ)同時に評価した時点では各セッションで条件を満たせるため、UPDATE によってフィールドを更新する事ができる。

この挙動を delayed_job に置き換えると、「誰もロックしていないジョブだから私が予約します」という事が同時に複数のワーカーで発生する事になる。つまり、複数のワーカーが同一ジョブを実行してしまうという事象が発生する。

対策(2)

delayed_job のバックエンドクラスは交換可能なので、SQL を書きかえたクラスに交換するという方法もある(もしくはモンキーパッチ)。

optimized_sql の SQL は Delayed::Backend::ActiveRecord::Job.reserve_with_scope_using_optimized_sql で組み立てているので、これを書きかえれば良い。

# https://github.com/collectiveidea/delayed_job_active_record/blob/master/lib/delayed/backend/active_record.rb#L89-L101
def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
  case connection.adapter_name
  when "PostgreSQL", "PostGIS"
    # Custom SQL required for PostgreSQL because postgres does not support UPDATE...LIMIT
    # This locks the single record 'FOR UPDATE' in the subquery
    # http://www.postgresql.org/docs/9.0/static/sql-select.html#SQL-FOR-UPDATE-SHARE
    # Note: active_record would attempt to generate UPDATE...LIMIT like
    # SQL for Postgres if we use a .limit() filter, but it would not
    # use 'FOR UPDATE' and we would have many locking conflicts
    quoted_table_name = connection.quote_table_name(table_name)
    subquery_sql      = ready_scope.limit(1).lock(true).select("id").to_sql
    reserved          = find_by_sql(["UPDATE #{quoted_table_name} SET locked_at = ?, locked_by = ? WHERE id IN (#{subquery_sql}) RETURNING *", now, worker.name])
    reserved[0]
  ...

以下はイメージ(動作検証などまったくしてない)。

class PatchedDelayedJobBackend < Delayed::Backend::ActiveRecord::Job
  def self.reserve_with_scope_using_optimized_sql(ready_scope, worker, now)
    quoted_table_name = connection.quote_table_name(table_name)
    subquery_sql      = ready_scope.limit(1).lock(true).select("id").to_sql
    reserved          = find_by_sql(["WITH sub AS (#{subquery_sql}) UPDATE #{quoted_table_name} t SET locked_at = ?, locked_by = ? WHERE t.id = sub.id RETURNING *", now, worker.name])
    reserved[0]
  end
end

Delayed::Worker.backend = PatchedDelayedJobBackend

再現メモ

PostgreSQL やそのプランナーという部分について知識が十分でないので、現象と原因のざっくりとした把握しかできていないがメモを残しておく。

  • PostgreSQL 9.5.10, 9.6.6 (macOS 10.13.1)

先述の GitHub Issue からリンクされている SQL の例 を参考に、delayed_job っぽい擬似的なテーブルで再現させた SQL が以下。

drop table if exists test1;
create table test1 (
  id serial primary key
, priority int not null default 0
, run_at timestamp not null default clock_timestamp()
, lock boolean not null default false
);
create index idx_test1_priority_and_run_at on test1 (priority, run_at);

insert into test1 select;
vacuum analyze test1;
insert into test1 select from generate_series(1, 5);

explain (analyze)
update test1
   set lock = TRUE
 where
   id in (
     select
       id
     from test1
     where run_at <= current_timestamp
     order by priority asc, run_at asc
     limit 1 for update
   ) returning *
;

select * from test1;

この SQL を実行すると LIMIT 1 で一件だけ更新できるわけではなく WHERE に該当するレコード全てが対象となってしまう事がわかる。ちなみに vacuum analyze test1 を実行しない場合は期待通りの結果が得られる。

Timing is on.
DROP TABLE
Time: 9.158 ms
CREATE TABLE
Time: 3.777 ms
CREATE INDEX
Time: 1.308 ms
INSERT 0 1
Time: 0.817 ms
VACUUM
Time: 47.520 ms
INSERT 0 5
Time: 0.577 ms
                                                                 QUERY PLAN
--------------------------------------------------------------------------------------------------------------------------------------------
 Update on test1  (cost=1.02..2.07 rows=1 width=50) (actual time=0.074..0.097 rows=6 loops=1)
   ->  Nested Loop Semi Join  (cost=1.02..2.07 rows=1 width=50) (actual time=0.046..0.062 rows=6 loops=1)
         Join Filter: (test1.id = "ANY_subquery".id)
         ->  Seq Scan on test1  (cost=0.00..1.01 rows=1 width=22) (actual time=0.002..0.003 rows=6 loops=1)
         ->  Subquery Scan on "ANY_subquery"  (cost=1.02..1.05 rows=1 width=32) (actual time=0.009..0.009 rows=1 loops=6)
               ->  Limit  (cost=1.02..1.04 rows=1 width=22) (actual time=0.005..0.005 rows=1 loops=6)
                     ->  LockRows  (cost=1.02..1.04 rows=1 width=22) (actual time=0.005..0.005 rows=1 loops=6)
                           ->  Sort  (cost=1.02..1.03 rows=1 width=22) (actual time=0.003..0.003 rows=4 loops=6)
                                 Sort Key: test1_1.priority, test1_1.run_at
                                 Sort Method: quicksort  Memory: 25kB
                                 ->  Seq Scan on test1 test1_1  (cost=0.00..1.01 rows=1 width=22) (actual time=0.005..0.007 rows=6 loops=1)
                                       Filter: (run_at <= now())
 Planning time: 0.304 ms
 Execution time: 0.157 ms
(14 rows)

Time: 1.099 ms
 id | priority |           run_at           | lock
----+----------+----------------------------+------
  1 |        0 | 2017-12-05 23:13:40.322709 | t
  2 |        0 | 2017-12-05 23:13:40.371038 | t
  3 |        0 | 2017-12-05 23:13:40.371078 | t
  4 |        0 | 2017-12-05 23:13:40.371082 | t
  5 |        0 | 2017-12-05 23:13:40.371085 | t
  6 |        0 | 2017-12-05 23:13:40.371088 | t
(6 rows)

Time: 0.222 ms

これを WITH で書きかえた SQL が以下。

drop table if exists test2;
create table test2 (
  id serial primary key
, priority int not null default 0
, run_at timestamp not null default clock_timestamp()
, lock boolean not null default false
);
create index idx_test2_priority_and_run_at on test2 (priority, run_at);

insert into test2 select;
vacuum analyze test2;
insert into test2 select from generate_series(1, 5);

explain (analyze)
with sub as (
  select
    id
  from test2
  where run_at <= current_timestamp
  order by priority asc, run_at asc
  limit 1 for update
)
update test2
   set lock = TRUE
  from sub
 where test2.id = sub.id
returning *;

select * from test2;

期待した結果が得られた。

Timing is on.
DROP TABLE
Time: 7.918 ms
CREATE TABLE
Time: 2.696 ms
CREATE INDEX
Time: 0.981 ms
INSERT 0 1
Time: 0.747 ms
VACUUM
Time: 47.679 ms
INSERT 0 5
Time: 1.109 ms
                                                            QUERY PLAN
----------------------------------------------------------------------------------------------------------------------------------
 Update on test2  (cost=1.04..2.08 rows=1 width=54) (actual time=0.080..0.087 rows=1 loops=1)
   CTE sub
     ->  Limit  (cost=1.02..1.04 rows=1 width=22) (actual time=0.045..0.045 rows=1 loops=1)
           ->  LockRows  (cost=1.02..1.04 rows=1 width=22) (actual time=0.044..0.044 rows=1 loops=1)
                 ->  Sort  (cost=1.02..1.03 rows=1 width=22) (actual time=0.037..0.037 rows=1 loops=1)
                       Sort Key: test2_1.priority, test2_1.run_at
                       Sort Method: quicksort  Memory: 25kB
                       ->  Seq Scan on test2 test2_1  (cost=0.00..1.01 rows=1 width=22) (actual time=0.021..0.024 rows=6 loops=1)
                             Filter: (run_at <= now())
   ->  Nested Loop  (cost=0.00..1.04 rows=1 width=54) (actual time=0.071..0.078 rows=1 loops=1)
         Join Filter: (test2.id = sub.id)
         Rows Removed by Join Filter: 5
         ->  Seq Scan on test2  (cost=0.00..1.01 rows=1 width=22) (actual time=0.006..0.010 rows=6 loops=1)
         ->  CTE Scan on sub  (cost=0.00..0.02 rows=1 width=32) (actual time=0.011..0.011 rows=1 loops=6)
 Planning time: 0.598 ms
 Execution time: 0.207 ms
(16 rows)

Time: 2.182 ms
 id | priority |           run_at           | lock
----+----------+----------------------------+------
  2 |        0 | 2017-12-05 23:39:31.11098  | f
  3 |        0 | 2017-12-05 23:39:31.111145 | f
  4 |        0 | 2017-12-05 23:39:31.111157 | f
  5 |        0 | 2017-12-05 23:39:31.111163 | f
  6 |        0 | 2017-12-05 23:39:31.111169 | f
  1 |        0 | 2017-12-05 23:39:31.062273 | t
(6 rows)

Time: 0.195 ms

  1. 挙動から把握しただけなので、『アトミック』という表現や仕組みについては誤解しているかもしれない。