5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

PxDTAdvent Calendar 2024

Day 3

ActionCableのワーカースレッドのパフォーマンス可視化に取り組んでみる

Last updated at Posted at 2024-12-23

この記事について

Ruby On RailsにはActionCableというwebsocket通信をサポートするフレームワークがあります。このActioCableにはメッセージ処理を担うワーカースレッドが存在します。今回はこのワーカースレッドのパフォーマンスを可視化することをに取り組むためにソースコードを読み一つの方法を提示してみます

ActionCableにおける受信メッセージの非同期処理の仕組み

ActionCabe::Server::Worker

ActionCableには、ActionCabe::Server::Worker クラスがあり、websocker通信で受信したメッセージはこのクラスのインスタンスに渡され処理されます。ActionCable::Server::Workerは、 Concurrent::ThreadPoolExecutorインスタンスを保持し、受信したメッセージが非同期且つ並行的に処理される仕組みになっています。

/action_cable/server/worker.rb
module ActionCable
  module Server
    # Worker used by Server.send_async to do connection work in threads.
    class Worker 
       def initialize(max_size: 5)
        @executor = Concurrent::ThreadPoolExecutor.new(
          name: "ActionCable",
          min_threads: 1,
          max_threads: max_size,
          max_queue: 0,
        )
      end
   end
  end
end

config.action_cable.worker_pool_size

Concurrent::ThreadPoolExecutorの初期化時に設定するmax_threadsの値は、config.action_cable.worker_pool_size によって指定されます。この値はデフォルト値として 4 が設定されていますが以下のように、アプリケーションの設定として上書きすることが可能です

config/environments/production.rb

Rails.application.configure do
  config.action_cable.worker_pool_size = ENV.fetch("RAILS_MAX_THREADS", 5)
end

ActionCableを用いたwebsocket通信において「パフォーマンスが出ない」要因は様々考えられますが、この worker_pool_size の変更時は、その中で受信したメッセージが処理しきれていないようなケースで、メッセージ処理の並行性が向上し、パフォーマンスが改善する可能性があります。

接続からメッセージ受信の流れとWorkerの呼び出しタイミング

ActionCable::Server::Base

ActionCableにおける、通信のエントリポイントはActionCable::Server::Baseです。websocket接続確立要求を受け取ったタイミングでコネクションオブジェクトが初期化され、::Websoket::Driverに渡され、websocket通信へのアップグレードが行われます。その後接続が破棄されるまで、このコネクションオブジェクトが保持されます。

action_cable/server/base.rb
      def call(env)
        return config.health_check_application.call(env) if env["PATH_INFO"] == config.health_check_path
        setup_heartbeat_timer
        config.connection_class.call.new(self, env).process
      end

config.connection_class は、ActionCable::Engineで"ApplicationCable::Connection が指定されており、このコネクションオブジェクトのクラスはapp/channels/application_cable/channel.rb であることがわかります。

action_cable/engine.rb
self.connection_class = -> { "ApplicationCable::Connection".safe_constantize || previous_connection_class.call }

このApplicationCable::Connection は、ActionCable::Conenction::Base を継承しており、フレームワークにおけるこのクラスの振る舞いはActionCable::Conenction::Base の実装を追うことで処理の流れを追うことができます

ActionCable::Connection::Base

ActionCable::Connection::Baseのインスタンスの初期化時にserver.worker_poolを取得、ActionCable::Connection::WebSocketを初期化します。

action_cable/connection/base.rb
      def initialize(server, env, coder: ActiveSupport::JSON)
        @server, @env, @coder = server, env, coder

        @worker_pool = server.worker_pool
        @logger = new_tagged_logger

        @websocket      = ActionCable::Connection::WebSocket.new(env, self, event_loop)
        @subscriptions  = ActionCable::Connection::Subscriptions.new(self)
        @message_buffer = ActionCable::Connection::MessageBuffer.new(self)

        @_internal_subscriptions = nil
        @started_at = Time.now
      end

さらに、詳細は割愛しますが、ActionCable::Connection::Baseのインスタンスは
ActionCable::Connection::WebSocket、ActionCable::Connection::ClientSocketを経由して、::Websocket::Driverに渡されます。この::Websocket::Driverがクライアントと通信した結果をこのインスタンスに通知します。メッセージ受信した結果(この辺りも詳細割愛)、receiveメソッドが呼ばれます

action_cable/connection/base.rb
      def receive(websocket_message) # :nodoc:
        send_async :dispatch_websocket_message, websocket_message
      end

ここで呼ばれているsend_asyncを見てみるとworker_pool.async_invokeが呼ばれています。

action_cable/connection/base.rb
      def send_async(method, *arguments)
        worker_pool.async_invoke(self, method, *arguments)
      end

ここでActionCable::Server::Workerのasync_invokeを見てみましょう

ActionCable::Server::Worker

async_invokeは冒頭で述べた、Concurrent::ThreadPoolExecutorインスタンスである@executerのpostを呼んでいます。ここで渡されてきたメッセージの処理が実行されます
ここれで
invokeに渡されているのは、receiverは ActionCable::Connection::Baseのインスタンス、methodは、dispatch_websocket_messageです。

action_cable/server/worker.rb
      def async_invoke(receiver, method, *args, connection: receiver, &block)
        @executor.post do
          invoke(receiver, method, *args, connection: connection, &block)
        end
      end

このpostを実行することで受信したメッセージが並行に処理されています。
ここで処理されるのは、ApplicationCable::Channelを継承して開発者が実装する個別のXXXXChannelクラスのメソッドです。

非同期処理のパフォーマンスを可視化してみる

Concurrent::ThreadPoolExecutorのキューサイズ

送信されるメッセージ数に対して、スレッドが足りない時や、個別のChanne実装重い処理がある場合にどんどん処理が遅れていきます。
通常では、ここにボトルネックがある場合にそれを可視化することができません。そこで、一つの可視化の方法として、Concurrent::ThreadPoolExecutorのキューサイズを監視することができます。Concurrent::ThreadPoolExecutorではpostで渡された処理がキューに詰められ、それをスレッドプールにある空きのあるスレッドが処理をしていく流れになっています。このキューサイズを取得するqueue_lengthというメソッドを定期的に確認しログに出力することでパフォーマンスを可視化することができます。

concurrent-ruby/concurrent/executor/ruby_thread_pool_executor.rb
    def queue_length
      synchronize { @queue.length }
    end

このキューにメッセージが溜まりがちなら遅延などが発生していると考えられ、スレッド数を増やしたりchannelの実装を見直すなどの取り組みを行うことができます。

キューサイズを可視化するための実装変更

ActionCable::Workerのasync_invokeを以下のようなモンキーパッチを当てることで、定期的にキューサイズを確認することができます。ただし、キューサイズの取得処理自体も排他制御がかかっているため、頻繁な呼び出しはパフォーマンスに影響が与えるので注意が必要です。

module ActionCable
  module Worker
    alias_method :original_async_invoke, :async_invoke

    def async_invoke(receiver, method, *args, connection: receiver, &block)
      @last_log_time ||= Time.now

      if Time.now - @last_log_time > 60 # 60秒経過したか確認
        queue_size = @executer.respond_to?(:queue_length) ? @executer.queue_length : nil
        # ここでキューサイズをログ出力
        puts "Current queue size: #{queue_size}" if queue_size
        @last_log_time = Time.now
      end

      original_async_invoke(receiver, method, *args, connection: connection, &block)
    end
  end
end

まとめ

Concurrent::ThreadPoolExecutorのキューサイズを監視することで、ワーカースレッドのパフォーマンスの可視化をする一つの方法を説明してみました。

5
0
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
0

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?