LoginSignup
12
8

More than 5 years have passed since last update.

Phoenix ChannelのAPIについて

Posted at

最近Channelを使ったアプリケーションを触っているので、APIをまとめました。

受信したイベントを処理するAPI

handle_in/3

channelにjoinしたクライアントからのメッセージを受信して何かしらの処理を行うときに定義します。
多いのは、トピックをサブスクライブしているクライアント全てにメッセージを送るケースだと思います。
その場合は、broadcast!/3を使います。

def handle_in("new_msg", msg, socket) do
  broadcast!(socket, "new_msg", %{"name" => msg["name"], body: msg["body"]})

  {:reply, {:ok, msg["body"]}, socket}
end

{:reply, {:ok, response}, socket}でリクエストを送信したクライアントに処理結果を返却します。
返却する必要がない場合は、{:noreply, socket}を戻り値に指定します。

全体にブロードキャストする以外に、ソケットに接続しているユーザーにだけ直接メッセージを送ることもできます。push/3を使います。

def handle_in("push_event", msg, socket) do
  push(socket, "push_event", %{body: "push message"})
  {:noreply, socket}
end

また、自分以外のクライアントにブロードキャストできるbroadcast_from/4もあります。自身にメッセージを通知したくない場合に使います。

handle_out/3

ブロードキャストされたメッセージをインターセプトすることができます。メッセージを加工して届けたい場合に使用することが多いです。
handle_outの宣言の前にintercept ["event_name"]でインターセプトするイベントを指定します。

intercept ["new_msg"]
def handle_out("new_msg", payload, socket) do
  push(socket, "new_msg", %{payload|body: payload.body <> " hello world!"})
  {:noreply, socket}
end

上記の場合、broadcast(socket, "new_msg", message) でブロードキャストしたメッセージをクライアントに送信する前にインターセプトして、そのソケットに接続しているユーザーにだけ加工したメッセージを送信します。

Endpoint.broadcast

自分がsubscribeしていないトピックを指定して、broadcastすることもできます。YourApp.Endpoint.broadcast/3もしくはYourAppName.Endpoint.broadcast_from/4を使います。

def handle_in("to_other_topic_client", msg, socket) do
  YourAppName.Endpoint.broadcast_from!(self(), "rooms:othertopic", "new_msg", %{user: msg["user"], body: msg["body"]})

  {:noreply, socket}
end

rooms:othertopicをサブスクライブしているクライアントに対してメッセージをブロードキャストしています。
APIサーバなどChannel以外から、直接トピックを指定してメッセージをブロードキャストする場合にもこの方法で行います。

Channelのコード

Phoenix.Channelの中がどうなっているのか少し見てみます。

コールバック関数

Phoenix.Channel:
Public APIはコールバック関数として宣言されています。

defmodule Phoenix.Channel do
  # 略

  @callback join(topic :: binary, auth_msg :: map, Socket.t) ::
              {:ok, Socket.t} |
              {:ok, map, Socket.t} |
              {:error, map}

  @callback handle_in(event :: String.t, msg :: map, Socket.t) ::
              {:noreply, Socket.t} |
              {:reply, reply, Socket.t} |
              {:stop, reason :: term, Socket.t} |
              {:stop, reason :: term, reply, Socket.t}

  @callback handle_info(term, Socket.t) ::
              {:noreply, Socket.t} |
              {:stop, reason :: term, Socket.t}

そして、__using__の中にデフォルト実装があります。
これで、自分のプロジェクトのChannelモジュールでuse Phoenix.Channelとすることでこれらの関数が実装されます。

  # 略

  defmacro __using__(_) do
    quote do
      @behaviour unquote(__MODULE__)
      @on_definition unquote(__MODULE__)
      @before_compile unquote(__MODULE__)
      @phoenix_intercepts []

      import unquote(__MODULE__)
      import Phoenix.Socket, only: [assign: 3]

      def code_change(_old, socket, _extra), do: {:ok, socket}

      def handle_in(_event, _message, socket) do
        {:noreply, socket}
      end

      def handle_info(_message, socket), do: {:noreply, socket}

      def terminate(_reason, _socket), do: :ok

      defoverridable code_change: 3, handle_info: 2, handle_in: 3, terminate: 2
    end
  end

broadcast関数

broadcast関数は以下のように実装されています。

  def broadcast(socket, event, message) do
    %{pubsub_server: pubsub_server, topic: topic} = assert_joined!(socket)
    Server.broadcast pubsub_server, topic, event, message
  end

ここで指定しているpubsub_serverは設定ファイル(config.exs)で指定しています。デフォルトはPhoenix.PubSub.PG2が設定されています。
続いて、Channel.Serverを見てみます。

Phoenix.Channel.Server:

def broadcast(pubsub_server, topic, event, payload)
    when is_binary(topic) and is_binary(event) and is_map(payload) do
  PubSub.broadcast pubsub_server, topic, %Broadcast{
    topic: topic,
    event: event,
    payload: payload
  }
end

Phoenix.PubSubを呼び出しています。

Phoenix.PubSub:

defmodule Phoenix.PubSub do

# 略

  def broadcast(server, topic, message) when is_atom(server) or is_tuple(server),
    do: call(server, :broadcast, [:none, topic, message])


server変数はPhoenix.PubSub.PG2だったので、PG2Serverを見てみます。
Phoenix.PubSub.PG2Server:

defmodule Phoenix.PubSub.PG2Server do

# 略

  def broadcast(fastlane, server_name, pool_size, from_pid, topic, msg) do
    server_name
    |> get_members()
    |> do_broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
  end

# 略

  defp do_broadcast(pids, fastlane, server_name, pool_size, from_pid, topic, msg)
    when is_list(pids) do
    local_node = Phoenix.PubSub.node_name(server_name)

    Enum.each(pids, fn
      pid when is_pid(pid) and node(pid) == node() ->
        Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
      {^server_name, node_name} when node_name == local_node ->
        Local.broadcast(fastlane, server_name, pool_size, from_pid, topic, msg)
      pid_or_tuple ->
        send(pid_or_tuple, {:forward_to_local, fastlane, from_pid, topic, msg})
    end)
    :ok
  end

broadcast関数の最初から三つまでの引数(fastlane, server_name, pool_size)はPhoenix.PubSub.PG2init関数の中で定義されています。
init関数に渡されるopts変数は以下のようになっています。

pry(1)> opts
[fastlane: Phoenix.Channel.Server, pool_size: 1, name: YourAppName.PubSub,
 adapter: Phoenix.PubSub.PG2]

今回のChannelの処理に直接は関係ないですが、これらはPhoenix.Endpoint__using__の中で子を起動していく際に渡されます。

では、元のソースコードを見ていきます。
do_broadcast関数ではPubSub.Localのbroadcast関数に処理を移譲しています。

Phoenix.PubSub.Local:

defmodule Phoenix.PubSub.Local do

# 略
  def broadcast(fastlane, pubsub_server, pool_size, from, topic, msg) when is_atom(pubsub_server) do
    parent = self()
    for shard <- 0..(pool_size - 1) do
      Task.async(fn ->
        do_broadcast(fastlane, pubsub_server, shard, from, topic, msg)
        Process.unlink(parent)
      end)
    end |> Enum.map(&Task.await(&1, :infinity))
    :ok
  end

  defp do_broadcast(fastlane, pubsub_server, shard, from, topic, msg) do
    pubsub_server
    |> subscribers_with_fastlanes(topic, shard)
    |> fastlane.fastlane(from, msg) # TODO: Test this contract
  end

  def subscribers_with_fastlanes(pubsub_server, topic, shard) when is_atom(pubsub_server) do
    try do
      shard
      |> local_for_shard(pubsub_server)
      |> :ets.lookup_element(topic, 2)
    catch
      :error, :badarg -> []
    end
  end

subscribers_with関数の:ets.lookup_elementでローカルサーバー名とトピック名をキーにしてsubscribeしているクライアントを取得しています。
このクライアントに対してメッセージを送信することで、トピックをサブスクライブしているクライアントへのブロードキャストを実現しています。

12
8
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
12
8