LoginSignup
21
14

More than 5 years have passed since last update.

Sidekiq 5 で変わった ActiveRecord のコネクション管理の方法と ActiveSupport::Executor/Reloader について

Posted at

Sidekiq 5 の変更点

Change.md - mperham/sidekiq
5.0-Upgrade.md - mperham/sidekiq

上記のどちらでも、Job Logging と Job Retry の変更点について真っ先に触れられています。

この変更点だけで言えば、おそらく直接的に影響があるのは、Job Logging と Job Retry が Sidekiq middleware として挿し込まれていたという前提で書かれているようなプラグイン系であり、普通に使っている分にはそれ程大きな影響は無いと思われます。

私自身も仕事で幾つかのプラグインと共に利用していますが、特に影響は出ていません。

この変更点の要因になっている内容が、この記事で触れることの大半です。

Sidekiq の PR から関連する変更点を見ていく

もちろん全てではありません、一連の流れが分かりそうな部分だけ選んでいます。

#2457 Enable code reloading in development mode with Rails 5

Until now Sidekiq has been required to eager load all application code at startup, since Rails reloading has never been thread-safe. This has changed in Rails 5! If the user is running Rails 5 in development, have Sidekiq leverage the new Interlock to safely reload app code.

上記は Description の最初の一文です。
Sidekiq は複数のスレッドを使ってキューイングされたジョブを実行していくのですが、実行する Worker クラスのロードをスレッド内で実行します。

sidekiqでRedisにキューイングされる内容
"{\"enqueued_at\":1494408578.771217,\"class\":\"SampleWorker\",\"queue\":\"default\",\"created_at\":1494408578.771217,\"args\":[123456,\"sample_args\"],\"jid\":\"73af9c9f8c5f453facdc0a1c\",\"retry\":1}"

具体的には、上記のような JSON が Redis にキューイングされ、各スレッドがそれを取得し処理していきます。
実行する Worker は class のキーで文字列保存されているため、 json['class'].constantize して取得されます。Sidekiq は startup で eager_load を実行するため複数スレッドで実行しても問題なく実行できます。

しかしリロード時には、一度アンロードされるため、それが複数スレッドで実行されると次のロード時に問題が起こる可能性があったのですが、Rails 5 で thread-safe なリロードの仕組みが入ったことで development モードで安全にコードの変更後のリロードができるようになったようです。

new Interlock

If the user is running Rails 5 in development, have Sidekiq leverage the new Interlock to safely reload app code.

先ほどの Description の後半で new Interlock とありますが、PR の初回のコードは以下のようになっていました。
https://github.com/mperham/sidekiq/pull/2457/commits/51347c5d97ebdadd9ec1914acc5198765ab8a31a#diff-35c2e9e447f43c397fce237a6732d8ceR43

lib/sidekiq/rails.rb
class Reloader
  def call
    ActiveSupport::Dependencies.interlock.running do
      begin
        ActionDispatch::Reloader.prepare! if do_reload_now = reload_dependencies?
        yield
      ensure
        ActionDispatch::Reloader.cleanup! if do_reload_now
      end
    end
  end

  private

  def reload_dependencies?
    @app.config.reload_classes_only_on_change != true || @app.reloaders.any?(&:updated?)
  end

ActiveSupport::Dependencies.interlock.running の部分は以下のようなものです。どうやら共有ロックを使っているらしい。
https://github.com/rails/rails/blob/v5.1.1/activesupport/lib/active_support/dependencies/interlock.rb#L38-L42

activesupport/lib/active_support/dependencies/interlock.rb
module ActiveSupport
  module Dependencies
    class Interlock
      def initialize # :nodoc:
        @lock = ActiveSupport::Concurrency::ShareLock.new
      end

      def running
        @lock.sharing do
          yield
        end
      end

関連する Interlock のコードや Reloader の変更はコメントによると以下の PR が主のようです。

最終的には Rails.application.reloader.wrap を使ったコードに書き直されていますね。

最終的なReloaderのコード
class Reloader
  class Reloader
    def initialize(app = ::Rails.application)
      Sidekiq.logger.debug "Enabling Rails 5+ live code reloading, so hot!" unless app.config.cache_classes
      @app = app
    end

    def call
      @app.reloader.wrap do
        yield
      end
    end

この時点の Processor#process のコードは以下のようなものです。
https://github.com/mperham/sidekiq/blob/1ab8883367598bf1ae50b53ca161969729ef6755/lib/sidekiq/processor.rb#L118-L152

module Sidekiq
  class Processor
    def process(work)
      jobstr = work.job
      queue = work.queue_name

      @reloader.call do
        ack = false
        begin
          job = Sidekiq.load_json(jobstr)
          klass  = job['class'.freeze].constantize
          worker = klass.new
          worker.jid = job['jid'.freeze]

          stats(worker, job, queue) do
            Sidekiq.server_middleware.invoke(worker, job, queue) do
              ack = true
              execute_job(worker, cloned(job['args'.freeze]))
            end
          end
          ack = true
        rescue Sidekiq::Shutdown
          ack = false
        rescue Exception => ex
          handle_exception(ex, job || { :job => jobstr })
          raise
        ensure
          work.acknowledge if ack
        end
      end
    end

#3166 Resolve Rails Reloader and ActiveRecord middleware incompatibility

The new Rails reloader enables the query cache within every yield and expects to have access to the current connection that it enabled the query cache for at the end of the block to clear the cache and then release the connection. The Sidekiq ActiveRecord middleware which is added automatically releases the connection earlier, the query cache of another connection may instead be cleared, and the dirty query cache might be used by another worker.

新しい Rails の reloader が渡したブロックの最後でコネクションを解放するようになりました。
Sidekiq は独自に middlware によってコネクションの解放を行っていました。

先ほどの PR によって reloader が導入されたことで、コネクションの解放が複数箇所で発生するようになりました。他に reloader のロック周りの影響もあり、リロード時にデッドロックや QueryCache に古いものが残ったりしやすくなったようです。

これに関しては元になった issueでも言及がされています。
少なくともこの時点では query cachingDB connection management が development モード以外でも実行されるとなっていますね。

The reloader only actually reloads things in development mode (well, when cache_classes is off)... but it does invoke the executor, which handles "other stuff" that should occur around any user code. Within the framework, that's currently query caching and DB connection management (returning AR connections to the pool).

修正内容としては比較的単純で、新しい reloader の仕組みが入った Rails 5 の場合には Sidekiq の ActiveRecord 用の middleware を読み込まないようにしています。

これに加えて #3154 の対応として以下のコミットが入っています。

この2つの対応で Rails 5 時の initialize 処理は以下のようになりました。

module Sidekiq
  class Rails < ::Rails::Engine
    config.after_initialize do
      if ::Rails::VERSION::MAJOR >= 5
        # The reloader also takes care of ActiveRecord but is incompatible with
        # the ActiveRecord middleware so make sure it's not in the chain already.
        if defined?(Sidekiq::Middleware::Server::ActiveRecord) && Sidekiq.server_middleware.exists?(Sidekiq::Middleware::Server::ActiveRecord)
          raise ArgumentError, "You are using the Sidekiq ActiveRecord middleware and the new Rails 5 reloader which are incompatible. Please remove the ActiveRecord middleware from your Sidekiq middleware configuration."
        elsif ::Rails.application.config.cache_classes
          # The reloader API has proven to be troublesome under load in production.
          # We won't use it at all when classes are cached, see #3154
          Sidekiq.logger.debug { "Autoload disabled in #{::Rails.env}, Sidekiq will not reload changed classes" }
        else
          Sidekiq.options[:reloader] = Sidekiq::Rails::Reloader.new
        end
      end
    end

ただ、これが次の問題を生みます :cry:

#3212 Handle errors raised by the reloader

内容としては reloader がエラーを raise することがあるため例外処理の場所を変えたというものです。
この reloader がエラーを出すという事はこの後も出てきます。

module Sidekiq
  class Processor
    def process(work)
      jobstr = work.job
      queue = work.queue_name

      ack = false
      begin # @reloader.call が begin の内部に入った。
        @reloader.call do
          job = Sidekiq.load_json(jobstr)
          klass  = job['class'.freeze].constantize
          worker = klass.new
          worker.jid = job['jid'.freeze]
      # 略

#3221 Use Rails executor if code reloading is disabled

#3166 で説明した内容の結果として Rails 5 でかつ Rails.application.config.cache_classes == false の時 のみ Rails 5 の新しい reloader が利用されるようになりました。

そうなると Rails 5 で cache_classes == true の場合には、reloader も利用されず、Sidekiq の AR 用の middleware も読み込まれません。そうなると、コネクションを解放する役割を担うものがなくなってしまうという事態になってしまいました。
当然ながら cache_classes == true は production モードで動作させる時の一般的な設定です。

おそらく元々は development モードでのリロードのために導入した reloader が途中からコネクション解放の役割の話しも出てきて、パターンが漏れてしまったのかもしれません。

この PR での提案は ActiveSupport::Executorcache_classes == true の場合には利用するようにするというものです。 (詳しくは後述しますが、 Executor も reloader と同様にブロックを抜ける際にコネクションの解放を行います)

この PR の内容自体は、5.0 時点の Sidekiq のコードには存在していません。PR での議論を読んでいくと、#3212 で reloader の例外を考慮したコードに変更した件から派生した作者の考察がでてきます。(コメント)

リロードしたいクラスは middleware を呼び出す箇所の外にあるため、リロードの処理は middleware に組み込むことができず、しかし Retry は middleware として実装されているため reloader でエラーが発生した場合には Retry できません。
この課題から作者から #3235 の PR が作成され、ここでほぼ 5.0 のコードが準備されることになります。

作者からの提案に関するサンプルコードを引用です。

worker = nil
begin
  reload do
    # 開発中に修正する可能性のあるリロードしたいクラスがこれ。
    # なので、reload はこの外側である必要がある。
    worker = job.constantize.new 

    # middleware の呼び出しがここ。
    # 一番外側に Retry middleware を配置したとしても、reload 自体のエラーはこの外側なので
    # どうしようも無い。
    execute_middleware(worker, ...) do
      execute_job
    end
  end
rescue => ex
  # 提案としては、こんな感じで reload よりさらに外側でリトライできるようになればいいよねってこと。
  retry_failures(job_hash, worker, ex)
end

#3235 Rework job processing in light of Rails 5's Reloader

この PR でついに Sidekiq 5 の大きな変更点になっている Retry と Logging が middleware から Processor クラスへの移動が行われます。

では、最終的に Processor での reloader やリトライ処理がどのようになったのかを見ておきます。
(ここで紹介するコードはあくまで PR のものなので、最新のコードは master を参照してください)

sidekiq/lib/sidekiq/processor.rb
# 一部コメントは削除しています
module Sidekiq
  class Processor
    def process(work)
      jobstr = work.job
      queue = work.queue_name

      ack = false
      begin
        job_hash = nil
        begin
          job_hash = Sidekiq.load_json(jobstr)
        rescue => ex
          Sidekiq.logger.error { "Pushing job to dead queue due to invalid JSON: #{ex}" }
          send_to_morgue(jobstr)
          ack = true
          raise
        end

        ack = true

        # 🏁 ここで dispatch メソッドが呼び出される
        dispatch(job_hash, queue) do |worker|

          # ここで middleware の呼び出しや worker.perform の実行を行う
          Sidekiq.server_middleware.invoke(worker, job_hash, queue) do
            execute_job(worker, cloned(job_hash['args'.freeze]))
          end
        end
      rescue Sidekiq::Shutdown
        ack = false
      rescue Exception => ex
        handle_exception(ex, { :context => "Job raised exception", :job => job_hash, :jobstr => jobstr })
        raise
      ensure
        work.acknowledge if ack
      end
    end

    def dispatch(job_hash, queue)
      pristine = cloned(job_hash)

      # 外側のリトライ
      @retrier.global(job_hash, queue) do
        @logging.call(job_hash, queue) do
          stats(pristine, queue) do
            # Rails 5 requires a Reloader to wrap code execution.  In order to
            # constantize the worker and instantiate an instance, we have to call
            # the Reloader.  It handles code loading, db connection management, etc.
            # Effectively this block denotes a "unit of work" to Rails.

            # ここで reloader が入る
            @reloader.call do

              # 開発時に修正される Worker クラスの生成は reloader の内部。
              klass  = job_hash['class'.freeze].constantize
              worker = klass.new
              worker.jid = job_hash['jid'.freeze]

              # 内側のリトライ。これまで Retry middleware が担っていたもの。
              @retrier.local(worker, job_hash, queue) do

                # この yield 呼び出し先で middleware の実行や worker.perform も実行される。
                # よって、ActiveRecord の利用もあるが、reloader の内部なので、reloader を
                # 抜ける時にコネクションはプールに返される。
                yield worker
              end
            end
          end
        end
      end
    end

    def execute_job(worker, cloned_args)
      worker.perform(*cloned_args)
    end

    def send_to_morgue(msg)
      now = Time.now.to_f
      Sidekiq.redis do |conn|
        conn.multi do
          conn.zadd('dead', now, msg)
          conn.zremrangebyscore('dead', '-inf', now - DeadSet.timeout)
          conn.zremrangebyrank('dead', 0, -DeadSet.max_jobs)
        end
      end
    end

#3221 の最後で引用した作者の例示したコードと同様に、 middleware の実行や worker の実行が reloader の内部に入っています。

作者の例示コードから変わった特徴的なのは以下の点です。

  1. retrygloballocal の 2 箇所に分散された
  2. loggingmiddleware から出された

2 に関しては logging の処理 をみてもらうと分かりますが、目的がリトライ処理も含めた処理時間の計測であったため、 retry の移動にともなって移動しただけでしょう。

  • リトライ処理と書くと実際にリトライをしていそうですが、Sidekiq のリトライ処理はリトライキューに入れる所までです。
  • middleware の時にも logging は retry よりも先に挿し込まれていたので外側でした。

先ほど logging は「目的がリトライ処理も含めた処理時間の計測」と書きましたが、「global のリトライ処理は入っていなけど?」と疑問を持つかもしれません。

次に移動したリトライの処理を見ていきます。

job_retry.rb

module Sidekiq
  class JobRetry
    class Skip < ::RuntimeError; end

    include Sidekiq::Util

    DEFAULT_MAX_RETRY_ATTEMPTS = 25

    def initialize(options = {})
      @max_retries = Sidekiq.options.merge(options).fetch(:max_retries, DEFAULT_MAX_RETRY_ATTEMPTS)
    end

    # The global retry handler requires only the barest of data.
    # We want to be able to retry as much as possible so we don't
    # require the worker to be instantiated.
    def global(msg, queue)
      yield
    rescue Skip
      raise
    rescue Sidekiq::Shutdown
      # ignore, will be pushed back onto queue during hard_shutdown
      raise
    rescue Exception => e
      # ignore, will be pushed back onto queue during hard_shutdown
      raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)

      raise e unless msg['retry']

      # 🏁 ここは第一引数(worker) をあえて nil にしている
      attempt_retry(nil, msg, queue, e)
    end


    # The local retry support means that any errors that occur within
    # this block can be associated with the given worker instance.
    # This is required to support the `sidekiq_retries_exhausted` block.
    def local(worker, msg, queue)
      yield
    rescue Skip
      raise
    rescue Sidekiq::Shutdown
      # ignore, will be pushed back onto queue during hard_shutdown
      raise
    rescue Exception => e
      # ignore, will be pushed back onto queue during hard_shutdown
      raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)

      if msg['retry'] == nil
        msg['retry'] = worker.class.get_sidekiq_options['retry']
      end

      raise e unless msg['retry']
      attempt_retry(worker, msg, queue, e)

      # We've handled this error associated with this job, don't
      # need to handle it at the global level
      raise Skip
    end

削除された middleware/retry_jobs.rb と比較しても、同名の各メソッドに大きな変更がある訳ではありません。

比較のために以下に middleware 時の #call メソッドの内容を引用します。

module Sidekiq
  module Middleware
    module Server
      class RetryJobs
        def call(worker, msg, queue)
          yield
        rescue Sidekiq::Shutdown
          # ignore, will be pushed back onto queue during hard_shutdown
          raise
        rescue Exception => e
          # ignore, will be pushed back onto queue during hard_shutdown
          raise Sidekiq::Shutdown if exception_caused_by_shutdown?(e)

          raise e unless msg['retry']
          attempt_retry(worker, msg, queue, e)
        end

見比べてもらうと分かるのですが、 local, global 共に middleware#call と似ています。
重要な違いは以下の 2 点になるかと思います。

  1. rescue Exception 時の attempt_retry の引数が global の場合のみ nil になっている
  2. Skip という新たな例外を用意しており、 local で一度 rescueした例外を global が意図的に無視できるようにしている。

1 に関してですが、これは global の時は worker クラスをインスタンス化できていないので インスタンスは渡せません。
#attempt_retry の第一引数 worker に nil を渡すと何が起こるかというと、リトライキューに入れる際の遅延時間や、リトライを規定回数し終わっても失敗した場合の処理を worker で固有に設定したものではなく、Sidekiq 全体で適用される設定(or デフォルト)が使われるという違いがあります。

sidekiq/lib/sidekiq/job_retry.rb
# 以下はいずれも #attempt_retry の内部で呼ばれるメソッド

def retries_exhausted(worker, msg, exception)
  logger.debug { "Retries exhausted for job" }
  begin
    # worker が nil だと Sidekiq.default_retries_exhausted が使われる
    block = worker && worker.sidekiq_retries_exhausted_block || Sidekiq.default_retries_exhausted
    block.call(msg, exception) if block
  rescue => e
    handle_exception(e, { context: "Error calling retries_exhausted for #{msg['class']}", job: msg })
  end

  send_to_morgue(msg) unless msg['dead'] == false
end

def delay_for(worker, count, exception)
  # worker が nil だとデフォルトの `seconds_to_delay` が使われる
  worker && worker.sidekiq_retry_in_block? && retry_in(worker, count, exception) || seconds_to_delay(count)
end

def seconds_to_delay(count)
  (count ** 4) + 15 + (rand(30)*(count+1))
end

2 の Skip に関しては、今回初めて登場したものですが、 globallocal でネストすることになった例外処理を取り回すために入った処置だと考えられます。

ちなみに global から再度 raise された Skip は最終的には #processrescue Exeception に拾われて処理されます。 (処理される際には cause が取り出されるため、Skip が外に出されることはありません。)

PR を追って見たきたことのまとめ

Sidekiq 5 の大きな変更である Retry と Logging がどうして middleware から外れ Processor の内部に組み込まれたのかの理由を見ていきました。

それ自体は、

  • thread を多用する Sidekiq の development モードでのリロード処理を Rails 5 で入ったスレッドセーフな ActiveSupport::Reloader を使って改善する取り組みから始まり、
  • これまで ActiveRecord のコネクションの解放を middleware でやっていたものを reloader の仕組みだけで実現できるようになるところに繋がっていき、
  • 最終的にそれを上手く組み込むために Retry を middleware から外し、 Processor の内部に入れるしかないという判断になった

という経緯であるようでした。(間違っているかもしれませんが)

ここまででは触れませんでしたが、Processor#process の冒頭でキューから取得した JSON が復元できなければ、即死亡に入れるという改善も入っており、Processor クラス自体も良くなった印象です。

最後に、 ActiveSupport::Reloader について少し見ておきます。

ActiveSupport::Reloader と ActiveSupport::Executor

そもそも Sidekiq の development モードでのリロードを改善した ActiveSupport::Reloader が、どのような事をやっているのかを軽く見ておきます。

内部の複数スレッドの lock 部分はまだ理解できてない部分が多いので、触り程度になってしまいますが。

Reloader と Executor の関係

Reloader と Executor は共に ExecutionWrapper を継承したクラスです。

特に Executor に関しては ExecutionWrapper を継承しただけのクラスになっています。

require "active_support/execution_wrapper"

module ActiveSupport
  class Executor < ExecutionWrapper
  end
end

Executor について

Executor というか ExecutionWrapper ですが、以下のようなメソッドが用意されています。

module ActiveSupport
  class ExecutionWrapper

    # callback 登録用のメソッド
    def self.to_run(*args, &block)
    def self.to_complete(*args, &block)
    def self.register_hook(hook, outer: false)

    def self.run!
    def self.wrap
    def self.active?

    def run!
    def complete!
end

Callback 登録部分

module ActiveSupport
  class ExecutionWrapper
    define_callbacks :run
    define_callbacks :complete

    def self.to_run(*args, &block)
      set_callback(:run, *args, &block)
    end

    def self.to_complete(*args, &block)
      set_callback(:complete, *args, &block)
    end

    RunHook = Struct.new(:hook) do # :nodoc:
      def before(target)
        hook_state = target.send(:hook_state)
        hook_state[hook] = hook.run
      end
    end

    CompleteHook = Struct.new(:hook) do # :nodoc:
      def before(target)
        hook_state = target.send(:hook_state)
        if hook_state.key?(hook)
          hook.complete hook_state[hook]
        end
      end
      alias after before
    end

    def self.register_hook(hook, outer: false)
      if outer
        to_run RunHook.new(hook), prepend: true
        to_complete :after, CompleteHook.new(hook)
      else
        to_run RunHook.new(hook)
        to_complete CompleteHook.new(hook)
      end
    end

    private
      def hook_state
        @_hook_state ||= {}
      end

register_hookto_runto_complete を一緒に登録するためのものですね。register_hook の場合は、.run.complete に反応できるオブジェクトであれば登録が可能です。例えば以下のようなものですね。

register_hookに登録できるオブジェクトの例
class SampleHook
  def self.run
    'run_result' # ここで戻したものが hook_state を通して complete に渡る
  end

  def self.complete(run_result)

  end
end

run!, wrap

実際に利用されるのはこの2つのメソッドです。
wraprun! を実行し、自動的に complete まで実行するメソッドです。(コメントでも wrap を使えと書いてあります)

module ActiveSupport
  class ExecutionWrapper
    Null = Object.new # :nodoc:
    def Null.complete! # :nodoc:
    end

    def self.run!
      if active?
        Null # Null を返すと手動で complete! される場合にも何もしない
      else
        new.tap do |instance|
          success = nil
          begin
            instance.run! # インスタンスメソッドの run! を実行
            success = true
          ensure
            # 失敗した場合のみ complete! まで実行する。
            # 成功の場合は利用側で手動で返却したインスタンスに対して complete! することを期待します。
            instance.complete! unless success 
          end
        end
      end
    end

    def self.wrap
      return yield if active?

      instance = run! # self.run! を実行
      begin
        yield
      ensure
        instance.complete!
      end
    end

    class << self # :nodoc:
      attr_accessor :active
    end

    def self.inherited(other) # :nodoc:
      super
      other.active = Concurrent::Hash.new
    end

    self.active = Concurrent::Hash.new

    def self.active? # :nodoc:
      @active[Thread.current]
    end

    def run! # :nodoc:
      self.class.active[Thread.current] = true
      run_callbacks(:run)
    end

    def complete!
      run_callbacks(:complete)
    ensure
      self.class.active.delete Thread.current
    end

run!wrap がどういう事をしているかは、なんとなく分かるかと思います。

active?Concurrent::Hash のキーに Thread.current を入れています。
想像できる所では同一スレッドにおいて、 Executor をネストして用いた場合に、過剰に run!, complete 処理が走らないようにするためではないかと思います。

Reloader について

以下のようなメソッドが用意されています。

module ActiveSupport
  class Reloader < ExecutionWrapper
    # callback 登録用のメソッド
    def self.to_prepare(*args, &block)
    def self.before_class_unload(*args, &block)
    def self.after_class_unload(*args, &block)

    def self.reload!
    def self.run!
    def self.wrap

    def self.check!
    def self.reloaded!
    def self.prepare!

    def require_unload_lock!
    def release_unload_lock!

    def run!
    def class_unload!(&block)
    def complete!

Callback 登録部分

module ActiveSupport
  class Reloader < ExecutionWrapper
    define_callbacks :prepare
    define_callbacks :class_unload

    def self.to_prepare(*args, &block)
      set_callback(:prepare, *args, &block)
    end

    def self.before_class_unload(*args, &block)
      set_callback(:class_unload, *args, &block)
    end

    def self.after_class_unload(*args, &block)
      set_callback(:class_unload, :after, *args, &block)
    end

    to_run(:after) { self.class.prepare! }

最後に to_run(:after) に callback を登録しています。これは ExecutionWrapper を継承しているため自身の run に対して設定しています。

run!, wrap

module ActiveSupport
  class Reloader < ExecutionWrapper
    def self.run!
      if check!
        super
      else
        Null
      end
    end

    # Run the supplied block as a work unit, reloading code as needed
    def self.wrap
      executor.wrap do
        super
      end
    end

    class_attribute :executor
    class_attribute :check

    self.executor = Executor
    self.check = lambda { false }

    def self.check!
      @should_reload ||= check.call
    end

    def run!
      super
      release_unload_lock!
    end

    def complete!
      super
      self.class.reloaded!
    ensure
      release_unload_lock!
    end

    # Acquire the ActiveSupport::Dependencies::Interlock unload lock,
    # ensuring it will be released automatically
    def require_unload_lock!
      unless @locked
        ActiveSupport::Dependencies.interlock.start_unloading
        @locked = true
      end
    end

    # Release the unload lock if it has been previously obtained
    def release_unload_lock!
      if @locked
        @locked = false
        ActiveSupport::Dependencies.interlock.done_unloading
      end
    end

    def initialize
      super
      @locked = false
    end

# ===========

module ActiveSupport
  class ExecutionWrapper
    def self.run!
      if active?
        Null
      else
        new.tap do |instance|
          success = nil
          begin
            instance.run!
            success = true
          ensure
            instance.complete! unless success
          end
        end
      end
    end

    def self.wrap
      return yield if active?

      instance = run!
      begin
        yield
      ensure
        instance.complete!
      end
    end

super で呼び出すため、 ExecutionWrapper のコードも再掲しています。

なんとなく wrap を実行した時の呼び出し順を書いてみると以下のようになるかなと。

Reloader.wrap 
  -> ExecutionWrapper.wrap -> (active? pass) 
    -> Reloader.run! -> (check! pass)
      -> ExecutionWrapper.run!
        -> Reloader#run!                   
          -> ExecutionWrapper#run!         # ここで active? == true
          -> Reloader#release_unload_lock!
  -> Reloader#complete!
    -> ExecutionWrapper#complete!          # ここで active? == false
    -> Reloader.reloaded!
    -> Reloadder#release_unload_lock!

active?, check! のいずれかが false の場合には、即座に Null のオブジェクトが返され、その後は実行されず、complete!Null の内容なので、事実上何も実行されません。

release_unload_lock! はここでは機能しない

Reloader.wrap の内部では release_unload_lock! しか呼ばれておらず require_unload_lock! の方は呼び出されていません。

しかもコードをみてもらうと分かる通り @locked は上記のルートでは true になりえないので、 ActiveSupport::Dependencies.interlock の所までくることもありません。

require_unload_lock!#class_unload! でしか呼び出されることはありません。

Rails 内で Executor と Reloader がどう使われているかを見る

class_unload!

まずは、先ほどでてきた class_unload! から見ていきます。

rails/railties/lib/rails/application/finisher.rb
module Rails
  class Application
    module Finisher
      # Set clearing dependencies after the finisher hook to ensure paths
      # added in the hook are taken into account.
      initializer :set_clear_dependencies_hook, group: :all do |app|
        callback = lambda do
          ActiveSupport::DescendantsTracker.clear
          ActiveSupport::Dependencies.clear
        end

        if config.cache_classes
          app.reloader.check = lambda { false }
        elsif config.reload_classes_only_on_change
          app.reloader.check = lambda do
            app.reloaders.map(&:updated?).any?
          end
        else
          app.reloader.check = lambda { true }
        end

        if config.reload_classes_only_on_change
          reloader = config.file_watcher.new(*watchable_args, &callback)
          reloaders << reloader

          # Prepend this callback to have autoloaded constants cleared before
          # any other possible reloading, in case they need to autoload fresh
          # constants.
          app.reloader.to_run(prepend: true) do
            # In addition to changes detected by the file watcher, if routes
            # or i18n have been updated we also need to clear constants,
            # that's why we run #execute rather than #execute_if_updated, this
            # callback has to clear autoloaded constants after any update.
            class_unload! do
              reloader.execute
            end
          end
        else
          app.reloader.to_complete do
            class_unload!(&callback)
          end
        end
      end

リロードの仕組み自体は大きく変わっていないと思いますので、以下の記事が参考になります。
RailsのReloaderの仕組み - AnyType

ここでは、Reloader の使われ方のみ見ていきます。

reloader, reloaders

app.reloader, app.reloaders がでてきていますが、これは別物です。

app.reloader はこれまで見てきた ActiveSupport::Reloader のことであり、app.reloaders には上記のコードにもでてきてきますが、 file_watcher のような変更監視を行うものが入ります。

https://railsguides.jp/configuring.html
config.file_watcherは、config.reload_classes_only_on_changeがtrueの場合にファイルシステム上のファイル更新検出に使用されるクラスを指定します。ActiveSupport::FileUpdateChecker APIに従う必要があります。

他にも RoutesReloader というクラスも上記の Finisher 内で reloaders に登録されていますが、こちらも FileUpdateChecker を内部で利用しています。

以降の説明では file_watcher に指定されているものは FileUpdateChecker として進めますが、 Rails アプリを作成した時の development.rbfile_watcher に指定されているのは EventedFileUpdateChecker の方です。

Rails の設定による Reloader.check の動作の変更

Reloader で self.check = lambda { false } というコードがでてきていたのですが、ここではその挙動を設定に応じて、変更しています。

この check の結果が false になると Reloader.run! (引いてはそれを利用する warp) も動作を行いません。

rails/railties/lib/rails/application/finisher.rb
# initializer :set_clear_dependencies_hook, group: :all do |app|
if config.cache_classes
  app.reloader.check = lambda { false }
elsif config.reload_classes_only_on_change
  app.reloader.check = lambda do
    app.reloaders.map(&:updated?).any?
  end
else
  app.reloader.check = lambda { true }
end

以下はガイドからの引用です。

https://railsguides.jp/configuring.html

  • config.reload_classes_only_on_changeは、監視しているファイルが変更された場合にのみクラスを再読み込みするかどうかを指定します。デフォルトでは、autoload_pathで指定されたすべてのファイルが監視対象となり、デフォルトでtrueが設定されます。 config.cache_classesがオンの場合はこのオプションは無視されます

config.cache_classes が true の場合には app.reloader.check には false を返す proc が渡されるため Reloader は動作しません。

config.reload_classes_only_on_changetrue の場合には app.reloaders に変更があった (updated?) があった場合のみ true になる proc が渡されます。

set_clear_dependencies_hook

上記の Finisher のコードはそもそも set_clear_dependencies_hook の内部です。

https://railsguides.jp/configuring.html

  • set_clear_dependencies_hook: active_record.set_dispatch_hooksへのフックを提供します。このイニシャライザより前に実行されます。このイニシャライザは、 cache_classesがfalseの場合にのみ実行 されます。そして、このイニシャライザはActionDispatch::Callbacks.afterを使用して、オブジェクト空間からのリクエスト中に参照された定数を削除します。これにより、これらの定数は以後のリクエストで再度読み込まれるようになります。
  • active_record.set_dispatch_hooks: config.cache_classesがfalseに設定されている場合 、再読み込み可能なデータベース接続をすべてリセットします。

上記のガイドの active_record.set_dispatch_hooks は Rails5.1 では存在しておらず、おそらくは以下の active_record.set_reloader_hooks の方ではないかと思います。

http://guides.rubyonrails.org/configuring.html

  • set_clear_dependencies_hook: This initializer - which runs only if cache_classes is set to false - uses ActionDispatch::Callbacks.after to remove the constants which have been referenced during the request from the object space so that they will be reloaded during the following request.
  • active_record.set_reloader_hooks: Resets all reloadable connections to the database if config.cache_classes is set to false.

「ActionDispatch::Callbacks.afterを使用して、オブジェクト空間からのリクエスト中に参照された定数を削除します。」この辺りがコードからは把握できませんでした。(すでに削除されている ActionDispatch::Callbacks.to_prepare の内容もガイドにはまだ入っているため、追随できていない箇所なのかもしれません。)

以下は再び set_clear_dependencies_hook の内容です。

rails/railties/lib/rails/application/finisher.rb
# initializer :set_clear_dependencies_hook, group: :all do |app|
callback = lambda do
  ActiveSupport::DescendantsTracker.clear
  ActiveSupport::Dependencies.clear
end

# 前述の app.reloader.check の設定箇所

if config.reload_classes_only_on_change
  reloader = config.file_watcher.new(*watchable_args, &callback)
  reloaders << reloader

  # Prepend this callback to have autoloaded constants cleared before
  # any other possible reloading, in case they need to autoload fresh
  # constants.
  app.reloader.to_run(prepend: true) do
    # In addition to changes detected by the file watcher, if routes
    # or i18n have been updated we also need to clear constants,
    # that's why we run #execute rather than #execute_if_updated, this
    # callback has to clear autoloaded constants after any update.
    class_unload! do
      reloader.execute # ここで file_watcher の execute を呼び出す
    end
  end
else
  app.reloader.to_complete do
    class_unload!(&callback) # ここで file_watcher の execute を呼び出す
  end
end

config.reload_classes_only_on_change の設定で分岐していますが、いずれの場合も class_unload! を呼び出しているのが分かります。

config.cache_classes が true の場合は、

以下は config.file_watcher に設定されている FileUpdateChecker#execute の内容です。
https://github.com/rails/rails/blob/v5.1.1/activesupport/lib/active_support/file_update_checker.rb#L78-L85

rails/activesupport/lib/active_support/file_update_checker.rb
module ActiveSupport
  class FileUpdateChecker
    # Executes the given block and updates the latest watched files and
    # timestamp.
    def execute
      @last_watched   = watched
      @last_update_at = updated_at(@last_watched)
      @block.call
    ensure
      @watched = nil
      @updated_at = nil
    end

なんらか変更監視の状態を更新した上で block を実行しています。

class_unload! に直接渡すにせよ、file_watcher を経由するにせよ、実行されるブロックは以下のものです。

callback = lambda do
  ActiveSupport::DescendantsTracker.clear
  ActiveSupport::Dependencies.clear
end

こちらに関しては再掲になりますが、以下の記事で内容が解説されています。 (Rails5 の内容ではないですが、大きくは変わっていないですし分かりやすいです)
RailsのReloaderの仕組み - AnyType

ここでは、Rails が規約ベースや autoload で読み込んだクラス(定数)をアンロードして、次のクラスへのアクセスの時に再度ロードされる状態にするという感じで捉えておいてもらえればと思います。(ちょっと適当すぎるかもですが :bow: )

class_unload! 再訪

ということで再び ActiveSupport::Reloader#class_unload! です。

rails/activesupport/lib/active_support/reloader.rb
module ActiveSupport
  class Reloader < ExecutionWrapper
    def class_unload!(&block) # :nodoc:
      require_unload_lock!
      run_callbacks(:class_unload, &block)
    end

    # Acquire the ActiveSupport::Dependencies::Interlock unload lock,
    # ensuring it will be released automatically
    def require_unload_lock!
      unless @locked
        ActiveSupport::Dependencies.interlock.start_unloading
        @locked = true
      end
    end

内容自体は ActiveSupport::Dependencies::Interlock でアンロードのロックを開始し、callback を実行しているだけです。

では callback を登録している箇所を見ていきます。 v5.1.1 では Rails 内には 2 箇所しかありません。

一箇所は actoin_cable です。
https://github.com/rails/rails/blob/v5.1.1/actioncable/lib/action_cable/engine.rb#L71

rails/actioncable/lib/action_cable/engine.rb
module ActionCable
  class Engine < Rails::Engine
    initializer "action_cable.set_work_hooks" do |app|
      ActiveSupport.on_load(:action_cable) do
        ActionCable::Server::Worker.set_callback :work, :around, prepend: true do |_, inner|
          app.executor.wrap do
            # If we took a while to get the lock, we may have been halted
            # in the meantime. As we haven't started doing any real work
            # yet, we should pretend that we never made it off the queue.
            unless stopping?
              inner.call
            end
          end
        end

        wrap = lambda do |_, inner|
          app.executor.wrap(&inner)
        end
        ActionCable::Channel::Base.set_callback :subscribe, :around, prepend: true, &wrap
        ActionCable::Channel::Base.set_callback :unsubscribe, :around, prepend: true, &wrap

        # class_unload で server restart
        app.reloader.before_class_unload do
          ActionCable.server.restart
        end
      end
    end

もう一箇所は active_record です。
https://github.com/rails/rails/blob/v5.1.1/activerecord/lib/active_record/railtie.rb#L148-L157

rails/activerecord/lib/active_record/railtie.rb
module ActiveRecord
  class Railtie < Rails::Railtie
    initializer "active_record.set_reloader_hooks" do
      ActiveSupport.on_load(:active_record) do
        ActiveSupport::Reloader.before_class_unload do
          if ActiveRecord::Base.connected?
            ActiveRecord::Base.clear_cache!
            ActiveRecord::Base.clear_reloadable_connections!
          end
        end
      end
    end

先ほど引用したガイドにあった以下の説明は Reloader#class_unload! によって(全てかは分かりませんが)実施されいていることがわかりました。

  • active_record.set_reloader_hooks: Resets all reloadable connections to the database if config.cache_classes is set to false.

ここまでで駆け足ではありますが class_unload! に関する部分は終了です。

Executor の register_hook, to_run, to_complete

続いて ActiveSupport::Executor のフックがどう使われているかを見ていきます。

register_hook

これの利用箇所は 2 箇所あります。

一つ目は active_record です。
https://github.com/rails/rails/blob/v5.1.1/activerecord/lib/active_record/query_cache.rb#L44

module ActiveRecord
  class QueryCache
    def self.run
      caching_pool = ActiveRecord::Base.connection_pool
      caching_was_enabled = caching_pool.query_cache_enabled

      caching_pool.enable_query_cache!

      [caching_pool, caching_was_enabled]
    end

    def self.complete((caching_pool, caching_was_enabled))
      caching_pool.disable_query_cache! unless caching_was_enabled

      ActiveRecord::Base.connection_handler.connection_pool_list.each do |pool|
        pool.release_connection if pool.active_connection? && !pool.connection.transaction_open?
      end
    end

    def self.install_executor_hooks(executor = ActiveSupport::Executor)
      # self が run と complete に反応できるので登録可能
      executor.register_hook(self)
    end

Sidekiq の説明の部分で Reloader がコネクション解放の役割も担うようになった と書いていたのを思い出して欲しいのですが、ここがその実装部分になります。

Reloader.wrapExecutionWrapperrun, complete も呼び出します、つまり、両方の Callback が Reloader においても実行されます。

よって、app.reloader.wrap {} で実行されるブロックを抜けるとクエリキャッシュとコネクションは解放されます。(もちろんそのスレッドで利用していたものです)

ちなみに、この ActiveRecord::QueryCache.install_executor_hooksactive_record の initializer で登録されています。

続いてもう一箇所ですが、再び Finisher です。
https://github.com/rails/rails/blob/v5.1.1/railties/lib/rails/application/finisher.rb#L100-L121

rails/railties/lib/rails/application/finisher.rb
module Rails
  class Application
    module Finisher
      initializer :configure_executor_for_concurrency do |app|
        if config.allow_concurrency == false
          # User has explicitly opted out of concurrent request
          # handling: presumably their code is not threadsafe

          app.executor.register_hook(MutexHook.new, outer: true)

        elsif config.allow_concurrency == :unsafe
          # Do nothing, even if we know this is dangerous. This is the
          # historical behavior for true.

        else
          # Default concurrency setting: enabled, but safe

          unless config.cache_classes && config.eager_load
            # Without cache_classes + eager_load, the load interlock
            # is required for proper operation

            app.executor.register_hook(InterlockHook, outer: true)
          end
        end
      end

      class MutexHook
        def initialize(mutex = Mutex.new)
          @mutex = mutex
        end

        def run
          @mutex.lock
        end

        def complete(_state)
          @mutex.unlock
        end
      end

      module InterlockHook
        def self.run
          ActiveSupport::Dependencies.interlock.start_running
        end

        def self.complete(_state)
          ActiveSupport::Dependencies.interlock.done_running
        end
      end

どうやら並列実行に関する設定のようです。

config.allow_concurrency には false, :unsefe、と指定なしの 3 つのパターンがあるようです。

false の場合には MutexHook が、指定なしの場合には InterlockHook が利用されています。

MutexHook は mutex でロックしてしまっているので、たしかに並列実行は認めないようです。

一方で InterlockHook の方は ActiveSupport::Dependencies.interlock を利用しています。
関連する部分は以下なのですが、ざっくりでも説明できそうにないので省きます。

実質の処理を担っている ActiveSupport::Concurrency::ShareLock の冒頭に 「A share/exclusive lock, otherwise known as a read/write lock.」と記載されているので、そういうことなのだと思います :sweat_smile:

https://github.com/rails/rails/blob/v5.1.1/activesupport/lib/active_support/dependencies/interlock.rb#L30-L36
https://github.com/rails/rails/blob/v5.1.1/activesupport/lib/active_support/concurrency/share_lock.rb#L112-L138

ただ、デフォルトでも共有/排他ロックの仕組みが入っているため、 ActiveSupport::Reloader.class_unload! 時に取得される ActiveSupport::Dependencies.interlock.start_unloading とも上手く連携され、別スレッドでリロードが行われたとしても、 app.executor.warp {} の内部で処理を実行していれば、不正なクラスの内容で処理を実行することを防いでいるようです。

to_run

最初は action_view です。
https://github.com/rails/rails/blob/v5.1.1/actionview/lib/action_view/railtie.rb#L49-L55

rails/actionview/lib/action_view/railtie.rb
module ActionView
  class Railtie < Rails::Engine
    initializer "action_view.per_request_digest_cache" do |app|
      ActiveSupport.on_load(:action_view) do
        unless ActionView::Resolver.caching?
          app.executor.to_run ActionView::Digestor::PerExecutionDigestCacheExpiry
        end
      end
    end

module ActionView
  class Digestor
    module PerExecutionDigestCacheExpiry
      def self.before(target)
        ActionView::LookupContext::DetailsKey.clear
      end
    end

ActionView::Resolver.caching? が false の時のみという条件からみても、実行ごとに View の Lookup 用のキャッシュを捨てるという感じなのでしょうか。(開発中ならたしかにありそう)

続いては i18n 周りです。
https://github.com/rails/rails/blob/v5.1.1/activesupport/lib/active_support/i18n_railtie.rb#L59-L73

詳細は省きますが、i18n のファイルも別途変更監視を行っており、その変更を適用するための設定のようです。

最後はまた Finisher なのですが、すでに触れた set_clear_dependencies_hookset_routes_reloader_hook (こちらは少しだけですが) なので、説明は省きます。

to_complete

すでにでてきた Finisher の以下の箇所だけであるため割愛します。
https://github.com/rails/rails/blob/v5.1.1/railties/lib/rails/application/finisher.rb#L180

Reloader と Executor を見てきて

終盤息切れしてきましたが、Reloader と Executor をざっと駆け足で眺めました。 Interlock 周りを細かく見れていないので、まだまだ誤解している箇所もありそうに思います。

ここでは触れませんでしたが、Rails console で reload! とした時には Reloader.reload! が呼び出されます。

最後に参考資料を貼っておきます。

上2つの PR は Sidekiq の説明部分でも貼りましたが、 matthewd さんが貼っていた PR です。おそらく彼がこの辺りを積極的になおしているのだと思います。
彼が書いた解説資料が 3 つ目の資料です。(Rails Guide にはまだ来ていないようです。もしかしたら古い内容なのかも。)
4 つ目の資料は Reloader/Executor のことだけでなく、Sidekiq/ActiveRecord/Capybara への影響なども交えて話してくれていて分かりやすかったです。

最後に

Rails 上での開発に影響があるか?

参考資料の後半2つにもでてきていますが、書き方次第ではデッドロックの可能性があります。

なので、Executor/Reloader を使うようなコードを書かなければ影響は無いと思いますが、利用するコードを書く場合には注意が必要なのではと感じました。

一方で Sidekiq がそうしたように、Rails に依存した形での並列処理は書きやすくなったので、使い所はあるのではと思います。

Rails 以外での使いみちはあるか?

ActiveSupport 単体でということになりますが、Executor に関しては、見てもらって分かる通り特別な事はあまりしていません。「同一スレッドでネストして利用された場合の対処」と「Callback を提供」しているだけです。

並列性の話しは全て Rails で利用する場合なのです。(initializer 等で ActiveSupport::Dependencies.interlock 関連のフックが追加されるため)

一方で、ActiveSupport::Concurrency::ShareLock は Rails 関係なく、共有/排他ロックを実現できる仕組みです。(もしかすると、Rails 内でしか通用しないものなのかもしれませんが)

なので Executor との組み合わせや、ShareLock 単体での使いみちはあるのかもしれません。

まあ、コードを参考にするぐらいがちょうどいいのかもしれませんが。

最後の最後に

Rails 力低い人が軽い気持ちで見始めていい場所じゃなかった。。

21
14
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
21
14