この記事について
Ruby On RailsにはActionCableというwebsocket通信をサポートするフレームワークがあります。このActioCableにはメッセージ処理を担うワーカースレッドが存在します。今回はこのワーカースレッドのパフォーマンスを可視化することをに取り組むためにソースコードを読み一つの方法を提示してみます
ActionCableにおける受信メッセージの非同期処理の仕組み
ActionCabe::Server::Worker
ActionCableには、ActionCabe::Server::Worker クラスがあり、websocker通信で受信したメッセージはこのクラスのインスタンスに渡され処理されます。ActionCable::Server::Workerは、 Concurrent::ThreadPoolExecutorインスタンスを保持し、受信したメッセージが非同期且つ並行的に処理される仕組みになっています。
module ActionCable
module Server
# Worker used by Server.send_async to do connection work in threads.
class Worker
def initialize(max_size: 5)
@executor = Concurrent::ThreadPoolExecutor.new(
name: "ActionCable",
min_threads: 1,
max_threads: max_size,
max_queue: 0,
)
end
end
end
end
config.action_cable.worker_pool_size
Concurrent::ThreadPoolExecutorの初期化時に設定するmax_threadsの値は、config.action_cable.worker_pool_size によって指定されます。この値はデフォルト値として 4 が設定されていますが以下のように、アプリケーションの設定として上書きすることが可能です
Rails.application.configure do
config.action_cable.worker_pool_size = ENV.fetch("RAILS_MAX_THREADS", 5)
end
ActionCableを用いたwebsocket通信において「パフォーマンスが出ない」要因は様々考えられますが、この worker_pool_size の変更時は、その中で受信したメッセージが処理しきれていないようなケースで、メッセージ処理の並行性が向上し、パフォーマンスが改善する可能性があります。
接続からメッセージ受信の流れとWorkerの呼び出しタイミング
ActionCable::Server::Base
ActionCableにおける、通信のエントリポイントはActionCable::Server::Baseです。websocket接続確立要求を受け取ったタイミングでコネクションオブジェクトが初期化され、::Websoket::Driverに渡され、websocket通信へのアップグレードが行われます。その後接続が破棄されるまで、このコネクションオブジェクトが保持されます。
def call(env)
return config.health_check_application.call(env) if env["PATH_INFO"] == config.health_check_path
setup_heartbeat_timer
config.connection_class.call.new(self, env).process
end
config.connection_class は、ActionCable::Engineで"ApplicationCable::Connection が指定されており、このコネクションオブジェクトのクラスはapp/channels/application_cable/channel.rb であることがわかります。
self.connection_class = -> { "ApplicationCable::Connection".safe_constantize || previous_connection_class.call }
このApplicationCable::Connection は、ActionCable::Conenction::Base を継承しており、フレームワークにおけるこのクラスの振る舞いはActionCable::Conenction::Base の実装を追うことで処理の流れを追うことができます
ActionCable::Connection::Base
ActionCable::Connection::Baseのインスタンスの初期化時にserver.worker_poolを取得、ActionCable::Connection::WebSocketを初期化します。
def initialize(server, env, coder: ActiveSupport::JSON)
@server, @env, @coder = server, env, coder
@worker_pool = server.worker_pool
@logger = new_tagged_logger
@websocket = ActionCable::Connection::WebSocket.new(env, self, event_loop)
@subscriptions = ActionCable::Connection::Subscriptions.new(self)
@message_buffer = ActionCable::Connection::MessageBuffer.new(self)
@_internal_subscriptions = nil
@started_at = Time.now
end
さらに、詳細は割愛しますが、ActionCable::Connection::Baseのインスタンスは
ActionCable::Connection::WebSocket、ActionCable::Connection::ClientSocketを経由して、::Websocket::Driverに渡されます。この::Websocket::Driverがクライアントと通信した結果をこのインスタンスに通知します。メッセージ受信した結果(この辺りも詳細割愛)、receiveメソッドが呼ばれます
def receive(websocket_message) # :nodoc:
send_async :dispatch_websocket_message, websocket_message
end
ここで呼ばれているsend_asyncを見てみるとworker_pool.async_invokeが呼ばれています。
def send_async(method, *arguments)
worker_pool.async_invoke(self, method, *arguments)
end
ここでActionCable::Server::Workerのasync_invokeを見てみましょう
ActionCable::Server::Worker
async_invokeは冒頭で述べた、Concurrent::ThreadPoolExecutorインスタンスである@executer
のpostを呼んでいます。ここで渡されてきたメッセージの処理が実行されます
ここれで
invokeに渡されているのは、receiverは ActionCable::Connection::Baseのインスタンス、methodは、dispatch_websocket_messageです。
def async_invoke(receiver, method, *args, connection: receiver, &block)
@executor.post do
invoke(receiver, method, *args, connection: connection, &block)
end
end
このpostを実行することで受信したメッセージが並行に処理されています。
ここで処理されるのは、ApplicationCable::Channelを継承して開発者が実装する個別のXXXXChannelクラスのメソッドです。
非同期処理のパフォーマンスを可視化してみる
Concurrent::ThreadPoolExecutorのキューサイズ
送信されるメッセージ数に対して、スレッドが足りない時や、個別のChanne実装重い処理がある場合にどんどん処理が遅れていきます。
通常では、ここにボトルネックがある場合にそれを可視化することができません。そこで、一つの可視化の方法として、Concurrent::ThreadPoolExecutorのキューサイズを監視することができます。Concurrent::ThreadPoolExecutorではpostで渡された処理がキューに詰められ、それをスレッドプールにある空きのあるスレッドが処理をしていく流れになっています。このキューサイズを取得するqueue_lengthというメソッドを定期的に確認しログに出力することでパフォーマンスを可視化することができます。
def queue_length
synchronize { @queue.length }
end
このキューにメッセージが溜まりがちなら遅延などが発生していると考えられ、スレッド数を増やしたりchannelの実装を見直すなどの取り組みを行うことができます。
キューサイズを可視化するための実装変更
ActionCable::Workerのasync_invokeを以下のようなモンキーパッチを当てることで、定期的にキューサイズを確認することができます。ただし、キューサイズの取得処理自体も排他制御がかかっているため、頻繁な呼び出しはパフォーマンスに影響が与えるので注意が必要です。
module ActionCable
module Worker
alias_method :original_async_invoke, :async_invoke
def async_invoke(receiver, method, *args, connection: receiver, &block)
@last_log_time ||= Time.now
if Time.now - @last_log_time > 60 # 60秒経過したか確認
queue_size = @executer.respond_to?(:queue_length) ? @executer.queue_length : nil
# ここでキューサイズをログ出力
puts "Current queue size: #{queue_size}" if queue_size
@last_log_time = Time.now
end
original_async_invoke(receiver, method, *args, connection: connection, &block)
end
end
end
まとめ
Concurrent::ThreadPoolExecutorのキューサイズを監視することで、ワーカースレッドのパフォーマンスの可視化をする一つの方法を説明してみました。