LoginSignup
0
0

More than 3 years have passed since last update.

Rails6のマルチDBの自動切換(ActiveRecord::Middleware::DatabaseSelector)の実装を読んでみる

Last updated at Posted at 2020-09-24

概要


Rails6.0より実装された複数DB対応に関して、自身のプロジェクトに適用できるか?という判断のために実装を追ってみます

Railsガイドでの記載

自動切り替え機能によって、アプリケーションはHTTP verbや直近の書き込みの有無に応じてprimaryからreplica、またはreplicaからprimaryへと切り替えます。
アプリケーションがPOST、PUT、DELETE、PATCHのいずれかのリクエストを受け取ると、自動的にprimaryに書き込みます。書き込み後に指定の時間が経過するまでは、アプリケーションはprimaryから読み出します。アプリケーションがGETリクエストやHEADリクエストを受け取ると、直近の書き込みがなければreplicaから読み出します。

HTTPメソッドによる判定 + 書き込みからの時間による判定 の2種類にて行われているという記載になっている
設定方法としては下記の記載

config.active_record.database_selector = { delay: 2.seconds }
config.active_record.database_resolver = ActiveRecord::Middleware::DatabaseSelector::Resolver
config.active_record.database_resolver_context = ActiveRecord::Middleware::DatabaseSelector::Resolver::Session

コードリーディング

まずはミドルウェアの適用部分から追ってみる

Railtieでのミドルウェアの適用部分

activerecord-6.0.3.2/lib/active_record/railtie.rb

  class Railtie < Rails::Railtie # :nodoc:
    config.active_record = ActiveSupport::OrderedOptions.new

    initializer "active_record.database_selector" do
      if options = config.active_record.delete(:database_selector)
        resolver = config.active_record.delete(:database_resolver)
        operations = config.active_record.delete(:database_resolver_context)
        config.app_middleware.use ActiveRecord::Middleware::DatabaseSelector, resolver, operations, options
      end
    end
  • database_selectorが設定されていれば実行される
  • database_resolverdatabase_resolver_contextはただ渡されるようになっている

続いてミドルウェア部分の実装を追っていく

[補足] OrderedOptionsとは?

Hashをドットでアクセスできるようにしている機構
deleteで設定されていない場合はnilが返る

Ruby2.6.5 pry(main)> x = ActiveSupport::OrderedOptions.new
=> {}
Ruby2.6.5 pry(main)> x.delete(:hoge)
=> nil

DatabaseSelectorの実装内容

activerecord-6.0.3.2/lib/active_record/middleware/database_selector.rb

module ActiveRecord
  module Middleware
    class DatabaseSelector
      def initialize(app, resolver_klass = nil, context_klass = nil, options = {})
        @app = app
        @resolver_klass = resolver_klass || Resolver
        @context_klass = context_klass || Resolver::Session
        @options = options
      end

      attr_reader :resolver_klass, :context_klass, :options

      def call(env)
        request = ActionDispatch::Request.new(env)

        select_database(request) do
          @app.call(env)
        end
      end
  • database_resolverが設定されていない場合はDatabaseSelector::Resolverが設定される
  • database_resolver_contextが設定されていない場合はDatabaseSelector::Resolver::Sessionが設定される

続いて重要そうな#select_databaseを見てみる

DatabaseSelector#select_database

module ActiveRecord
  module Middleware
    class DatabaseSelector
      private
        def select_database(request, &blk)
          context = context_klass.call(request)
          resolver = resolver_klass.call(context, options)

          if reading_request?(request)
            resolver.read(&blk)
          else
            resolver.write(&blk)
          end
        end

        def reading_request?(request)
          request.get? || request.head?
        end
    end
  end
end

contextが先に実行され、それをもとにresolverが作られているため、まずはcontextのほうを確認してみる

Resolver::Session.call

class Resolver # :nodoc:
  class Session # :nodoc:
    def self.call(request)
      new(request.session)
    end

    def initialize(session)
      @session = session
    end

    attr_reader :session

.call#newしているだけで、ここではActionDispatch::Requestのsessionをセットしているのみ
なので続いてはResolverの.callを確認

Resolver.call

class Resolver # :nodoc:
  SEND_TO_REPLICA_DELAY = 2.seconds

  def self.call(context, options = {})
    new(context, options)
  end

  def initialize(context, options = {})
    @context = context
    @options = options
    @delay = @options && @options[:delay] ? @options[:delay] : SEND_TO_REPLICA_DELAY
    @instrumenter = ActiveSupport::Notifications.instrumenter
  end

  attr_reader :context, :delay, :instrumenter

ここでもnewしているだけなので、元のDatabaseSelector#select_databaseをもう少し読み進めていく
※ちなみにinstrumenterは通知の仕組み(購読すれば通知を受け取れる)

HTTPメソッドの判断部分

def select_database(request, &blk)
  context = context_klass.call(request)
  resolver = resolver_klass.call(context, options)

  if reading_request?(request)
    resolver.read(&blk)
  else
    resolver.write(&blk)
  end
end

def reading_request?(request)
  request.get? || request.head?
end

ここではGETかHEADの場合 => read
それ以外の場合 => write
としてresolverの処理を実行している(Resolverがさらに判定を行っているっぽい

まずはreadから見てみる

Resolver#read

class Resolver # :nodoc:
  def read(&blk)
    if read_from_primary?
      read_from_primary(&blk)
    else
      read_from_replica(&blk)
    end
  end

  private
    def read_from_primary(&blk)
      ActiveRecord::Base.connected_to(role: ActiveRecord::Base.writing_role, prevent_writes: true) do
        instrumenter.instrument("database_selector.active_record.read_from_primary") do
          yield
        end
      end
    end

    def read_from_replica(&blk)
      ActiveRecord::Base.connected_to(role: ActiveRecord::Base.reading_role, prevent_writes: true) do
        instrumenter.instrument("database_selector.active_record.read_from_replica") do
          yield
        end
      end
    end

roleをどちらにするかとinstrumentにより通知を行ったうえで処理を進めているよう。
では続いて#read_from_primary?にて何をどう判定しているのか?

  private
    def read_from_primary?
      !time_since_last_write_ok?
    end

    def send_to_replica_delay
      delay
    end

    def time_since_last_write_ok?
      Time.now - context.last_write_timestamp >= send_to_replica_delay
    end

contextいわゆるsessionの#last_write_timestampというものが、
最初のconfig.active_record.database_selector = { delay: 2.seconds }のdelay分だけ経過していなかった場合はprimaryにして、
delay分経過している場合はreplicaを見るようになっている

ということでsessionの#last_write_timestampを確認

class Session # :nodoc:
  # Converts milliseconds since epoch timestamp into a time object.
  def self.convert_timestamp_to_time(timestamp)
    timestamp ? Time.at(timestamp / 1000, (timestamp % 1000) * 1000) : Time.at(0)
  end

  def last_write_timestamp
    self.class.convert_timestamp_to_time(session[:last_write])
  end

ただsesion[:last_write]をtimeオブジェクトに直しているのみ
readはこれで終わりなので続いてwrite

Resolver#write

class Resolver # :nodoc:
  def write(&blk)
    write_to_primary(&blk)
  end

  private
    def write_to_primary(&blk)
      ActiveRecord::Base.connected_to(role: ActiveRecord::Base.writing_role, prevent_writes: false) do
        instrumenter.instrument("database_selector.active_record.wrote_to_primary") do
          yield
        ensure
          context.update_last_write_timestamp
        end
      end
    end

これはただただ書き込みのroleにて書き込んでいるのみ

class Session # :nodoc:
  def self.convert_time_to_timestamp(time)
    time.to_i * 1000 + time.usec / 1000
  end

  def update_last_write_timestamp
    session[:last_write] = self.class.convert_time_to_timestamp(Time.now)
  end

そして、session[:last_write]に現在のタイムスタンプを付与

まとめ

さすがRailsですね。とても分かりやすく作られていました。

結局はActionDispatch::Requestをもとに処理されているだけで、かつResolverの#read#writeを両方書き換えれば
GETとHEADに限らず好きに書くことができそうでした。

ヘッダーやIPアドレス、localhostなどによる判定も、Requestが持っている情報なので使えますし
できることはかなり多いと思いました。

0
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
0
0