概要
ActionCableのコードリーディングをしつつメモ書き。
今回はWebSocketに対して登録されたイベントハンドラから見ていきます。
ActionCable::Connection::Base#process
その1でも見ましたが、WebSocketへのイベントハンドラの登録はここで行われています。
# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
def process
logger.info started_request_message
if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }
respond_to_successful_request
else
respond_to_invalid_request
end
end
ActionCable::Connection::Base#send_async
send_asyncでは、worker_poolのasyncを呼び出しています。
# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
worker_pool.async.invoke(self, method, *arguments)
end
worker_poolの中身はActionCable::Server::Worker#poolの戻り値です。
コメントにもある通り、configのworker_pool_sizeでこのサイズを変更することが出来ます。
# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
@worker_pool ||= ActionCable::Server::Worker.pool(size: config.worker_pool_size)
end
このpoolメソッドや、send_asyncで呼び出しているasyncメソッドはActionCable::Server::WorkerがincludeしているCelluloidのメソッドです。
Celluloid
CelluloidはactorっぽいことをRubyでするためのフレームワークです。
actorについては先日のRubyKaigi2015のセッションでも紹介されていましたね。
(スライドがSpeakerDeckにアップされていました ⇛ https://speakerdeck.com/m_seki/actor-thread-and-me-rubykaigi2015)
Celluloidについてはこちらの記事が分かりやすかったです。
Celluloidのはなし
Celluloid#pool
poolについてはGithubのwikiに説明が載っていました。
delegate先のproxy objectであるworkerのpoolを作るメソッド、ということですかね……。
ActionCable::Server::Worker#invoke
def invoke(receiver, method, *args)
@connection = receiver
run_callbacks :work do
receiver.send method, *args
end
rescue Exception => e
# ...
invokeはcallbackを呼んで、connectionに対してsendしてるだけです。
sendされるメソッドはon_openとon_closeでしたね。
つまり、connectionのon_open / on_closeが呼び出されます。
def on_open
connect if respond_to?(:connect)
subscribe_to_internal_channel
beat
message_buffer.process!
server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
respond_to_invalid_request
end
# ...
def on_close
logger.info finished_request_message
server.remove_connection(self)
subscriptions.unsubscribe_from_all
unsubscribe_from_internal_channel
disconnect if respond_to?(:disconnect)
end
on_openの方は、MessageBuffer#process!を呼んでserverに自身を追加。
on_closeの方はchannelをremoveして、subscribeを解除してdisconnectしてます。
ActionCable::Connection::MessageBuffer#process!
def process!
@processing = true
receive_buffered_messages
end
def receive_buffered_messages
receive buffered_messages.shift until buffered_messages.empty?
end
def receive(message)
connection.send_async :receive, message
end
MessageBuffer#process!が呼ばれると、bufferされていたmessageがなくなるまでconnectionのreceiveメソッドを非同期で呼び出すようになっています。
receiveメソッドで、execute_commandが呼び出され、コマンドパラメータに応じたメッセージの処理がなされます。
def receive(data_in_json)
if websocket.alive?
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
else
logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
end
end
ActionCable::Connection::Base#on_message
on_openとon_closeは終わりですが、最後にon_messageをエンドポイントまで見ていきましょう。
def on_message(message)
message_buffer.append message
end
message_bufferにメッセージを追加しています。
MessageBuffer#appendではprocess!が呼ばれていればreceiveを、呼ばれていなければbufferを呼び出すようになっています。
def append(message)
if valid? message
if processing?
receive message
else
buffer message
end
else
connection.logger.error "Couldn't handle non-string message: #{message.class}"
end
end
bufferは配列にmessageを追加しているだけですね。
def buffer(message)
buffered_messages << message
end
まとめ
execute_commandが呼ばれるまでの流れを追うことが出来て、一通りコードを追えました。
あと気になってるのはidentified_byのあたりなので、その辺を追えたらなと思います。