8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 3 years have passed since last update.

NeosVRとPhoenix間でWebSocketを使用して双方向にメッセージングを行う NeosSocketを作ってみた

Last updated at Posted at 2021-03-17

はじめに

本記事はこちらの発展で
NeosVRとPhoenix間でWebSocketを使用して双方向にメッセージングを行う
WebアプリケーションNeosSocketを作成する記事になります

環境

OS MacOS Catalina 10.15.7
elixir 1.11.3-otp-23
erlang 23.2.5
nodejs v12.14.0
Phoenix v1.5.8

プロジェクト作成

mix phx.new neos_socket --live
cd neos_socket
mix ecto.create

Websocket有効化

次にwebsocketを有効にします

[edit]lib/neos_socket_web/channels/user_socket.ex
defmodule NeosSocketWeb.UserSocket do
  use Phoenix.Socket

  ## Channels
  channel "room:*", NeosSocketWeb.RoomChannel #コメントイン
...
end

Room CRUD作成

roomをDBと対応付けるためにCRUD画面を作ります

mix phx.gen.live Rooms Room rooms name:string description:string
mix ecto.migrate

phx.gen.liveで出てきたpathを追加し、トップページも RoomLive.Indexになるように変更します

[edit]lib/neos_socket_web/router.ex
defmodule NeosSocketWeb.Router do
...
  scope "/", NeosSocketWeb do
    pipe_through :browser
    live "/rooms", RoomLive.Index, :index
    live "/rooms/new", RoomLive.Index, :new
    live "/rooms/:id/edit", RoomLive.Index, :edit

    live "/rooms/:id", RoomLive.Show, :show
    live "/rooms/:id/show/edit", RoomLive.Show, :edit
    live "/", RoomLive.Index, :index
  end
...
end

bulma 追加

cardを使いたいのでbulmaを追加

cd assets
npm install --save bulma
cd ..

bulmaをimport

[edit]assets/css/app.scss
/* This file is for your main application css. */
@import "./phoenix.css";
@import "../node_modules/nprogress/nprogress.css";
@import "../node_modules/bulma/bulma.sass"; // 追加

差分は以下のコミットを参照してください
styling diff

アクセスしているクライアントを表示

こちらを参考にアクセス状況をリアルタイムに更新する機能を実装します

mix phx.gen.presence
[edit]lib/neos_socket/application.ex
defmodule NeosSocket.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    children = [
      # Start the Ecto repository
      NeosSocket.Repo,
      # Start the Telemetry supervisor
      NeosSocketWeb.Telemetry,
      # Start the PubSub system
      {Phoenix.PubSub, name: NeosSocket.PubSub},
      # Start the Endpoint (http/https)
      NeosSocketWeb.Endpoint, #,を忘れないこと
      NeosSocketWeb.Presence # ここ追加
      # Start a worker by calling: NeosSocket.Worker.start_link(arg)
      # {NeosSocket.Worker, arg}
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    opts = [strategy: :one_for_one, name: NeosSocket.Supervisor]
    Supervisor.start_link(children, opts)
  end

作成されたpresenceを使いやすいように変更します

[new]lib/neos_socket_web/channels/presence.ex
defmodule NeosSocketWeb.Presence do
  @moduledoc """
  Provides presence tracking to channels and processes.

  See the [`Phoenix.Presence`](http://hexdocs.pm/phoenix/Phoenix.Presence.html)
  docs for more details.
  """
  use Phoenix.Presence, otp_app: :neos_socket,
                        pubsub_server: NeosSocket.PubSub

  alias NeosSocketWeb.Presence

  def track_presence(pid, topic, key, payload) do
    Presence.track(pid, topic, key, payload)
  end

  def list_presence(topic) do
    topic
    |> Presence.list
    |> Enum.map(fn { _user_id, data} -> data |> extract_metadata end)
  end

  def update_presence(pid, topic, key, payload) do
    metas =
      Presence.get_by_key(topic, key)
      |> extract_metadata
      |> Map.merge(payload)

    Presence.update(pid, topic, key, metas)
  end

  def get_presence(topic, key) do
    Presence.get_by_key(topic, key)
    |> extract_metadata
  end

  def extract_metadata(data) do
    data |> Map.get(:metas) |> List.first
  end
end

管理ページ

リアルタイム更新の通知を受け取るために NeosSocketWeb.Endpoint.subscribe
roomにアクセスした通知を送るのに Presence.track_presence
更新通知を受け取ったときの処理を %{event: "presence_diff"}
で実行しています

[edit]lib/neos_socket_web/live/room_live/show.ex
defmodule NeosSocketWeb.RoomLive.Show do
  use NeosSocketWeb, :live_view

  alias NeosSocket.Rooms
  alias NeosSocketWeb.Presence

  @impl true
  def mount(%{"id" => id}, _session, socket) do
    obj_id = 9999
    room = Rooms.get_room!(id)
    NeosSocketWeb.Endpoint.subscribe("room:#{id}")
    Presence.track_presence(
      self(),
      "room:#{id}",
      obj_id,
      %{ name: "liveview", id: obj_id}
    )

    {
      :ok,
      socket
      |> assign(:page_title, "Room")
      |> assign(:room, room)
      |> assign(:object_id, obj_id)
      |> assign(:users, Presence.list_presence("room:#{id}"))
    }
  end

  def handle_info(%{event: "presence_diff"}, socket = %{assigns: %{room: room}}) do
    {:noreply, assign(socket, users: Presence.list_presence("room:#{room.id}"))}
  end  
end

[edit]lib/neos_socket_web/live/room_live/show.html.leex

<ul>
  <li class="title">
    <strong>Name:</strong>
    <%= @room.name %>
  </li>

  <li class="subtitle">
    <strong>Description:</strong>
    <%= @room.description %>
  </li>

</ul>

<div class="columns mt-1">
  <%= for object <- @users do %>
    <div class="column is-one-fifth">
      <div id="object-<%= object.id %>" class="card">
        <div class="card-content">
          <h5 class="title is-4">id: <%= object.id %></h5>
          <h5 class="title is-5">name: <%= object.name %></h5>
        </div>
      </div>
    </div>
  <% end %>
</div>

<span><%= live_redirect "Back", to: Routes.room_index_path(@socket, :index) %></span>

チャンネル(WebSocket)

部屋がなかったときに例外ではなくnilを返してエラーメッセージを返したいので
get_room/1 を実装

[edit]lib/neos_socket/rooms.ex
defmodule NeosSocket.Rooms do
...
  def get_room!(id), do: Repo.get!(Room, id)

  def get_room(id) do
    Room |> where([r], r.id == ^id) |> Repo.one
  end
...
end

Channelではjoin時にsubscribeされるので,track_presenceとpresence_diffだけ追加します
レスポンスの第2引数で現在アクセスしているクライアントのリストを返しています

[new]lib/neos_socket_web/channels/room_channel.ex
defmodule NeosSocketWeb.RoomChannel do
  use Phoenix.Channel
  alias NeosSocket.Rooms
  alias NeosSocketWeb.Presence

  @impl true
  def join(topic, payload, socket) do
    room_id = payload["room_id"]
    case Rooms.get_room(room_id) do
      nil ->
        {:error, "room not found"}
      _room ->
        object_id = payload["object_id"]
        Presence.track_presence(
          self(),
          "room:#{room_id}",
          object_id,
          %{ id: object_id, name: payload["name"]}
        )

        {
          :ok,
          Presence.list_presence("room:#{room_id}"),
          socket
          |> assign(:room_id, room_id)
          |> assign(:users, Presence.list_presence("room:#{room_id}"))
        }
    end
  end

  @impl
  def handle_info(%{event: "presence_diff"}, socket = %{ assigns: %{room_id: room_id } }) do
    {:noreply, assign(socket, users: Presence.list_presence("room:#{room_id}"))}
  end
end

動作確認用にjsからチャンネルに接続するコード

[new]assets/js/socket.js
import {Socket} from "phoenix"
let socket = new Socket("/socket", {params: {token: window.userToken}})

socket.connect()

let channel = socket.channel("room:1", {room_id: 1, object_id: 1,name: "socket"})

channel.join()
  .receive("ok", resp => { console.log("joined successfully", resp)})
  .receive("error", resp => {console.log("unable to join", resp)})


export default socket

app.jsで読み込み

[edit]assets/js/app.js
...
import socket from "./socket"
...

liveviewとsocket両方アクセスしているのを確認できました
socket -> liveviewの順番で実行されているので、consoleの方はsocketだけアクセスしているリストを受け取っています
スクリーンショット 2021-03-17 2.13.25.png
クライアントのアクセス状況を表示できるようになったので次はメッセージの送受信を実装していきます

Message と Object モデル作成

今回はDBに保存はしませんが、構造体がほしいのと後々DBに保存するので
mix phx.gen.schemaで messageとobjectを作っていきます

mix phx.gen.schema Rooms.Object objects name:string object_id:integer room_id:references:rooms
mix phx.gen.schema Rooms.Message messages body:string object_id:references:objects room_id:references:rooms
mix ecto.migrate
[new]lib/neos_socket/rooms/message.ex
defmodule NeosSocket.Rooms.Message do
  use Ecto.Schema
  import Ecto.Changeset

  schema "messages" do
    field :body, :string
    field :object_id, :id
    field :room_id, :id

    timestamps()
  end

  @doc false
  def changeset(message, attrs) do
    message
    |> cast(attrs, [:body, :object_id, :room_id])
    |> validate_required([:body, :object_id, :room_id])
  end
end
[new]lib/neos_socket/rooms/object.ex
defmodule NeosSocket.Rooms.Object do
  use Ecto.Schema
  import Ecto.Changeset

  schema "objects" do
    field :name, :string
    field :object_id, :integer
    field :room_id, :id

    timestamps()
  end

  @doc false
  def changeset(object, attrs) do
    object
    |> cast(attrs, [:name, :object_id, :room_id])
    |> validate_required([:name, :object_id, :room_id])
  end
end

Messageのchangesetを作成して、値は更新しますが保存しないで返します

[edit]lib/rooms.ex
defmodule NeosSocket.Rooms do
  alias NeosSocket.Rooms.Room
  alias NeosSocket.Rooms.Message # 追加
...

  def change_message(%Message{} = message, attrs \\ %{}) do
    Message.changeset(message, attrs)
  end

  def create_message(attrs \\ %{}) do
    %Message{}
    |> Message.changeset(attrs)
    # |> Repo.insert()
  end
end

送信処理

[edit]lib/neos_socket_web/live/room_live/show.ex
defmodule NeosSocketWeb.RoomLive.Show do
  use NeosSocketWeb, :live_view

  alias NeosSocket.Rooms
  alias NeosSocket.Rooms.Message
  alias NeosSocketWeb.Presence

  @impl true
  def mount(%{"id" => id}, _session, socket) do
    obj_id = 9999
    room = Rooms.get_room!(id)
    changeset = Rooms.change_message(%Message{object_id: obj_id, room_id: id})
....
    {
      :ok,
      socket
      |> assign(:page_title, "Room")
      |> assign(:room, room)
      |> assign(:object_id, obj_id)
      |> assign(:changeset, changeset) # changesetをassign
      |> assign(:users, Presence.list_presence("room:#{id}"))
    }
  end

  # 送信処理、送信先をオブジェクトか部屋全体(broadcast)かを選べる
  @impl true
  def handle_event("send", %{"message" => message_params}, socket) do
    message = Rooms.create_message(message_params).changes
    type = if message.object_id == 0, do: "room", else: "user"
    topic = if message.object_id == 0 do
              "#{type}:#{message.room_id}"
            else
              "#{type}:#{message.object_id}"
            end

    NeosSocketWeb.Endpoint.broadcast!(
      topic,
      "broadcast_#{type}",
      message
    )
    {:noreply, socket}
  end
...
end

websocket側

broadcastはそのままbroadcast関数を実行してメッセージを全体に送信しますが
postはneosからメッセージを受け取ったという形なので、管理画面の対応するObjectにメッセージを追加します
get_presenceでroomにアクセスしているオブジェクトからobject_idが一致するメッセージ一覧を取得し、
update_presenceで受信したメッセージを先頭に追加して新しいリストに差し替えています。

[edit]lib/neos_socket_web/channels/room_channel.ex
defmodule NeosSocketWeb.RoomChannel do
...
  @impl true
  def handle_in("broadcast", payload, socket) do
    message = Rooms.create_message(payload).changes
    broadcast!(socket, "broadcast_room", message)
    {:reply, {:ok, message }, socket}
  end

  @impl
  def handle_in("post", payload, socket) do
    message = Rooms.create_message(payload).changes
    presence = Presence.get_presence("room:#{message.room_id}", message.object_id)
    Presence.update_presence(
      self(),
      "room:#{message.room_id}",
      message.object_id,
      %{messages: [message | presence.messages] |> List.flatten }
    )
 
    {:reply, :ok, socket}
  end
...
end

formのhtml

<div class="box">
  <%=
    f = form_for @changeset, "#",
    id: "board-form",
    phx_submit: "send"
  %>
    <div class="media">
      <%= label f, :to, class: "label mt-2 media-left"%>
      <div class="media-content">
        <div class="select">
          <%=
            select f,
              :object_id,
              [ [key: "all", value: 0] |Enum.map(@users, fn u -> [key: u.name, value: u.id] end)]
          %>
        </div>
      </div>
    </div>
    <div class="field">
      <%= label f, :message, class: "label" %>  
      <%= textarea f, :body, class: "input mt-1" %>
    </div>

    <%= error_tag f, :body %>
    <%= hidden_input f, :room_id %>
    <%= submit "save", class: "button mt-1 is-primary", phx_disable_with: "saving..."%>
</div>

受信処理

受信したメッセージを
broadcastならassigns.messagesに
objectならpresenceのliveviewのmessagesに振り分けていきます

[edit]lib/neos_socket_web/live/room_live/show.ex
defmodule NeosSocketWeb.RoomLive.Show do

  def mount(%{"id" => id}, _session, socket) do
...
    NeosSocketWeb.Endpoint.subscribe("user:#{obj_id}") # liveview向けのメッセージ受信をsubscribe
    Presence.track_presence(
      self(),
      "room:#{id}",
      obj_id,
      %{ name: "liveview", id: obj_id, messages: []} # 受け取ったmessageを格納するmessagesを追加
    )

    {:ok,
     socket
     |> assign(:page_title, "Room")
     |> assign(:room, room)
     |> assign(:object_id, obj_id)
     |> assign(:changeset, changeset)
     |> assign(:users, Presence.list_presence("room:#{id}"))
     |> assign(:messages, []) # broadcast log
   }
  end

  @impl true
  def handle_info(%{event: "broadcast_room", payload: state}, %{ assigns: %{messages: messages}} = socket) do
    message = Rooms.create_message(state).changes
    {:noreply, socket |> assign(:messages, [message | messages] |> List.flatten)}
  end

  @impl true
  def handle_info(%{event: "broadcast_user", payload: state}, socket) do
    message = Rooms.create_message(state).changes
    presence = Presence.get_presence("room:#{message.room_id}", message.object_id)
    Presence.update_presence(
      self(),
      "room:#{message.room_id}",
      message.object_id,
      %{messages: [message | presence.messages] |> List.flatten }
    )
    {:noreply, socket}
  end
...
end

handle_infoでeventを発火させて、自objectのmessagesを更新し、
handle_infoではreplyできないのでhandle_in("send_broadcast_user")にpushしてreplyでNeos側にメッセージを送信します

[edit]lib/neos_socket_web/channels/room_channel.ex
defmodule NeosSocketWeb.RoomChannel do
...
  @impl true
  def join(topic, payload, socket) do
    room_id = payload["room_id"]
    case Rooms.get_room(room_id) do
      nil ->
        {:error, "room not found"}
      _room ->
        object_id = payload["object_id"]
        NeosSocketWeb.Endpoint.subscribe("user:#{object_id}") # 自分に向けてのメッセージ受信をsubscribe
        Presence.track_presence(
          self(),
          "room:#{room_id}",
          object_id,
          %{ id: object_id, name: payload["name"], messages: []} # messagesを追加
        )
...
    end
  end

  @impl
  def handle_in("send_broadcast_user", payload, socket) do
    {:reply, payload, socket}
  end

  @impl true
  def handle_info(%{event: "broadcast_room", payload: state}, socket = %{ assigns: messages}) do
    message = Rooms.create_message(state).changes
    push(socket,"send_broadcast_user", message)
    {:noreply, socket}
  end

  @impl true
  def handle_info(%{event: "broadcast_user", payload: state}, socket) do
    message = Rooms.create_message(state).changes
    presence = Presence.get_presence("room:#{message.room_id}", message.object_id)
    Presence.update_presence(
      self(),
      "room:#{message.room_id}",
      message.object_id,
      %{messages: [message | presence.messages] |> List.flatten }
    )
    push(socket,"send_broadcast_user", message)
    {:noreply, socket}
  end
...
end

メッセージ表示部分のhtml

<div class="columns mt-1">
  <div class="column is-one-fifth">
    <div class="card">
      <div class="card-content">
        <h5 class="title is-4">Broadcast</h5>
        <ul>
          <%= for message <- @messages do %>
            <li>[<%= message.body %>] from:<%= message.object_id %> </li>
          <% end %>
        </ul>

      </div>
    </div>

  </div>
  <%= for object <- @users do %>
    <div class="column is-one-fifth">
      <div id="object-<%= object.id %>" class="card">
        <div class="card-content">
          <h5 class="title is-4">id: <%= object.id %></h5>
          <h5 class="title is-5">name: <%= object.name %></h5>
          <ul>
            <%= for message <- object.messages do %>
              <li>[<%= message.body %>] from:<%= message.object_id %> </li>
            <% end %>
          </ul>
        </div>
      </div>
    </div>
  <% end %>
</div>

動作確認用のコードも追加していきます
channel.onで待ち受けするイベントを決めて
channel.pushでpostとbroadcastを実行しています

[edit]assets/js/socket.js
...
channel.on("broadcast_room", payload => {
  console.log("receive_room_broadcast", payload)
})

channel.on("send_broadcast_user", payload => {
  console.log("receive_user_broadcast", payload)
})

setTimeout(
  function () {
    channel.push("post", {body: "first", room_id: 1, object_id: 1})
      .receive("ok", (resp) => console.log("first post ok:", resp))
  },
  "1000"
);

setTimeout(
  function () {
    channel.push("broadcast", {body: "broadcast", room_id: 1, object_id: 1})
      .receive("ok", (resp) => console.log("broadcast send ok:", resp))
  },
  "1000"
);

これで実装が完了したので、動作確認をしていきます
Image from Gyazo

LiveViewとchannel両方で送受信できているのが確認できました
デプロイ後socket.jsは不要なのでコメントアウトを忘れずにしておきましょう

デプロイとNeosVR側のLogixについて

以下を参考にしてください

何が嬉しいの?

PhoenixとNeosVRをwebsocketでつないで尚且双方向にメッセージの送受信ができるので
NeosVRからテキストやメディアを受け取って Phoenix経由でtwitterやslackなど各種サービスにpostしたり
逆にPhoenixで実装したSlackBotで更新を受け取ってそれをNeosVRで受け取るということができます

NeosVR側で受け取ったデータをどう表示するかは課題が多いですが、このコードがいろんなサービスとの連携の一助になればと思います

本記事は以上になりますありがとうございました

本記事のコード

参考サイト

https://qiita.com/the_haigo/items/00b6c2ee12b061bc2355
https://qiita.com/the_haigo/items/3b34fa4f4c3adaf192c4
https://qiita.com/sand/items/886ace4dcdff5953e673
https://medium.com/elixir-learnings/getting-bulma-css-to-work-with-phoenix-liveview-e2c9e328874a
https://bulma.io/documentation/
https://hexdocs.pm/phoenix/channels.html
https://zenn.dev/koga1020/books/phoenix-guide-ja-1-5/viewer/channels#client-libraries
https://elixirforum.com/t/presence-update-loop-works-once-then-returns-error-nopresence/22371/2

8
5
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
8
5

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?