概要
ActionCableのコードリーディングをしつつメモ書き。
rails g channel channel_name [methods]
channelクラスを作成すると以下の様なファイルが生成されます。
# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading.
class RoomChannel < ApplicationCable::Channel
def subscribed
# stream_from "some_channel"
end
def unsubscribed
# Any cleanup needed when channel is unsubscribed
end
end
今回はstream_fromからの流れを追っていきます。
ActionCable::Channel#stream_from
# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used
# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
# Hold off the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!
callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]
EM.next_tick do
pubsub.subscribe(broadcasting, &callback).callback do |reply|
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end
end
end
stream_fromはコメントにあるとおり、broardcastingに渡された名前のpubsub queueからのstreamingを開始するメソッドです。
第二引数にcallbackを渡すことで、subscriberへの送信時に実行されるデフォルトのcallbackの代わりに利用することも出来るようです。
ActionCable::Channel#defer_subscription_confirmation!
defer_subscription_confirmation!は、defer_subscription_confirmationをtrueにするだけのメソッドです。
defer_subscription_confirmationについては、ActionCable::Channel::Baseのinitializeに説明がありました。
# When a channel is streaming via redis pubsub, we want to delay the confirmation
# transmission until redis pubsub subscription is confirmed.
@defer_subscription_confirmation = false
redisのpubsubからの確認が返ってくるまでsubscribeできたことを送信しないようにするためのプロパティのようです。
ActionCable::Channel#default_stream_callback
デフォルトのコールバックは以下のようになっています。
def default_stream_callback(broadcasting)
-> (message) do
transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
end
end
messageをJSONでエンコードして送信するだけのようですね。
EM.next_tick
EM.next_tickは、渡したブロックをeventmachineの次のイテレートで実行するようにスケジューリングするメソッドとのことです。
pubsubはEM::Hiredisのコネクションから取得したpubsubクライアントのインスタンスです。
HiredisはRedisのCクライアントのラッパーとのこと。
subscribeメソッドを呼ぶことで、引数で渡したpubsub channelをsubscribeすることが出来ます。ブロックを渡すことでメッセージを受信した時のコールバックとして処理されます。
ActionCable::Channel#transmit_subscription_confirmation
このメソッドを呼ぶことでsubscribeされた確認をクライアントへと送信します。
def transmit_subscription_confirmation
unless subscription_confirmation_sent?
logger.info "#{self.class.name} is transmitting the subscription confirmation"
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
@subscription_confirmation_sent = true
end
end
connectionは初期化の際にコンストラクタへと渡されています。
(そういえば、そもそもChannelクラスはいつ初期化されてるんだろう……)
恐らくActionCable::Connection::Baseだとすると、transmitはwebsocketへと委譲されており、その中ではwebsocket.sendが呼ばれるようになっています。
# ActionCable::Connection::Base
# Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
def transmit(data)
websocket.transmit data
end
# ActionCable::Connection::WebSocket
def transmit(data)
websocket.send data
end
Channelの初期化タイミング
少し寄り道して初期化タイミング調査。
Channelの初期化はActionCable::Subscriptions#addでされていました。
def add(data)
id_key = data['identifier']
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access
subscription_klass = connection.server.channel_classes[id_options[:channel]]
if subscription_klass
subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
else
logger.error "Subscription class not found (#{data.inspect})"
end
end
addはActionCable::Subscriptions#execute_commandで呼ばれています。
def execute_command(data)
case data['command']
when 'subscribe' then add data
when 'unsubscribe' then remove data
when 'message' then perform_action data
else
logger.error "Received unrecognized command in #{data.inspect}"
end
rescue Exception => e
logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
end
execute_commandはクライアント側から渡されたJSONの"command"キーの値に応じてサーバ側で処理をするメソッドです。
クライアントからsubscribeが要求された時にlib/channel以下に置かれたchannelクラスから探してきてaddするようですね。
stream_fromが呼ばれるタイミング
stream_fromが呼ばれるとどうなるかは追えましたが、じゃあそもそもいつ呼ばれてるのかを遡っていきます。
def subscribe_to_channel
run_callbacks :subscribe do
subscribed
end
if subscription_rejected?
reject_subscription
else
transmit_subscription_confirmation unless defer_subscription_confirmation?
end
end
ActionCable::Channel#subscribedはここで呼ばれています。
run_callbacksはset_callbackメソッドで登録されたコールバックを呼び出すActiveSupport::Callbacksのメソッドです。
先ほど出てきたdefer_subscription_confirmationがここでも使われています。
subscribe_to_channel自体はActionCable::Channelがinitializeされるタイミングで呼び出されていました。
def initialize(connection, identifier, params = {})
@connection = connection
@identifier = identifier
@params = params
# When a channel is streaming via redis pubsub, we want to delay the confirmation
# transmission until redis pubsub subscription is confirmed.
@defer_subscription_confirmation = false
@reject_subscription = nil
@subscription_confirmation_sent = nil
delegate_connection_identifiers
subscribe_to_channel
end
Channelの初期化タイミングは先ほど出てきたとおり、クライアントから"subscribe"が要求された時です。
今回のまとめ
stream_fromから追っかけて行きましたが、次はクライアントからのエンドポイントからexecute_commandがどう呼ばれるかを追いかけたほうが理解が進みそうですね。
そういえばWebSocketに対して登録されたイベントハンドラの中身を追いかけてなかったので、ここを追いかけたほうがいいかもしれません。