Ruby
Rails
ActionCable

ActionCableコードリーディングその1

More than 3 years have passed since last update.

その2

その3


概要

ActionCableのコードリーディングをしつつメモ書き。

今回は初期化部分を見ていきます。


ActionCable::Server::Base#call

ActionCableはRails::Engineとして実装されているため、ここがエンドポイントになる。

def call(env)

setup_heartbeat_timer
config.connection_class.new(self, env).process
end

setup_heartbeat_timerはActionCable::Server::Connectionモジュールのメソッドです。

# WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you

# then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
EM.next_tick do
@heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
EM.next_tick { connections.map(&:beat) }
end
end
end

コメントに書いてある通り、3秒ごとにすべてのコネクションにheartbeatを送って、失敗したらdisconnectするメソッドですね。


ActionCable::Server::Configuration

configはActionCable::Server::Configurationを見ています。

cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new }

connection_classはAppllicatinCable::Connectionのようです。

# ActionCable::Server::Configuration

def initialize
@log_tags = []

@connection_class = ApplicationCable::Connection
@worker_pool_size = 100

@channels_path = Rails.root.join('app/channels')

@disable_request_forgery_protection = false
end


ApplicationCable::Connection

ApplicationCable::ConnectionはActionCable::Connection::Baseを継承しています。

module ApplicationCable

class Connection < ActionCable::Connection::Base
end
end

コンストラクタはこのようになっています。

serverにActionCable::Server::Baseが、

envにActionCable::Server::Base#callに渡されたenvがそのまま渡されます。

どうやらwebsocketの初期化もここで行っているようです。

def initialize(server, env)

@server, @env = server, env

@logger = new_tagged_logger

@websocket = ActionCable::Connection::WebSocket.new(env)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)

@_internal_redis_subscriptions = nil
@started_at = Time.now
end


ActionCable::Connection::Base#process

initializeの後に呼ばれるprocessメソッドは次のようになっています。

def process

logger.info started_request_message

if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }

respond_to_successful_request
else
respond_to_invalid_request
end
end

websocketが可能かどうか・request_originが許可されているかどうかをチェックして、成功・失敗を返すようです。

また、websocketへ on/message/close のイベントハンドラの登録もここで行われています。

respond_to_successful_requestの中身はrackのレスポンスを返すだけです。

def respond_to_successful_request

websocket.rack_response
end

respond_to_invalid_requestの中身はwebsocketがまだ生きてたらcloseして404を返すようになっています。

def respond_to_invalid_request

close if websocket.alive?

logger.info finished_request_message
[ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end


ActionCable::Connection::WebSocket

ActionCableのwebsocketの部分はfeya-websocketというgemのラッパーとして実装されています。

require 'faye/websocket'

module ActionCable
module Connection
# Decorate the Faye::WebSocket with helpers we need.
class WebSocket
delegate :rack_response, :close, :on, to: :websocket

def initialize(env)
@websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
end

def possible?
websocket
end

# ...

feya-websocketfeyaというシンプルなPub/Subメッセージングを行うためのgemのwebsocketの部分を切り出したgemだそうです。

単品で使ったことがありますが、簡単にwebsocketを実装できたので、websocketサーバ単独で動かすときは選択肢の1つになると思います。

先ほど出てきたpossible?メソッドはwebsocketのインスタンスがあるかどうか判定してるだけですね。

Rackから渡されたenvがwebsocketのrequestかどうか判定して、websocketでなければnilが設定されるようになっています。


ActionCable::Connection::Subscriptions

重要そうな名前なのでこちらも覗き見。

module ActionCable

module Connection
# Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
# the connection to the proper channel. Should not be used directly by the user.
class Subscriptions
def initialize(connection)
@connection = connection
@subscriptions = {}
end

コメントに書いてある通り、接続が確立しているすべてのchannel subscriptionsのコレクションクラスのようです。

クライアントから送信されたコマンドによって適切なチャンネルへとルーティングする責務を担うみたいです。


ActionCable::Connection::MessageBuffer

ついでにこちらも。

module ActionCable

module Connection
# Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
# Entirely internal operation and should not be used directly by the user.
class MessageBuffer
def initialize(connection)
@connection = connection
@buffered_messages = []
end

少なくとも接続が初期化される前にはwebsocketからのメッセージをバッファリング・受信する準備が出来ているようにするためのクラス……ってことですかね。

中身見てもappendとかreceiveとかprocessとか、process_buffered_messagesといったメソッドがあるのでそれっぽいです。


今回のまとめ

ActionCableがリクエストを受け取り、WebSocketのconnectionが確立されるまでの流れを見ることが出来ました。

次はメッセージングやPubSubの部分を見ていけたらと思います。