ActionCableコードリーディングその3

  • 4
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

その1
その2

概要

ActionCableのコードリーディングをしつつメモ書き。
今回はWebSocketに対して登録されたイベントハンドラから見ていきます。

ActionCable::Connection::Base#process

その1でも見ましたが、WebSocketへのイベントハンドラの登録はここで行われています。

# Called by the server when a new WebSocket connection is established. This configures the callbacks intended for overwriting by the user.
# This method should not be called directly. Rely on the #connect (and #disconnect) callback instead.
def process
  logger.info started_request_message

  if websocket.possible? && allow_request_origin?
    websocket.on(:open)    { |event| send_async :on_open   }
    websocket.on(:message) { |event| on_message event.data }
    websocket.on(:close)   { |event| send_async :on_close  }

    respond_to_successful_request
  else
    respond_to_invalid_request
  end
end

ActionCable::Connection::Base#send_async

send_asyncでは、worker_poolのasyncを呼び出しています。

# Invoke a method on the connection asynchronously through the pool of thread workers.
def send_async(method, *arguments)
  worker_pool.async.invoke(self, method, *arguments)
end

worker_poolの中身はActionCable::Server::Worker#poolの戻り値です。
コメントにもある通り、configのworker_pool_sizeでこのサイズを変更することが出来ます。

# The thread worker pool for handling all the connection work on this server. Default size is set by config.worker_pool_size.
def worker_pool
  @worker_pool ||= ActionCable::Server::Worker.pool(size: config.worker_pool_size)
end

このpoolメソッドや、send_asyncで呼び出しているasyncメソッドはActionCable::Server::WorkerがincludeしているCelluloidのメソッドです。

Celluloid

CelluloidはactorっぽいことをRubyでするためのフレームワークです。
actorについては先日のRubyKaigi2015のセッションでも紹介されていましたね。
(スライドがSpeakerDeckにアップされていました ⇛ https://speakerdeck.com/m_seki/actor-thread-and-me-rubykaigi2015)

Celluloidについてはこちらの記事が分かりやすかったです。
Celluloidのはなし

Celluloid#pool

poolについてはGithubのwikiに説明が載っていました。
delegate先のproxy objectであるworkerのpoolを作るメソッド、ということですかね……。

ActionCable::Server::Worker#invoke

def invoke(receiver, method, *args)
  @connection = receiver

  run_callbacks :work do
    receiver.send method, *args
  end
rescue Exception => e
  # ...

invokeはcallbackを呼んで、connectionに対してsendしてるだけです。
sendされるメソッドはon_openとon_closeでしたね。
つまり、connectionのon_open / on_closeが呼び出されます。

def on_open
  connect if respond_to?(:connect)
  subscribe_to_internal_channel
  beat

  message_buffer.process!
  server.add_connection(self)
rescue ActionCable::Connection::Authorization::UnauthorizedError
  respond_to_invalid_request
end

# ...

def on_close
  logger.info finished_request_message

  server.remove_connection(self)

  subscriptions.unsubscribe_from_all
  unsubscribe_from_internal_channel

  disconnect if respond_to?(:disconnect)
end

on_openの方は、MessageBuffer#process!を呼んでserverに自身を追加。
on_closeの方はchannelをremoveして、subscribeを解除してdisconnectしてます。

ActionCable::Connection::MessageBuffer#process!

def process!
  @processing = true
  receive_buffered_messages
end
def receive_buffered_messages
  receive buffered_messages.shift until buffered_messages.empty?
end
def receive(message)
  connection.send_async :receive, message
end

MessageBuffer#process!が呼ばれると、bufferされていたmessageがなくなるまでconnectionのreceiveメソッドを非同期で呼び出すようになっています。

receiveメソッドで、execute_commandが呼び出され、コマンドパラメータに応じたメッセージの処理がなされます。

def receive(data_in_json)
  if websocket.alive?
    subscriptions.execute_command ActiveSupport::JSON.decode(data_in_json)
  else
    logger.error "Received data without a live WebSocket (#{data_in_json.inspect})"
  end
end

ActionCable::Connection::Base#on_message

on_openとon_closeは終わりですが、最後にon_messageをエンドポイントまで見ていきましょう。

def on_message(message)
  message_buffer.append message
end

message_bufferにメッセージを追加しています。
MessageBuffer#appendではprocess!が呼ばれていればreceiveを、呼ばれていなければbufferを呼び出すようになっています。

def append(message)
  if valid? message
    if processing?
      receive message
    else
      buffer message
    end
  else
    connection.logger.error "Couldn't handle non-string message: #{message.class}"
  end
end

bufferは配列にmessageを追加しているだけですね。

def buffer(message)
  buffered_messages << message
end

まとめ

execute_commandが呼ばれるまでの流れを追うことが出来て、一通りコードを追えました。
あと気になってるのはidentified_byのあたりなので、その辺を追えたらなと思います。