ActionCableコードリーディングその1

  • 23
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

その2
その3

概要

ActionCableのコードリーディングをしつつメモ書き。
今回は初期化部分を見ていきます。

ActionCable::Server::Base#call

ActionCableはRails::Engineとして実装されているため、ここがエンドポイントになる。

def call(env)
  setup_heartbeat_timer
  config.connection_class.new(self, env).process
end

setup_heartbeat_timerはActionCable::Server::Connectionモジュールのメソッドです。

# WebSocket connection implementations differ on when they'll mark a connection as stale. We basically never want a connection to go stale, as you
# then can't rely on being able to receive and send to it. So there's a 3 second heartbeat running on all connections. If the beat fails, we automatically
# disconnect.
def setup_heartbeat_timer
  EM.next_tick do
    @heartbeat_timer ||= EventMachine.add_periodic_timer(BEAT_INTERVAL) do
      EM.next_tick { connections.map(&:beat) }
    end
  end
end

コメントに書いてある通り、3秒ごとにすべてのコネクションにheartbeatを送って、失敗したらdisconnectするメソッドですね。

ActionCable::Server::Configuration

configはActionCable::Server::Configurationを見ています。

cattr_accessor(:config, instance_accessor: true) { ActionCable::Server::Configuration.new }

connection_classはAppllicatinCable::Connectionのようです。

# ActionCable::Server::Configuration
def initialize
  @log_tags = []

  @connection_class  = ApplicationCable::Connection
  @worker_pool_size  = 100

  @channels_path = Rails.root.join('app/channels')

  @disable_request_forgery_protection = false
end

ApplicationCable::Connection

ApplicationCable::ConnectionはActionCable::Connection::Baseを継承しています。

module ApplicationCable
  class Connection < ActionCable::Connection::Base
  end
end

コンストラクタはこのようになっています。
serverにActionCable::Server::Baseが、
envにActionCable::Server::Base#callに渡されたenvがそのまま渡されます。
どうやらwebsocketの初期化もここで行っているようです。

def initialize(server, env)
  @server, @env = server, env

  @logger = new_tagged_logger

  @websocket      = ActionCable::Connection::WebSocket.new(env)
  @subscriptions  = ActionCable::Connection::Subscriptions.new(self)
  @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

  @_internal_redis_subscriptions = nil
  @started_at = Time.now
end

ActionCable::Connection::Base#process

initializeの後に呼ばれるprocessメソッドは次のようになっています。

def process
  logger.info started_request_message

  if websocket.possible? && allow_request_origin?
    websocket.on(:open)    { |event| send_async :on_open   }
    websocket.on(:message) { |event| on_message event.data }
    websocket.on(:close)   { |event| send_async :on_close  }

    respond_to_successful_request
  else
    respond_to_invalid_request
  end
end

websocketが可能かどうか・request_originが許可されているかどうかをチェックして、成功・失敗を返すようです。
また、websocketへ on/message/close のイベントハンドラの登録もここで行われています。

respond_to_successful_requestの中身はrackのレスポンスを返すだけです。

def respond_to_successful_request
  websocket.rack_response
end

respond_to_invalid_requestの中身はwebsocketがまだ生きてたらcloseして404を返すようになっています。

def respond_to_invalid_request
  close if websocket.alive?

  logger.info finished_request_message
  [ 404, { 'Content-Type' => 'text/plain' }, [ 'Page not found' ] ]
end

ActionCable::Connection::WebSocket

ActionCableのwebsocketの部分はfeya-websocketというgemのラッパーとして実装されています。

require 'faye/websocket'

module ActionCable
  module Connection
    # Decorate the Faye::WebSocket with helpers we need.
    class WebSocket
      delegate :rack_response, :close, :on, to: :websocket

      def initialize(env)
        @websocket = Faye::WebSocket.websocket?(env) ? Faye::WebSocket.new(env) : nil
      end

      def possible?
        websocket
      end

      # ...

feya-websocketfeyaというシンプルなPub/Subメッセージングを行うためのgemのwebsocketの部分を切り出したgemだそうです。
単品で使ったことがありますが、簡単にwebsocketを実装できたので、websocketサーバ単独で動かすときは選択肢の1つになると思います。

先ほど出てきたpossible?メソッドはwebsocketのインスタンスがあるかどうか判定してるだけですね。
Rackから渡されたenvがwebsocketのrequestかどうか判定して、websocketでなければnilが設定されるようになっています。

ActionCable::Connection::Subscriptions

重要そうな名前なのでこちらも覗き見。

module ActionCable
  module Connection
    # Collection class for all the channel subscriptions established on a given connection. Responsible for routing incoming commands that arrive on
    # the connection to the proper channel. Should not be used directly by the user.
    class Subscriptions
      def initialize(connection)
        @connection = connection
        @subscriptions = {}
      end

コメントに書いてある通り、接続が確立しているすべてのchannel subscriptionsのコレクションクラスのようです。
クライアントから送信されたコマンドによって適切なチャンネルへとルーティングする責務を担うみたいです。

ActionCable::Connection::MessageBuffer

ついでにこちらも。

module ActionCable
  module Connection
    # Allows us to buffer messages received from the WebSocket before the Connection has been fully initialized and is ready to receive them.
    # Entirely internal operation and should not be used directly by the user.
    class MessageBuffer
      def initialize(connection)
        @connection = connection
        @buffered_messages = []
      end

少なくとも接続が初期化される前にはwebsocketからのメッセージをバッファリング・受信する準備が出来ているようにするためのクラス……ってことですかね。
中身見てもappendとかreceiveとかprocessとか、process_buffered_messagesといったメソッドがあるのでそれっぽいです。

今回のまとめ

ActionCableがリクエストを受け取り、WebSocketのconnectionが確立されるまでの流れを見ることが出来ました。
次はメッセージングやPubSubの部分を見ていけたらと思います。