Ruby
Rails
ActionCable

ActionCableコードリーディングその2

More than 3 years have passed since last update.

その1

その3


概要

ActionCableのコードリーディングをしつつメモ書き。


rails g channel channel_name [methods]

channelクラスを作成すると以下の様なファイルが生成されます。

# Be sure to restart your server when you modify this file. Action Cable runs in an EventMachine loop that does not support auto reloading.

class RoomChannel < ApplicationCable::Channel
def subscribed
# stream_from "some_channel"
end

def unsubscribed
# Any cleanup needed when channel is unsubscribed
end
end

今回はstream_fromからの流れを追っていきます。


ActionCable::Channel#stream_from

# Start streaming from the named <tt>broadcasting</tt> pubsub queue. Optionally, you can pass a <tt>callback</tt> that'll be used

# instead of the default of just transmitting the updates straight to the subscriber.
def stream_from(broadcasting, callback = nil)
# Hold off the confirmation until pubsub#subscribe is successful
defer_subscription_confirmation!

callback ||= default_stream_callback(broadcasting)
streams << [ broadcasting, callback ]

EM.next_tick do
pubsub.subscribe(broadcasting, &callback).callback do |reply|
transmit_subscription_confirmation
logger.info "#{self.class.name} is streaming from #{broadcasting}"
end
end
end

stream_fromはコメントにあるとおり、broardcastingに渡された名前のpubsub queueからのstreamingを開始するメソッドです。

第二引数にcallbackを渡すことで、subscriberへの送信時に実行されるデフォルトのcallbackの代わりに利用することも出来るようです。


ActionCable::Channel#defer_subscription_confirmation!

defer_subscription_confirmation!は、defer_subscription_confirmationをtrueにするだけのメソッドです。

defer_subscription_confirmationについては、ActionCable::Channel::Baseのinitializeに説明がありました。

# When a channel is streaming via redis pubsub, we want to delay the confirmation

# transmission until redis pubsub subscription is confirmed.
@defer_subscription_confirmation = false

redisのpubsubからの確認が返ってくるまでsubscribeできたことを送信しないようにするためのプロパティのようです。


ActionCable::Channel#default_stream_callback

デフォルトのコールバックは以下のようになっています。

def default_stream_callback(broadcasting)

-> (message) do
transmit ActiveSupport::JSON.decode(message), via: "streamed from #{broadcasting}"
end
end

messageをJSONでエンコードして送信するだけのようですね。


EM.next_tick

EM.next_tickは、渡したブロックをeventmachineの次のイテレートで実行するようにスケジューリングするメソッドとのことです。

pubsubはEM::Hiredisのコネクションから取得したpubsubクライアントのインスタンスです。

HiredisはRedisのCクライアントのラッパーとのこと。

subscribeメソッドを呼ぶことで、引数で渡したpubsub channelをsubscribeすることが出来ます。ブロックを渡すことでメッセージを受信した時のコールバックとして処理されます。


ActionCable::Channel#transmit_subscription_confirmation

このメソッドを呼ぶことでsubscribeされた確認をクライアントへと送信します。

def transmit_subscription_confirmation

unless subscription_confirmation_sent?
logger.info "#{self.class.name} is transmitting the subscription confirmation"
connection.transmit ActiveSupport::JSON.encode(identifier: @identifier, type: ActionCable::INTERNAL[:message_types][:confirmation])
@subscription_confirmation_sent = true
end
end

connectionは初期化の際にコンストラクタへと渡されています。

(そういえば、そもそもChannelクラスはいつ初期化されてるんだろう……)

恐らくActionCable::Connection::Baseだとすると、transmitはwebsocketへと委譲されており、その中ではwebsocket.sendが呼ばれるようになっています。

# ActionCable::Connection::Base

# Send raw data straight back down the WebSocket. This is not intended to be called directly. Use the #transmit available on the
# Channel instead, as that'll automatically address the correct subscriber and wrap the message in JSON.
def transmit(data)
websocket.transmit data
end

# ActionCable::Connection::WebSocket

def transmit(data)
websocket.send data
end


Channelの初期化タイミング

少し寄り道して初期化タイミング調査。

Channelの初期化はActionCable::Subscriptions#addでされていました。

def add(data)

id_key = data['identifier']
id_options = ActiveSupport::JSON.decode(id_key).with_indifferent_access

subscription_klass = connection.server.channel_classes[id_options[:channel]]

if subscription_klass
subscriptions[id_key] ||= subscription_klass.new(connection, id_key, id_options)
else
logger.error "Subscription class not found (#{data.inspect})"
end
end

addはActionCable::Subscriptions#execute_commandで呼ばれています。

def execute_command(data)

case data['command']
when 'subscribe' then add data
when 'unsubscribe' then remove data
when 'message' then perform_action data
else
logger.error "Received unrecognized command in #{data.inspect}"
end
rescue Exception => e
logger.error "Could not execute command from #{data.inspect}) [#{e.class} - #{e.message}]: #{e.backtrace.first(5).join(" | ")}"
end

execute_commandはクライアント側から渡されたJSONの"command"キーの値に応じてサーバ側で処理をするメソッドです。

クライアントからsubscribeが要求された時にlib/channel以下に置かれたchannelクラスから探してきてaddするようですね。


stream_fromが呼ばれるタイミング

stream_fromが呼ばれるとどうなるかは追えましたが、じゃあそもそもいつ呼ばれてるのかを遡っていきます。

def subscribe_to_channel

run_callbacks :subscribe do
subscribed
end

if subscription_rejected?
reject_subscription
else
transmit_subscription_confirmation unless defer_subscription_confirmation?
end
end

ActionCable::Channel#subscribedはここで呼ばれています。

run_callbacksはset_callbackメソッドで登録されたコールバックを呼び出すActiveSupport::Callbacksのメソッドです。

先ほど出てきたdefer_subscription_confirmationがここでも使われています。

subscribe_to_channel自体はActionCable::Channelがinitializeされるタイミングで呼び出されていました。

def initialize(connection, identifier, params = {})

@connection = connection
@identifier = identifier
@params = params

# When a channel is streaming via redis pubsub, we want to delay the confirmation
# transmission until redis pubsub subscription is confirmed.
@defer_subscription_confirmation = false

@reject_subscription = nil
@subscription_confirmation_sent = nil

delegate_connection_identifiers
subscribe_to_channel
end

Channelの初期化タイミングは先ほど出てきたとおり、クライアントから"subscribe"が要求された時です。


今回のまとめ

stream_fromから追っかけて行きましたが、次はクライアントからのエンドポイントからexecute_commandがどう呼ばれるかを追いかけたほうが理解が進みそうですね。

そういえばWebSocketに対して登録されたイベントハンドラの中身を追いかけてなかったので、ここを追いかけたほうがいいかもしれません。