概要
ActionCableのコードリーディングをしつつメモ書き。
今回は初期化部分を見ていきます。
ActionCable::Server::Base#call
ActionCableはRails::Engineとして実装されているため、ここがエンドポイントになる。
def call(env)
setup_heartbeat_timer
config.connection_class.new(self, env).process
end
setup_heartbeat_timerはActionCable::Server::Connectionモジュールのメソッドです。
# WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
# then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
EM.next_tick do
@heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
EM.next_tick { connections.map(&:beat) }
end
end
end
コメントに書いてある通り、3秒ごとにすべてのコネクションにheartbeatを送って、失敗したらdisconnectするメソッドですね。
ActionCable::Server::Configuration
configはActionCable::Server::Configurationを見ています。
cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new }
connection_classはAppllicatinCable::Connectionのようです。
# ActionCable::Server::Configuration
def initialize
@log_tags = []
@connection_class = ApplicationCable::Connection
@worker_pool_size = 100
@channels_path = Rails.root.join('app/channels')
@disable_request_forgery_protection = false
end
ApplicationCable::Connection
ApplicationCable::ConnectionはActionCable::Connection::Baseを継承しています。
module ApplicationCable
class Connection < ActionCable::Connection::Base
end
end
コンストラクタはこのようになっています。
serverにActionCable::Server::Baseが、
envにActionCable::Server::Base#callに渡されたenvがそのまま渡されます。
どうやらwebsocketの初期化もここで行っているようです。
def initialize(server, env)
@server, @env = server, env
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@_internal_redis_subscriptions = nil
@started_at = Time.now
end
ActionCable::Connection::Base#process
initializeの後に呼ばれるprocessメソッドは次のようになっています。
def process
logger.info started_request_message
if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }
respond_to_successful_request
else
respond_to_invalid_request
end
end
websocketが可能かどうか・request_originが許可されているかどうかをチェックして、成功・失敗を返すようです。
また、websocketへ on/message/close のイベントハンドラの登録もここで行われています。
respond_to_successful_requestの中身はrackのレスポンスを返すだけです。
def respond_to_successful_request
websocket.rack_response
end
respond_to_invalid_requestの中身はwebsocketがまだ生きてたらcloseして404を返すようになっています。
def respond_to_invalid_request
close if websocket.alive?
logger.info finished_request_message
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end
ActionCable::Connection::WebSocket
ActionCableのwebsocketの部分はfeya-websocketというgemのラッパーとして実装されています。
require 'faye/websocket'
module ActionCable
module Connection
# Decorate the Faye::WebSocket with helpers we need.
class WebSocket
delegate :rack_response, :close, :on, to: :websocket
def initialize(env)
@websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
end
def possible?
websocket
end
# ...
feya-websocketはfeyaというシンプルなPub/Subメッセージングを行うためのgemのwebsocketの部分を切り出したgemだそうです。
単品で使ったことがありますが、簡単にwebsocketを実装できたので、websocketサーバ単独で動かすときは選択肢の1つになると思います。
先ほど出てきたpossible?メソッドはwebsocketのインスタンスがあるかどうか判定してるだけですね。
Rackから渡されたenvがwebsocketのrequestかどうか判定して、websocketでなければnilが設定されるようになっています。
ActionCable::Connection::Subscriptions
重要そうな名前なのでこちらも覗き見。
module ActionCable
module Connection
# Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
# the connection to the proper channel. Should not be used directly by the user.
class Subscriptions
def initialize(connection)
@connection = connection
@subscriptions = {}
end
コメントに書いてある通り、接続が確立しているすべてのchannel subscriptionsのコレクションクラスのようです。
クライアントから送信されたコマンドによって適切なチャンネルへとルーティングする責務を担うみたいです。
ActionCable::Connection::MessageBuffer
ついでにこちらも。
module ActionCable
module Connection
# Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
# Entirely internal operation and should not be used directly by the user.
class MessageBuffer
def initialize(connection)
@connection = connection
@buffered_messages = []
end
少なくとも接続が初期化される前にはwebsocketからのメッセージをバッファリング・受信する準備が出来ているようにするためのクラス……ってことですかね。
中身見てもappendとかreceiveとかprocessとか、process_buffered_messagesといったメソッドがあるのでそれっぽいです。
今回のまとめ
ActionCableがリクエストを受け取り、WebSocketのconnectionが確立されるまでの流れを見ることが出来ました。
次はメッセージングやPubSubの部分を見ていけたらと思います。