ActionCableコードリーディングその2

  • 11
    いいね
  • 0
    コメント
この記事は最終更新日から1年以上が経過しています。

その1
その3

概要

ActionCableのコードリーディングをしつつメモ書き。

rails g channel channel_name [methods]

channelクラスを作成すると以下の様なファイルが生成されます。

# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading.
class RoomChannel < ApplicationCable::Channel
  def subscribed
    # stream_from "some_channel"
  end

  def unsubscribed
    # Any cleanup needed when channel is unsubscribed
  end
end

今回はstream_fromからの流れを追っていきます。

ActionCable::Channel#stream_from

# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
  # Hold off the confirmation until pubsub#subscribe is successful
  defer_subscription_confirmation!

  callback ||= default_stream_callback(broadcasting)
  streams << [ broadcasting, callback ]

  EM.next_tick do
    pubsub.subscribe(broadcasting, &callback).callback do |reply|
      transmit_subscription_confirmation
      logger.info "#{self.class.name} is streaming from #{broadcasting}"
    end
  end
end

stream_fromはコメントにあるとおり、broardcastingに渡された名前のpubsub queueからのstreamingを開始するメソッドです。
第二引数にcallbackを渡すことで、subscriberへの送信時に実行されるデフォルトのcallbackの代わりに利用することも出来るようです。

ActionCable::Channel#defer_subscription_confirmation!

defer_subscription_confirmation!は、defer_subscription_confirmationをtrueにするだけのメソッドです。
defer_subscription_confirmationについては、ActionCable::Channel::Baseのinitializeに説明がありました。

# When a channel is streaming via redis pubsub, we want to delay the confirmation
# transmission until redis pubsub subscription is confirmed.
@defer_subscription_confirmation = false

redisのpubsubからの確認が返ってくるまでsubscribeできたことを送信しないようにするためのプロパティのようです。

ActionCable::Channel#default_stream_callback

デフォルトのコールバックは以下のようになっています。

def default_stream_callback(broadcasting)
  -> (message) do
    transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
  end
end

messageをJSONでエンコードして送信するだけのようですね。

EM.next_tick

EM.next_tickは、渡したブロックをeventmachineの次のイテレートで実行するようにスケジューリングするメソッドとのことです。

pubsubはEM::Hiredisのコネクションから取得したpubsubクライアントのインスタンスです。
HiredisはRedisのCクライアントのラッパーとのこと。
subscribeメソッドを呼ぶことで、引数で渡したpubsub channelをsubscribeすることが出来ます。ブロックを渡すことでメッセージを受信した時のコールバックとして処理されます。

ActionCable::Channel#transmit_subscription_confirmation

このメソッドを呼ぶことでsubscribeされた確認をクライアントへと送信します。

def transmit_subscription_confirmation
  unless subscription_confirmation_sent?
    logger.info "#{self.class.name} is transmitting the subscription confirmation"
    connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
    @subscription_confirmation_sent = true
  end
end

connectionは初期化の際にコンストラクタへと渡されています。
(そういえば、そもそもChannelクラスはいつ初期化されてるんだろう……)
恐らくActionCable::Connection::Baseだとすると、transmitはwebsocketへと委譲されており、その中ではwebsocket.sendが呼ばれるようになっています。

# ActionCable::Connection::Base

# Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
def transmit(data)
  websocket.transmit data
end
# ActionCable::Connection::WebSocket
def transmit(data)
  websocket.send data
end

Channelの初期化タイミング

少し寄り道して初期化タイミング調査。
Channelの初期化はActionCable::Subscriptions#addでされていました。

def add(data)
  id_key = data['identifier']
  id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access

  subscription_klass = connection.server.channel_classes[id_options[:channel]]

  if subscription_klass
    subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
  else
    logger.error "Subscription class not found (#{data.inspect})"
  end
end

addはActionCable::Subscriptions#execute_commandで呼ばれています。

def execute_command(data)
  case data['command']
  when 'subscribe'   then add data
  when 'unsubscribe' then remove data
  when 'message'     then perform_action data
  else
    logger.error "Received unrecognized command in #{data.inspect}"
  end
rescue Exception => e
  logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
end

execute_commandはクライアント側から渡されたJSONの"command"キーの値に応じてサーバ側で処理をするメソッドです。
クライアントからsubscribeが要求された時にlib/channel以下に置かれたchannelクラスから探してきてaddするようですね。

stream_fromが呼ばれるタイミング

stream_fromが呼ばれるとどうなるかは追えましたが、じゃあそもそもいつ呼ばれてるのかを遡っていきます。

def subscribe_to_channel
  run_callbacks :subscribe do
    subscribed
  end

  if subscription_rejected?
    reject_subscription
  else
    transmit_subscription_confirmation unless defer_subscription_confirmation?
  end
end

ActionCable::Channel#subscribedはここで呼ばれています。
run_callbacksはset_callbackメソッドで登録されたコールバックを呼び出すActiveSupport::Callbacksのメソッドです。
先ほど出てきたdefer_subscription_confirmationがここでも使われています。

subscribe_to_channel自体はActionCable::Channelがinitializeされるタイミングで呼び出されていました。

def initialize(connection, identifier, params = {})
  @connection = connection
  @identifier = identifier
  @params     = params

  # When a channel is streaming via redis pubsub, we want to delay the confirmation
  # transmission until redis pubsub subscription is confirmed.
  @defer_subscription_confirmation = false

  @reject_subscription = nil
  @subscription_confirmation_sent = nil

  delegate_connection_identifiers
  subscribe_to_channel
end

Channelの初期化タイミングは先ほど出てきたとおり、クライアントから"subscribe"が要求された時です。

今回のまとめ

stream_fromから追っかけて行きましたが、次はクライアントからのエンドポイントからexecute_commandがどう呼ばれるかを追いかけたほうが理解が進みそうですね。
そういえばWebSocketに対して登録されたイベントハンドラの中身を追いかけてなかったので、ここを追いかけたほうがいいかもしれません。