8
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Phoenix Channelのテスト

Posted at

Channelを使ったアプリケーションのシナリオテストをどのように書くのか分からなかったので調べたことをまとめます。
テストライブラリはESpecを使いました。多分、他のライブラリでもほとんどやり方は変わらないと思います。

テストするシナリオ

ブロードキャストメッセージを各ユーザーが受信できるかテストします。
実際のプロジェクトではもっと複雑なシーケンスがほとんどだと思いますが、ここではわかりやすくなるようにシンプルなシーケンスをテストします
テストする対象のコードは、Channelのブロードキャスト処理とします(handle_in関数)。

defmodule Hello.RoomChannel do
  use Hello.Web, :channel

  def join("rooms:" <> room_id, _params, socket) do
    {:ok, assign(socket, :room_id, room_id)}
    {:ok, socket}
  end

  def handle_in("new_msg", msg, socket) do
    name = msg["user"]
    broadcast!(socket, "new_msg", %{user: msg["user"], body: msg["body"]})

    {:reply, {:ok, %{msg: msg["body"]}}, socket}
  end

# (略)
end

テストするための準備

メッセージ送信処理

アプリケーションでメッセージを送信/受信するクライアントはネイティブアプリかウェブアプリですが、テストではGenServerを使って擬似クライアントを実装します。use Phoenix.ChannelTestしているのはすでにPhoenixで用意してくれている関数を使うために指定しています。
(Phoenix.ChannelTestphoenix/lib/phoenix/testディレクトリにあるモジュールでChannelのテストを書くのに便利な関数が用意されています)

defmodule Hello.Support.RoomChannelClient do
  use Phoenix.ChannelTest

  @endpoint Hello.Endpoint

  def start_link(name, room_id) when is_atom(name) do
    GenServer.start_link(__MODULE__, %{room_id: room_id}, name: name)
  end

  def init(state) do
    with {:ok, socket} <- connect(Hello.UserSocket, %{"token" => ""}),
         {:ok, _msg, socket} <- subscribe_and_join(socket, Hello.RoomChannel, "rooms:#{state.room_id}") do

      {:ok, Map.merge(state, %{socket: socket, mailbox: []})}
    end
  end

  def handle_call({:event, event, body}, _from, state) do
    ref = make_ref()
    send(state.socket.channel_pid,
      %Phoenix.Socket.Message{event: event, topic: state.socket.topic, ref: ref, payload: body})

    {:reply, {:ok, ref}, state}
  end

# (略)
end

ここでやっていることは

  • UserSocketに接続し、対象のトピックをサブスクライブした後にジョインする
  • テストコードからメッセージ送信のリクエストを受け付けられるようにhandle_call関数でメッセージを受け付けソケットに送信する実装を入れている
  • 後で書く予定のBroadcastメッセージを保持するためにstateは{socket, mailbox: []}の形式でもつ

こうすることで、テストコードの中で以下のようにメッセージ送信することができます。

  describe "send new_msg" do

    before do
      {:ok, _pid} = ChannelClient.start_link(:user1, 1)
    end

    it do
      GenServer.call(:user1, {:event, "new_msg", %{"user" => "user1", "body" => "hello channel!"}})
    # (略)
    end
  end

メッセージ受信処理の準備

次に、ブロードキャストメッセージを受信するクライアントの処理もGenServerを使って実装します。
テストコードで以下のようにメッセージ受信を指定できるようにします。

  user1_messages = wait_and_receive(:user1, 1)

それではwait_and_receiveを実装します。

# 最初に呼ばれる。リトライ回数を設定
def wait_and_receive(username, n), do: wait_and_receive(username, n, 10)
# 全て取得し終わった場合に呼ばれる
def wait_and_receive(_, n, _) when n <= 0, do: []
# リトライ回数分、実行しても指定したメッセージ数分取得できなかった場合エラーとする
def wait_and_receive(username, n, 0), do: raise "wait_and_receive failed. username: #{username}"
# 取得処理
def wait_and_receive(username, n, retry) do
  case GenServer.call(username, :receive) do
    [] ->
      :timer.sleep(10)
      wait_and_receive(username, n, retry - 1)
    result_list ->
      num = Enum.count(result_list)
      result_list ++ wait_and_receive(username, n-num, retry - 1)
  end
end

ここでやっていることは

  • ユーザー名、取得メッセージ数を指定してwait_and_receiveが呼ばれたら、リトライ回数をセットして実行する(ここでは10回とした)
  • メッセージがまだなければスリープ、あれば取得する。取得メッセージ数まで取得できてなければ再帰処理で繰り返す
  • リトライ回数まで実行しても取得が終わらなければ、例外を発生させる

OTPメッセージの受信処理

各メッセージをstateに保存するhandle_info関数も実装します

  def handle_call(:receive, _from, state) do
    maillist = state.mailbox
    state = Map.merge(state, %{mailbox: []})

    {:reply, maillist, state}
  end

  def handle_info(%Phoenix.Socket.Broadcast{}=message, state) do
    state = Map.merge(state, %{mailbox: (state.mailbox ++ [message])})
    {:noreply, state}
  end

  def handle_info(%Phoenix.Socket.Message{}=message, state) do
    state = Map.merge(state, %{mailbox: (state.mailbox ++ [message])})
    {:noreply, state}
  end

  def handle_info(%Phoenix.Socket.Reply{}=message, state) do
    state = Map.merge(state, %{mailbox: (state.mailbox ++ [message])})
    {:noreply, state}
  end

その他ヘルパー関数

あとブロードキャストメッセージだけ受信できるように適当なフィルタリング処理を追加してあげると便利です。

def is_reply(%Phoenix.Socket.Reply{}=_), do: true
def is_reply(_), do: false

def is_broadcast(%Phoenix.Socket.Broadcast{}=_), do: true
def is_broadcast(_), do: false

テストコード

これでシナリオテストを書く準備ができました。
user1がブロードキャストメッセージを送信し、user1、user2にブロードキャストされていることを確認するテストコードは以下のようになります。

  def is_reply(%Phoenix.Socket.Reply{}=_), do: true
  def is_reply(_), do: false

  def is_broadcast(%Phoenix.Socket.Broadcast{}=_), do: true
  def is_broadcast(_), do: false

  before do
    {:ok, _pid} = ChannelClient.start_link(:user1, 1)
    {:ok, _pid} = ChannelClient.start_link(:user2, 1)
  end

  it do
    GenServer.call(:user1, {:event, "new_msg", %{"user" => "user1", "body" => "hello channel!"}})
    user1_results = wait_and_receive(:user1, 1)
    [user1_rcv_msgs] = Enum.filter(user1_results, &(__MODULE__.is_broadcast(&1)))

    expect(user1_rcv_msgs).to be_struct(Phoenix.Socket.Broadcast)
    expect(user1_rcv_msgs.event).to eq "new_msg"
    expect(user1_rcv_msgs.payload.body).to eq "hello channel!"
    expect(user1_rcv_msgs.payload.user).to eq "user1"

    user2_results = wait_and_receive(:user2, 1)
    [user2_rcv_msgs] = Enum.filter(user2_results, &(__MODULE__.is_broadcast(&1)))

    expect(user2_rcv_msgs).to be_struct(Phoenix.Socket.Broadcast)
    expect(user2_rcv_msgs.event).to eq "new_msg"
    expect(user2_rcv_msgs.payload.body).to eq "hello channel!"
    expect(user2_rcv_msgs.payload.user).to eq "user1"

  end
end

まとめ

ここでテストしたのはhandle_in関数のみですが、同じやり方でpush, handle_out関数もテストできます。
Phoenixですでにテストするための関数をほとんど用意してくれているのはありがたいです。

8
1
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
1

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?