最近Channelを使ったアプリケーションを触っているので、APIをまとめました。
受信したイベントを処理するAPI
handle_in/3
channelにjoinしたクライアントからのメッセージを受信して何かしらの処理を行うときに定義します。
多いのは、トピックをサブスクライブしているクライアント全てにメッセージを送るケースだと思います。
その場合は、broadcast!/3
を使います。
def handle_in("new_msg", msg, socket) do
broadcast!(socket, "new_msg", %{"name" => msg["name"], body: msg["body"]})
{:reply, {:ok, msg["body"]}, socket}
end
{:reply, {:ok, response}, socket}
でリクエストを送信したクライアントに処理結果を返却します。
返却する必要がない場合は、{:noreply, socket}
を戻り値に指定します。
全体にブロードキャストする以外に、ソケットに接続しているユーザーにだけ直接メッセージを送ることもできます。push/3
を使います。
def handle_in("push_event", msg, socket) do
push(socket, "push_event", %{body: "push message"})
{:noreply, socket}
end
また、自分以外のクライアントにブロードキャストできるbroadcast_from/4
もあります。自身にメッセージを通知したくない場合に使います。
handle_out/3
ブロードキャストされたメッセージをインターセプトすることができます。メッセージを加工して届けたい場合に使用することが多いです。
handle_out
の宣言の前にintercept ["event_name"]
でインターセプトするイベントを指定します。
intercept ["new_msg"]
def handle_out("new_msg", payload, socket) do
push(socket, "new_msg", %{payload|body: payload.body <> " hello world!"})
{:noreply, socket}
end
上記の場合、broadcast(socket, "new_msg", message)
でブロードキャストしたメッセージをクライアントに送信する前にインターセプトして、そのソケットに接続しているユーザーにだけ加工したメッセージを送信します。
Endpoint.broadcast
自分がsubscribeしていないトピックを指定して、broadcastすることもできます。YourApp.Endpoint.broadcast/3
もしくはYourAppName.Endpoint.broadcast_from/4
を使います。
def handle_in("to_other_topic_client", msg, socket) do
YourAppName.Endpoint.broadcast_from!(self(), "rooms:othertopic", "new_msg", %{user: msg["user"], body: msg["body"]})
{:noreply, socket}
end
rooms:othertopic
をサブスクライブしているクライアントに対してメッセージをブロードキャストしています。
APIサーバなどChannel以外から、直接トピックを指定してメッセージをブロードキャストする場合にもこの方法で行います。
Channelのコード
Phoenix.Channel
の中がどうなっているのか少し見てみます。
コールバック関数
Phoenix.Channel
:
Public APIはコールバック関数として宣言されています。
defmodule Phoenix.Channel do
# 略
@callback join(topic :: binary, auth_msg :: map, Socket.t) ::
{:ok, Socket.t} |
{:ok, map, Socket.t} |
{:error, map}
@callback handle_in(event :: String.t, msg :: map, Socket.t) ::
{:noreply, Socket.t} |
{:reply, reply, Socket.t} |
{:stop, reason :: term, Socket.t} |
{:stop, reason :: term, reply, Socket.t}
@callback handle_info(term, Socket.t) ::
{:noreply, Socket.t} |
{:stop, reason :: term, Socket.t}
そして、__using__
の中にデフォルト実装があります。
これで、自分のプロジェクトのChannelモジュールでuse Phoenix.Channel
とすることでこれらの関数が実装されます。
# 略
defmacro __using__(_) do
quote do
@behaviour unquote(__MODULE__)
@on_definition unquote(__MODULE__)
@before_compile unquote(__MODULE__)
@phoenix_intercepts []
import unquote(__MODULE__)
import Phoenix.Socket, only: [assign: 3]
def code_change(_old, socket, _extra), do: {:ok, socket}
def handle_in(_event, _message, socket) do
{:noreply, socket}
end
def handle_info(_message, socket), do: {:noreply, socket}
def terminate(_reason, _socket), do: :ok
defoverridable code_change: 3, handle_info: 2, handle_in: 3, terminate: 2
end
end
broadcast関数
broadcast関数は以下のように実装されています。
def broadcast(socket, event, message) do
%{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
Server.broadcast pubsub_server, topic, event, message
end
ここで指定しているpubsub_server
は設定ファイル(config.exs
)で指定しています。デフォルトはPhoenix.PubSub.PG2
が設定されています。
続いて、Channel.Server
を見てみます。
Phoenix.Channel.Server
:
def broadcast(pubsub_server, topic, event, payload)
when is_binary(topic) and is_binary(event) and is_map(payload) do
PubSub.broadcast pubsub_server, topic, %Broadcast{
topic: topic,
event: event,
payload: payload
}
end
Phoenix.PubSub
を呼び出しています。
Phoenix.PubSub
:
defmodule Phoenix.PubSub do
# 略
def broadcast(server, topic, message) when is_atom(server) or is_tuple(server),
do: call(server, :broadcast, [:none, topic, message])
server
変数はPhoenix.PubSub.PG2
だったので、PG2Server
を見てみます。
Phoenix.PubSub.PG2Server
:
defmodule Phoenix.PubSub.PG2Server do
# 略
def broadcast(fastlane, server_name, pool_size, from_pid, topic, msg) do
server_name
|> get_members()
|> do_broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
end
# 略
defp do_broadcast(pids, fastlane, server_name, pool_size, from_pid, topic, msg)
when is_list(pids) do
local_node = Phoenix.PubSub.node_name(server_name)
Enum.each(pids, fn
pid when is_pid(pid) and node(pid) == node() ->
Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
{^server_name, node_name} when node_name == local_node ->
Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
pid_or_tuple ->
send(pid_or_tuple, {:forward_to_local, fastlane, from_pid, topic, msg})
end)
:ok
end
broadcast
関数の最初から三つまでの引数(fastlane
, server_name
, pool_size
)はPhoenix.PubSub.PG2
のinit
関数の中で定義されています。
init
関数に渡されるopts
変数は以下のようになっています。
pry(1)> opts
[fastlane: Phoenix.Channel.Server, pool_size: 1, name: YourAppName.PubSub,
adapter: Phoenix.PubSub.PG2]
今回のChannelの処理に直接は関係ないですが、これらはPhoenix.Endpoint
の__using__
の中で子を起動していく際に渡されます。
では、元のソースコードを見ていきます。
do_broadcast
関数ではPubSub.Local
のbroadcast関数に処理を移譲しています。
Phoenix.PubSub.Local
:
defmodule Phoenix.PubSub.Local do
# 略
def broadcast(fastlane, pubsub_server, pool_size, from, topic, msg) when is_atom(pubsub_server) do
parent = self()
for shard <- 0..(pool_size - 1) do
Task.async(fn ->
do_broadcast(fastlane, pubsub_server, shard, from, topic, msg)
Process.unlink(parent)
end)
end |> Enum.map(&Task.await(&1, :infinity))
:ok
end
defp do_broadcast(fastlane, pubsub_server, shard, from, topic, msg) do
pubsub_server
|> subscribers_with_fastlanes(topic, shard)
|> fastlane.fastlane(from, msg) # TODO: Test this contract
end
def subscribers_with_fastlanes(pubsub_server, topic, shard) when is_atom(pubsub_server) do
try do
shard
|> local_for_shard(pubsub_server)
|> :ets.lookup_element(topic, 2)
catch
:error, :badarg -> []
end
end
subscribers_with
関数の:ets.lookup_element
でローカルサーバー名とトピック名をキーにしてsubscribeしているクライアントを取得しています。
このクライアントに対してメッセージを送信することで、トピックをサブスクライブしているクライアントへのブロードキャストを実現しています。