Introduction
Socket.ioもびっくりのスーパー抽象化がなされたActionCableについて,実装を覗いて理解を深めましょう.
なお,Rails 5-beta1を参考にしていきます.
ActionCable.server
Rails.application.routes.draw do
mount ActionCable.server => '/cable'
end
ActionCable.server
の実装.ActionCable::Server::Base
のインスタンスを返しているだけ.
# https://github.com/rails/rails/blob/v5.0.0.beta1/actioncable/lib/action_cable.rb#L42-L44
module ActionCable
module_function def server
@server ||= ActionCable::Server::Base.new
end
end
ActionCable::Server
ActionCable::Server::Base
は#call(env)
を実装しているので,mountable engineとして動作する.
# https://github.com/rails/rails/blob/v5.0.0.beta1/actioncable/lib/action_cable/server/base.rb#L25-L28
class ActionCable::Server::Base
def call(env)
setup_heartbeat_timer
config.connection_class.new(self, env).process
end
end
setup_heartbeat_timer
はActionCable::Server::Connection
moduleに定義されている.EventMachineで処理をスケジューリングしている.connections
の各要素のbeat
メソッドを叩いているようだ.ここは後で見ていくことにする.
# https://github.com/rails/rails/blob/v5.0.0.beta1/actioncable/lib/action_cable/server/connections.rb#L24-L30
module ActionCable::Server::Connection
def setup_heartbeat_timer
EM.next_tick do
@heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
EM.next_tick { connections.map(&:beat) }
end
end
end
end
ActionCable::Server::Base#initialize
に戻る.2行目には次のようなコードがある.config.connection_class
のインスタンスを作ってからprocess
を呼んでいるらしい.
config.connection_class.new(self, env).process
config
はcattr_accessor
で宣言されている.ActionCable::Server::Configuration
のインスタンスらしい.
# https://github.com/rails/rails/blob/v5.0.0.beta1/actioncable/lib/action_cable/server/base.rb#L16
cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new }
ActionCable::Server::Configuration
を見てみると,connection_class
のなかみはApplicationCable::Connection
であることがわかる.
# https://github.com/rails/rails/blob/v5.0.0.beta1/actioncable/lib/action_cable/server/configuration.rb#L15
class ActionCable::Server::Configuration
attr_accessor :connection_class
def initialize
@connection_class = ApplicationCable::Connection
end
end
ActionCable::Connection
ActionCable::Connection::Base#process
ではWebSocketの各種イベントのコールバックを登録している.ちゃんと接続できたらMountable Engineのcall
メソッドが要求する返り値を返す.
# https://github.com/rails/rails/blob/master/actioncable/lib/action_cable/connection/base.rb#L71-L83
class ActionCable::Connection::Base
def initialize
@websocket = ActionCable::Connection::WebSocket.new(env)
end
def process
logger.info started_request_message
if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }
respond_to_successful_request
else
respond_to_invalid_request
end
end
end
ActionCable::Connection::WebSocket
はFaye::WebSocket
のラッパになっており,主要なメソッド(?)は大抵delegateされる.
# https://github.com/rails/rails/blob/v5.0.0.beta1/actioncable/lib/action_cable/connection/web_socket.rb#L7-L11
class ActionCable::Connection::WebSocket
delegate :rack_response, :close, :on, to: :websocket
def initialize(env)
@websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
end
end
メッセージ受信時の流れ
めんどくさくなってきたのでざっくり(そのうちちゃんと書きます)
-
ActionCable::Connection::Base#on_message
- とりあえずバッファに突っ込む
ActionCable::Connection::MessageBuffer#append
ActionCable::Connection::MessageBuffer#receive
- バッファからConnectionに投げ返す
ActionCable::Connection::Base#receive
-
ActionCable::Connection::Subscriptions#execute_command
-
data['command'] == 'subscribe'
- このタイミングで
ActionCable::Channels::Base
のインスタンスを作っておく ActionCable::Connection::Subscriptions#add
- このタイミングで
-
data['command'] == 'message'
ActionCable::Connection::Subscriptions#perform_action
ActionCable::Channel::Base#perform_action
-
ActionCable::Channel::Base
サブクラスのメソッドを起動する
-