Ruby
Rails
ActionCable

ActionCableコードリーディングその3

More than 3 years have passed since last update.

その1

その2


概要

ActionCableのコードリーディングをしつつメモ書き。

今回はWebSocketに対して登録されたイベントハンドラから見ていきます。


ActionCable::Connection::Base#process

その1でも見ましたが、WebSocketへのイベントハンドラの登録はここで行われています。

# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.

# This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
def process
logger.info started_request_message

if websocket.possible? && allow_request_origin?
websocket.on(:open) { |event| send_async :on_open }
websocket.on(:message) { |event| on_message event.data }
websocket.on(:close) { |event| send_async :on_close }

respond_to_successful_request
else
respond_to_invalid_request
end
end


ActionCable::Connection::Base#send_async

send_asyncでは、worker_poolのasyncを呼び出しています。

# Invoke a method on the connection asynchronously through the pool of thread workers.

def send_async(method, *arguments)
worker_pool.async.invoke(self, method, *arguments)
end

worker_poolの中身はActionCable::Server::Worker#poolの戻り値です。

コメントにもある通り、configのworker_pool_sizeでこのサイズを変更することが出来ます。

# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.

def worker_pool
@worker_pool ||= ActionCable::Server::Worker.pool(size: config.worker_pool_size)
end

このpoolメソッドや、send_asyncで呼び出しているasyncメソッドはActionCable::Server::WorkerがincludeしているCelluloidのメソッドです。


Celluloid

CelluloidはactorっぽいことをRubyでするためのフレームワークです。

actorについては先日のRubyKaigi2015のセッションでも紹介されていましたね。

(スライドがSpeakerDeckにアップされていました ⇛ https://speakerdeck.com/m_seki/actor-thread-and-me-rubykaigi2015)

Celluloidについてはこちらの記事が分かりやすかったです。

Celluloidのはなし


Celluloid#pool

poolについてはGithubのwikiに説明が載っていました。

delegate先のproxy objectであるworkerのpoolを作るメソッド、ということですかね……。


ActionCable::Server::Worker#invoke

def invoke(receiver, method, *args)

@connection = receiver

run_callbacks :work do
receiver.send method, *args
end
rescue Exception => e
# ...

invokeはcallbackを呼んで、connectionに対してsendしてるだけです。

sendされるメソッドはon_openとon_closeでしたね。

つまり、connectionのon_open / on_closeが呼び出されます。

def on_open

connect if respond_to?(:connect)
subscribe_to_internal_channel
beat

message_buffer.process!
server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
respond_to_invalid_request
end

# ...

def on_close
logger.info finished_request_message

server.remove_connection(self)

subscriptions.unsubscribe_from_all
unsubscribe_from_internal_channel

disconnect if respond_to?(:disconnect)
end

on_openの方は、MessageBuffer#process!を呼んでserverに自身を追加。

on_closeの方はchannelをremoveして、subscribeを解除してdisconnectしてます。


ActionCable::Connection::MessageBuffer#process!

def process!

@processing = true
receive_buffered_messages
end

def receive_buffered_messages

receive buffered_messages.shift until buffered_messages.empty?
end

def receive(message)

connection.send_async :receive, message
end

MessageBuffer#process!が呼ばれると、bufferされていたmessageがなくなるまでconnectionのreceiveメソッドを非同期で呼び出すようになっています。

receiveメソッドで、execute_commandが呼び出され、コマンドパラメータに応じたメッセージの処理がなされます。

def receive(data_in_json)

if websocket.alive?
subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
else
logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
end
end


ActionCable::Connection::Base#on_message

on_openとon_closeは終わりですが、最後にon_messageをエンドポイントまで見ていきましょう。

def on_message(message)

message_buffer.append message
end

message_bufferにメッセージを追加しています。

MessageBuffer#appendではprocess!が呼ばれていればreceiveを、呼ばれていなければbufferを呼び出すようになっています。

def append(message)

if valid? message
if processing?
receive message
else
buffer message
end
else
connection.logger.error "Couldn't handle non-string message: #{message.class}"
end
end

bufferは配列にmessageを追加しているだけですね。

def buffer(message)

buffered_messages << message
end


まとめ

execute_commandが呼ばれるまでの流れを追うことが出来て、一通りコードを追えました。

あと気になってるのはidentified_byのあたりなので、その辺を追えたらなと思います。