概要
Rails6.0より実装された複数DB対応に関して、自身のプロジェクトに適用できるか?という判断のために実装を追ってみます
Railsガイドでの記載
自動切り替え機能によって、アプリケーションはHTTP verbや直近の書き込みの有無に応じてprimaryからreplica、またはreplicaからprimaryへと切り替えます。
アプリケーションがPOST、PUT、DELETE、PATCHのいずれかのリクエストを受け取ると、自動的にprimaryに書き込みます。書き込み後に指定の時間が経過するまでは、アプリケーションはprimaryから読み出します。アプリケーションがGETリクエストやHEADリクエストを受け取ると、直近の書き込みがなければreplicaから読み出します。
HTTPメソッドによる判定 + 書き込みからの時間による判定 の2種類にて行われているという記載になっている
設定方法としては下記の記載
config.active_record.database_selector = { delay: 2.seconds }
config.active_record.database_resolver = ActiveRecord::Middleware::DatabaseSelector::Resolver
config.active_record.database_resolver_context = ActiveRecord::Middleware::DatabaseSelector::Resolver::Session
コードリーディング
まずはミドルウェアの適用部分から追ってみる
Railtieでのミドルウェアの適用部分
class Railtie < Rails::Railtie # :nodoc:
config.active_record = ActiveSupport::OrderedOptions.new
initializer "active_record.database_selector" do
if options = config.active_record.delete(:database_selector)
resolver = config.active_record.delete(:database_resolver)
operations = config.active_record.delete(:database_resolver_context)
config.app_middleware.use ActiveRecord::Middleware::DatabaseSelector, resolver, operations, options
end
end
-
database_selector
が設定されていれば実行される -
database_resolver
とdatabase_resolver_context
はただ渡されるようになっている
続いてミドルウェア部分の実装を追っていく
[補足] OrderedOptionsとは?
Hashをドットでアクセスできるようにしている機構
deleteで設定されていない場合はnilが返る
Ruby2.6.5 pry(main)> x = ActiveSupport::OrderedOptions.new
=> {}
Ruby2.6.5 pry(main)> x.delete(:hoge)
=> nil
DatabaseSelectorの実装内容
module ActiveRecord
module Middleware
class DatabaseSelector
def initialize(app, resolver_klass = nil, context_klass = nil, options = {})
@app = app
@resolver_klass = resolver_klass || Resolver
@context_klass = context_klass || Resolver::Session
@options = options
end
attr_reader :resolver_klass, :context_klass, :options
def call(env)
request = ActionDispatch::Request.new(env)
select_database(request) do
@app.call(env)
end
end
-
database_resolver
が設定されていない場合はDatabaseSelector::Resolver
が設定される -
database_resolver_context
が設定されていない場合はDatabaseSelector::Resolver::Session
が設定される
続いて重要そうな#select_database
を見てみる
DatabaseSelector#select_database
module ActiveRecord
module Middleware
class DatabaseSelector
private
def select_database(request, &blk)
context = context_klass.call(request)
resolver = resolver_klass.call(context, options)
if reading_request?(request)
resolver.read(&blk)
else
resolver.write(&blk)
end
end
def reading_request?(request)
request.get? || request.head?
end
end
end
end
context
が先に実行され、それをもとにresolver
が作られているため、まずはcontext
のほうを確認してみる
Resolver::Session.call
class Resolver # :nodoc:
class Session # :nodoc:
def self.call(request)
new(request.session)
end
def initialize(session)
@session = session
end
attr_reader :session
.call
は#new
しているだけで、ここではActionDispatch::Request
のsessionをセットしているのみ
なので続いてはResolverの.call
を確認
Resolver.call
class Resolver # :nodoc:
SEND_TO_REPLICA_DELAY = 2.seconds
def self.call(context, options = {})
new(context, options)
end
def initialize(context, options = {})
@context = context
@options = options
@delay = @options && @options[:delay] ? @options[:delay] : SEND_TO_REPLICA_DELAY
@instrumenter = ActiveSupport::Notifications.instrumenter
end
attr_reader :context, :delay, :instrumenter
ここでもnewしているだけなので、元のDatabaseSelector#select_database
をもう少し読み進めていく
※ちなみにinstrumenterは通知の仕組み(購読すれば通知を受け取れる)
HTTPメソッドの判断部分
def select_database(request, &blk)
context = context_klass.call(request)
resolver = resolver_klass.call(context, options)
if reading_request?(request)
resolver.read(&blk)
else
resolver.write(&blk)
end
end
def reading_request?(request)
request.get? || request.head?
end
ここではGETかHEADの場合 => read
それ以外の場合 => write
としてresolverの処理を実行している(Resolverがさらに判定を行っているっぽい
まずはreadから見てみる
Resolver#read
class Resolver # :nodoc:
def read(&blk)
if read_from_primary?
read_from_primary(&blk)
else
read_from_replica(&blk)
end
end
private
def read_from_primary(&blk)
ActiveRecord::Base.connected_to(role: ActiveRecord::Base.writing_role, prevent_writes: true) do
instrumenter.instrument("database_selector.active_record.read_from_primary") do
yield
end
end
end
def read_from_replica(&blk)
ActiveRecord::Base.connected_to(role: ActiveRecord::Base.reading_role, prevent_writes: true) do
instrumenter.instrument("database_selector.active_record.read_from_replica") do
yield
end
end
end
roleをどちらにするかとinstrumentにより通知を行ったうえで処理を進めているよう。
では続いて#read_from_primary?
にて何をどう判定しているのか?
private
def read_from_primary?
!time_since_last_write_ok?
end
def send_to_replica_delay
delay
end
def time_since_last_write_ok?
Time.now - context.last_write_timestamp >= send_to_replica_delay
end
context
いわゆるsessionの#last_write_timestamp
というものが、
最初のconfig.active_record.database_selector = { delay: 2.seconds }
のdelay分だけ経過していなかった場合はprimaryにして、
delay分経過している場合はreplicaを見るようになっている
ということでsessionの#last_write_timestamp
を確認
class Session # :nodoc:
# Converts milliseconds since epoch timestamp into a time object.
def self.convert_timestamp_to_time(timestamp)
timestamp ? Time.at(timestamp / 1000, (timestamp % 1000) * 1000) : Time.at(0)
end
def last_write_timestamp
self.class.convert_timestamp_to_time(session[:last_write])
end
ただsesion[:last_write]
をtimeオブジェクトに直しているのみ
readはこれで終わりなので続いてwrite
Resolver#write
class Resolver # :nodoc:
def write(&blk)
write_to_primary(&blk)
end
private
def write_to_primary(&blk)
ActiveRecord::Base.connected_to(role: ActiveRecord::Base.writing_role, prevent_writes: false) do
instrumenter.instrument("database_selector.active_record.wrote_to_primary") do
yield
ensure
context.update_last_write_timestamp
end
end
end
これはただただ書き込みのroleにて書き込んでいるのみ
class Session # :nodoc:
def self.convert_time_to_timestamp(time)
time.to_i * 1000 + time.usec / 1000
end
def update_last_write_timestamp
session[:last_write] = self.class.convert_time_to_timestamp(Time.now)
end
そして、session[:last_write]
に現在のタイムスタンプを付与
まとめ
さすがRailsですね。とても分かりやすく作られていました。
結局はActionDispatch::Requestをもとに処理されているだけで、かつResolverの#read
と#write
を両方書き換えれば
GETとHEADに限らず好きに書くことができそうでした。
ヘッダーやIPアドレス、localhostなどによる判定も、Requestが持っている情報なので使えますし
できることはかなり多いと思いました。