7
5

More than 5 years have passed since last update.

ActionCableコードリーディングその3

Last updated at Posted at 2016-01-02

その1
その2

概要

ActionCableのコードリーディングをしつつメモ書き。
今回はWebSocketに対して登録されたイベントハンドラから見ていきます。

ActionCable::Connection::Base#process

その1でも見ましたが、WebSocketへのイベントハンドラの登録はここで行われています。

# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
def process
  logger.info started_request_message

  if websocket.possible? && allow_request_origin?
    websocket.on(:open)    { |event| send_async :on_open   }
    websocket.on(:message) { |event| on_message event.data }
    websocket.on(:close)   { |event| send_async :on_close  }

    respond_to_successful_request
  else
    respond_to_invalid_request
  end
end

ActionCable::Connection::Base#send_async

send_asyncでは、worker_poolのasyncを呼び出しています。

# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
  worker_pool.async.invoke(self, method, *arguments)
end

worker_poolの中身はActionCable::Server::Worker#poolの戻り値です。
コメントにもある通り、configのworker_pool_sizeでこのサイズを変更することが出来ます。

# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
  @worker_pool ||= ActionCable::Server::Worker.pool(size: config.worker_pool_size)
end

このpoolメソッドや、send_asyncで呼び出しているasyncメソッドはActionCable::Server::WorkerがincludeしているCelluloidのメソッドです。

Celluloid

CelluloidはactorっぽいことをRubyでするためのフレームワークです。
actorについては先日のRubyKaigi2015のセッションでも紹介されていましたね。
(スライドがSpeakerDeckにアップされていました ⇛ https://speakerdeck.com/m_seki/actor-thread-and-me-rubykaigi2015)

Celluloidについてはこちらの記事が分かりやすかったです。
Celluloidのはなし

Celluloid#pool

poolについてはGithubのwikiに説明が載っていました。
delegate先のproxy objectであるworkerのpoolを作るメソッド、ということですかね……。

ActionCable::Server::Worker#invoke

def invoke(receiver, method, *args)
  @connection = receiver

  run_callbacks :work do
    receiver.send method, *args
  end
rescue Exception => e
  # ...

invokeはcallbackを呼んで、connectionに対してsendしてるだけです。
sendされるメソッドはon_openとon_closeでしたね。
つまり、connectionのon_open / on_closeが呼び出されます。

def on_open
  connect if respond_to?(:connect)
  subscribe_to_internal_channel
  beat

  message_buffer.process!
  server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
  respond_to_invalid_request
end

# ...

def on_close
  logger.info finished_request_message

  server.remove_connection(self)

  subscriptions.unsubscribe_from_all
  unsubscribe_from_internal_channel

  disconnect if respond_to?(:disconnect)
end

on_openの方は、MessageBuffer#process!を呼んでserverに自身を追加。
on_closeの方はchannelをremoveして、subscribeを解除してdisconnectしてます。

ActionCable::Connection::MessageBuffer#process!

def process!
  @processing = true
  receive_buffered_messages
end
def receive_buffered_messages
  receive buffered_messages.shift until buffered_messages.empty?
end
def receive(message)
  connection.send_async :receive, message
end

MessageBuffer#process!が呼ばれると、bufferされていたmessageがなくなるまでconnectionのreceiveメソッドを非同期で呼び出すようになっています。

receiveメソッドで、execute_commandが呼び出され、コマンドパラメータに応じたメッセージの処理がなされます。

def receive(data_in_json)
  if websocket.alive?
    subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
  else
    logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
  end
end

ActionCable::Connection::Base#on_message

on_openとon_closeは終わりですが、最後にon_messageをエンドポイントまで見ていきましょう。

def on_message(message)
  message_buffer.append message
end

message_bufferにメッセージを追加しています。
MessageBuffer#appendではprocess!が呼ばれていればreceiveを、呼ばれていなければbufferを呼び出すようになっています。

def append(message)
  if valid? message
    if processing?
      receive message
    else
      buffer message
    end
  else
    connection.logger.error "Couldn't handle non-string message: #{message.class}"
  end
end

bufferは配列にmessageを追加しているだけですね。

def buffer(message)
  buffered_messages << message
end

まとめ

execute_commandが呼ばれるまでの流れを追うことが出来て、一通りコードを追えました。
あと気になってるのはidentified_byのあたりなので、その辺を追えたらなと思います。

7
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
7
5