5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?

More than 5 years have passed since last update.

Phoenix Channelで作る最先端Webアプリ - ETS編

Last updated at Posted at 2018-03-22

Phoenix Channelで作る最先端Webアプリ - topic-subtopic編 - Qiita
Phoenix Channelで作る最先端Webアプリ - ETS編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Fault Tolerance編 - Qiita
Phoenix Channelで作る最先端Webアプリ - 地図(Geo)拡張編 - Qiita
Phoenix Channelで作る最先端Webアプリ - DynamicSupervisor編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Elixier Application編 - Qiita
Phoenix Channelで作る最先端Webアプリ - Reactチャット編 - Qiita

 今回の技術的なポイントは以下の通りです。

ポイント
1.Elixir ApplicationのETSで状態を保存し、Fault Toleranceに対応する

 今回はElixirに最初から組み込まれているDB(?)であるETS(Erlang Term Storage)を使ってみます。嬉しいことにETSは特別な設定とか無しに、すぐに使えます。

1.ETSでFault Toleranceに対応

 ListenTweets.ListenerではTweetsの入力を待って、入力があったらブロードキャストを行うループがあります。for tweet <- stream do という文になりますが、ここでカウンターを仕掛けて、何番目の書き込みかもブロードキャストで渡したいと思います。試してみるとわかるのですが、このループ文をそのままにカウンター変数を導入することは難しいです。私の理解不足かもしれませんが、グローバル変数のようなものをカウンターとして、ここでインクリメントすることはできません。全てのデータはimmutableですから。

 しかしETSを使うとこの問題は簡単にクリアーできます。しかもFault Toleranceに対応した形で実現できます。ETSはメモリーデータです。非常に高速で、システム全体で共有できます。通常、Elixirでは専用のGenServerを用意して状態をキープしますが、その必要もありません。以下に特徴のいくつかを列挙します。

・テーブルはglobal name(atom)でアクセスできます。
・テーブルは mutableです。
・複数のプロセスが同時にアクセス(read & write)できます。
・テーブルは全く分離されたメモリースペースに置かれ、出し入れ時にディープコピーされます。
・ガベージコレクターによる弊害はありません。上書きや削除時に直ぐに解放されます。
・テーブルを作成したプロセスが死ぬと、テーブルは回収されます。
・Cコードで実装されており、自分で実装するより、速く効率的です。

2.ETSの実装

 それでは早速実装していきましょう。application.exでテーブルを作成します。:listener_stateと名付けます。:named_tableをオプションに含めることで、この名前を用いて直接テーブルにアクセスできます。:publicを指定しているので、全てのプロセスで読み/書きが可能です。

lib/listen_tweets/application.ex
#
    :ets.new(:listener_state, [:public, :named_table])
    opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

 次にlistener.exでこのテーブルを使います。「# msg_counterを取り出す」部分を見てください。テーブルにkeyの値が無ければ1を、あればその値をmsg_counterに設定します。「# msg_counterを更新する」を見てください。msg_counterを1インクリメントし、keyの新しい値としてテーブルを更新しています。

 ETSに関する設定とかコーディングは以上で全てです。超簡単ですね。Fault Toleranceについても、再起動時にETSを値を取得しているので、カウンターも連続した値をキープしたまま再起動します。pidを意識的に比較しない限りは、クラッシュした事実にも気づきません。

lib/listen_tweets/listener.ex
#
  def init(key) do
    channel =
        get_pid()
        |> get_socket()
        |> get_channel(key)

    if channel == :error do
      IO.puts("error!!!")
    else
       PhoenixChannelClient.join(channel)
       pid = spawn(fn ->
           stream = ExTwitter.stream_filter(track: key)
           for tweet <- stream do
             msg_counter =   # msg_counterを取り出す
               case :ets.lookup(:listener_state, key) do
                 [] -> 1
                 [{_key, state}] -> state
               end

             PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text, geo: tweet.geo, counter: msg_counter})
             # msg_counterを更新する
             :ets.insert(:listener_state, {key, msg_counter+1})
           end
       end)
    end
    { :ok, key }
  end
#

 上でPhoenixChannelClient.push/3でcounterも渡すように変更しました。全体のコードをシンプルにするために形式を合わせ、ブラウザのクライアントの方も、channel.push()でcounter:0を渡すように変更します。

assets/js/Chat.js
#
  handleSubmit(event) {
    event.preventDefault();
    this.channel.push("new_msg", {msg: this.state.inputMessage, geo:null, counter:0})
    this.setState({ inputMessage: "" })
  }
#

 channel moduleの方も、counterの追加に対応します。

lib/react_chat_web/channels/room_channel.ex
#
  def handle_in("new_msg", %{"msg" => msg, "geo" => geo, "counter" => counter}, socket) do
    user_name = socket.assigns[:user_name]
    broadcast(socket, "new_msg", %{msg: msg, user_name: user_name, time: now(), geo: geo, counter: counter})
    {:reply, :ok, socket}
  end
#

 クライアントでcounterを表示するようにします

assets/js/Chat.js
#
        return (
          <List key={index}>
            <ListItem
              leftAvatar={<Avatar>{message.user_name}</Avatar>}
              primaryText={unixTime2ymd(message.time)}
              secondaryText={geo}
              secondaryTextLines={2} >
              {message.counter} {message.msg}
            </ListItem>
          </List>
#

 以上でソースコードは全て修正しました。

3.Fault Toleranceの確認

次にコマンドでReactChat Applicationを立ち上げます。ブラウザを開き「さんど」でjoinします。キーワード「らーめん」と「花見」を登録します。以下のように「らーめん」と「花見」でそれぞれ独立したカウンターが動作しているのがわかります。
image.png

 それでは、Fault Toleranceの確認を行ってみます。iexシェルから以下のコマンドを打って、プロセスをクラッシュさせます。

alias ListenTweets.{Listener,ListenerSupervisor}
pid=ListenerSupervisor.pid_from_key("らーめん")
send pid, {:div,0}

 以下がクラッシュ時の出力です。

iex(2)> [debug] INCOMING "new_msg" on "room:lobby" to ReactChatWeb.RoomChannel
  Transport:  Phoenix.Transports.WebSocket
  Parameters: %{"counter" => 14, "geo" => nil, "msg" => "RT @ishibashitaka: お知らせ\nいつもご愛顧いただきましてありがとうございます。\nこのたび らーめん 鎹 は\n30年3月30日をもちまして、閉店することとなりました。\nこれまでの皆さまからのご支援、心より感謝申し上げます。\nなお、近日中に移転再開する予定でござい…"}
{"new_msg",
 %{
   "counter" => 14,   <===※クラッシュ前
   "geo" => nil,
   "msg" => "RT @ishibashitaka: お知らせ\nいつもご愛顧いただきましてありがとうございます。\nこのたび らーめ ん 鎹 は\n30年3月30日をもちまして、閉店することとなりました。\nこれまでの皆さまからのご支援、心より感謝申し 上げます。\nなお、近日中に移転再開する予定でござい…",
   "time" => 1521694170,
   "user_name" => "らーめん"
 }}
{"phx_reply", %{"response" => %{}, "status" => "ok"}}
 alias ListenTweets.{Listener,ListenerSupervisor}
[ListenTweets.Listener, ListenTweets.ListenerSupervisor]
iex(3)> pid=ListenerSupervisor.pid_from_key("らーめん")
#PID<0.403.0>
iex(4)> send pid, {:div,0}
{:div, 0}
iex(5)> [error] GenServer {Registry.Listener, "らーめん"} terminating
** (ArithmeticError) bad argument in arithmetic expression
    (listen_tweets) lib/listen_tweets/listener.ex:48: ListenTweets.Listener.handle_info/2
    (stdlib) gen_server.erl:616: :gen_server.try_dispatch/4
    (stdlib) gen_server.erl:686: :gen_server.handle_msg/6
    (stdlib) proc_lib.erl:247: :proc_lib.init_p_do_apply/3
Last message: {:div, 0}
State: "らーめん"
[info] JOIN "room:lobby" to ReactChatWeb.RoomChannel
  Transport:  Phoenix.Transports.WebSocket (1.0.0)
  Serializer:  Phoenix.Transports.WebSocketSerializer
  Parameters: %{"user_name" => "らーめん"}
[info] Replied room:lobby :ok
{"phx_reply", %{"response" => %{}, "status" => "ok"}}
{"presence_state",
 %{
   "さんど" => %{
     "metas" => [%{"online_at" => 1521693757, "phx_ref" => "bR+hgTs0Cfk="}]
   }
 }}
{"presence_diff",
 %{
   "joins" => %{
     "らーめん" => %{
       "metas" => [%{"online_at" => 1521694208, "phx_ref" => "GkLxy5uQLek="}]
     }
   },
   "leaves" => %{}
 }}

nil
iex(6)> [debug] INCOMING "new_msg" on "room:lobby" to ReactChatWeb.RoomChannel
  Transport:  Phoenix.Transports.WebSocket
  Parameters: %{"counter" => 15, "geo" => nil, "msg" => "RT @tenshinmukai: 4月1日に「CLAMP of STARS」をご購 入いただいた方の特典会に関しましては、石橋のご協力のもと、都内の別の場所にて鎹の台湾もつ鍋をご提供できる形を検討しております。\nよろしくお願いします。。 #SDB https://t.co/mF…"}
{"new_msg",
 %{
   "counter" => 15,   <===※クラッシュ後
   "geo" => nil,
   "msg" => "RT @tenshinmukai: 4月1日に「CLAMP of STARS」をご購入いただいた方の特典会に関しましては、石橋の ご協力のもと、都内の別の場所にて鎹の台湾もつ鍋をご提供できる形を検討しております。\nよろしくお願いします。。 #SDB https://t.co/mF…",
   "time" => 1521694242,
   "user_name" => "らーめん"
 }}
{"phx_reply", %{"response" => %{}, "status" => "ok"}}

※クラッシュ前と※クラッシュ後のcounterは、14と15になっています。クリアーされずに、再起動時にETSから正しい値を取り出していることがわかります。

 今回のケースは少し特殊ですが、通常はGenServerが保持しているstateの値を丸ごとETSに保管し、再起動時にリカバリーとしてETSからstateを戻す処理にすることが多いようです。

4.修正ソースコードの全リスト

今回修正したソースの全リストを掲載します。

lib/listen_tweets/application.ex
defmodule ListenTweets.Application do
  # See https://hexdocs.pm/elixir/Application.html
  # for more information on OTP Applications
  @moduledoc false

  use Application

  def start(_type, _args) do
    # List all child processes to be supervised
    children = [
      {Registry, keys: :unique, name: Registry.Listener}, #追加
      ListenTweets.ListenerSupervisor, #Listenerから変更
    ]

    # See https://hexdocs.pm/elixir/Supervisor.html
    # for other strategies and supported options
    :ets.new(:listener_state, [:public, :named_table])
    opts = [strategy: :one_for_one, name: ListenTweets.Supervisor]
    Supervisor.start_link(children, opts)
  end
end

lib/listen_tweets/listener.ex
defmodule ListenTweets.Listener do
  use GenServer, restart: :transient  #child_spec
#  use GenServer, restart: :permanent  #child_spec
#  use GenServer, restart: :temporary  #child_spec

  def start_link(key) do
    GenServer.start_link(__MODULE__, key, name: via_tuple(key))
  end


  # via_tuple
  def via_tuple(name), do: {:via, Registry, {Registry.Listener, name}}


  def init(key) do
    channel =
        get_pid()
        |> get_socket()
        |> get_channel(key)

    if channel == :error do
      IO.puts("error!!!")
    else
       PhoenixChannelClient.join(channel)
       pid = spawn(fn ->
           stream = ExTwitter.stream_filter(track: key)
           for tweet <- stream do
             msg_counter =
               case :ets.lookup(:listener_state, key) do
                 [] -> 1
                 [{_key, state}] -> state
               end

             PhoenixChannelClient.push(channel, "new_msg", %{msg: tweet.text, geo: tweet.geo, counter: msg_counter})

             :ets.insert(:listener_state, {key, msg_counter+1})
           end
       end)
    end
    { :ok, key }
  end

  def handle_cast({ :update, new_key }, _current_key) do
    { :noreply, new_key }
  end

  def handle_info({ :div, a}, key) do
    ans = 40 / a
    IO.puts ("ans=#{ans}")
    { :noreply, key }
  end

  def handle_info(mess, key) do
    IO.inspect mess
    { :noreply, key }
  end


  defp get_pid do
    res_pid = PhoenixChannelClient.start_link()
    case res_pid do
      {:ok, pid} -> pid
      _ -> :error
    end
  end


  defp get_socket (:error) do
    :error
  end
  defp get_socket (pid) do
     res_socket = PhoenixChannelClient.connect(pid,
        host: "localhost",
        port: 4000,
        path: "/socket/websocket",
        secure: false,
        heartbeat_interval: 30_000)
      case res_socket do
        {:ok, socket} -> socket
        _ -> :error
      end
  end

  defp get_channel(:error, _key) do
    :error
  end
  defp get_channel(socket, key) do
    channel = PhoenixChannelClient.channel(socket, "room:lobby", %{user_name: key})
    channel
  end
end
lib/react_chat_web/channels/room_channel.ex
defmodule ReactChatWeb.RoomChannel do
  use ReactChatWeb, :channel
  alias ReactChatWeb.Presence
  alias ListenTweets.ListenerSupervisor

  def join("room:lobby", %{"user_name" => user_name}, socket) do
    send(self(), {:after_join, user_name})
    {:ok, socket}
  end


  def handle_in("new_msg", %{"msg" => msg, "geo" => geo, "counter" => counter}, socket) do
    user_name = socket.assigns[:user_name]
    broadcast(socket, "new_msg", %{msg: msg, user_name: user_name, time: now(), geo: geo, counter: counter})
    {:reply, :ok, socket}
  end


  def handle_in("new_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.start_listener(msg)
    {:reply, :ok, socket}
  end

  def handle_in("del_key", %{"msg" => msg}, socket) do
    ListenerSupervisor.stop_listener(msg)
    {:reply, :ok, socket}
  end

  def handle_info({:after_join, user_name}, socket) do
    push(socket, "presence_state", Presence.list(socket))
    {:ok, _ref} = Presence.track(socket, user_name, %{online_at: now()})
    {:noreply, assign(socket, :user_name, user_name)}
  end

  def terminate(_reason, socket) do
    {:noreply, socket}
  end

  defp now do
    System.system_time(:seconds)
  end
end
assets/js/Chat.js
import React from "react";
import {Socket, Presence} from "phoenix"
import RaisedButton from 'material-ui/RaisedButton';
import Paper from 'material-ui/Paper';
import Divider from 'material-ui/Divider';
import TextField from 'material-ui/TextField';

import {Map, Marker, Popup, TileLayer, LayerGroup} from 'react-leaflet'
//import Paper from 'material-ui/Paper';
import {Tabs, Tab} from 'material-ui/Tabs';
import FontIcon from 'material-ui/FontIcon';
import {List, ListItem} from 'material-ui/List';
//import Divider from 'material-ui/Divider';
import Subheader from 'material-ui/Subheader';
import Avatar from 'material-ui/Avatar';



const max_messages_length =30;

function unixTime2ymd(intTime){
    // var d = new Date( intTime );
    var d = new Date( intTime * 1000 );
    var year  = d.getFullYear();
    var month = d.getMonth() + 1;
    var day  = d.getDate();
    var hour = ( '0' + d.getHours() ).slice(-2);
    var min  = ( '0' + d.getMinutes() ).slice(-2);
    var sec   = ( '0' + d.getSeconds() ).slice(-2);
    return( year + '-' + month + '-' + day + ' ' + hour + ':' + min + ':' + sec );
}


class Chat extends React.Component {
  constructor() {
    super();
    this.state = {
      isJoined: false,
      inputUser: "",
      inputMessage: "",
      inputKey: "",
      inputKey0: "",
      messages: [],
      geo_messages: [],
      presences: {},
      lat: 40,
      lng: -74,
      zoom: 6,
    }
  }
  handleInputUser(event) {
    this.setState({
      inputUser: event.target.value
    })
  }

  handleInputMessage(event) {
    this.setState({
      inputMessage: event.target.value
    })
  }

  handleInputKey(event) {
    this.setState({
      inputKey: event.target.value
    })
  }

  handleInputKey0(event) {
    this.setState({
      inputKey0: event.target.value
    })
  }

  handleJoin(event) {
    event.preventDefault();
    if(this.state.inputUser!="") {

        // assets/js/socket.jsのデフォルトの定義と同じ
        this.socket = new Socket("/socket", {params:
          {token: window.userToken}
        });
        this.socket.connect();

        this.channel = this.socket.channel("room:lobby",  {user_name: this.state.inputUser});

        // 現在のサーバの状態を初期状態として設定
        this.channel.on('presence_state', state => {
          let presences = this.state.presences;
          presences = Presence.syncState(presences, state);
          this.setState({ presences: presences })
          console.log('state', presences);
        });
        // 初期状態からの差分を更新していく
        this.channel.on('presence_diff', diff => {
          let presences = this.state.presences;
          presences = Presence.syncDiff(presences, diff);
          this.setState({ presences: presences })
          console.log('diff', presences);
        });

        this.channel.on("new_msg", payload => {
          let messages = this.state.messages;
          messages.push(payload)
          if(messages.length > max_messages_length) {
              messages.shift(); // 先頭を削除
          }
          this.setState({ messages: messages })

          if( !!payload.geo && !!payload.geo.coordinates ) {
              let geo_messages = this.state.geo_messages;
              geo_messages.push(payload);
              if(geo_messages.length > max_messages_length) {
                geo_messages.shift(); // 先頭を削除
              }
              this.setState({ geo_messages: geo_messages })
          }
        })

        this.channel.join()
          .receive("ok", response => { console.log("Joined successfully", response) })
          .receive('error', resp => { console.log('Unable to join', resp); });

        this.setState({ isJoined: true })
    }
  }

  handleLeave(event) {
    event.preventDefault();
    this.socket.disconnect();
    this.setState({ isJoined: false })
  }


  handleSubmit(event) {
    event.preventDefault();
    this.channel.push("new_msg", {msg: this.state.inputMessage, geo:null, counter:0})
    this.setState({ inputMessage: "" })
  }

  handleKey(event) {
    event.preventDefault();
    this.channel.push("new_key", {msg: this.state.inputKey})
    this.setState({ inputKey: "" })
  }

  handleKey0(event) {
    event.preventDefault();
    this.channel.push("del_key", {msg: this.state.inputKey0})
    this.setState({ inputKey0: "" })
  }

  render() {
    const style1 = { margin: '16px 32px 16px 16px', padding: '10px 32px 10px 26px',};
    const style2 = { display: 'inline-block', margin: '1px 8px 1px 4px',};

//--------------------------
    let messages = this.state.messages.map((message, index) => {
        let geo = "";
        if( !!message.geo && !!message.geo.coordinates ) {
            geo = JSON.stringify(message.geo.coordinates);
        }

        return (
          <List key={index}>
            <ListItem
              leftAvatar={<Avatar>{message.user_name}</Avatar>}
              primaryText={unixTime2ymd(message.time)}
              secondaryText={geo}
              secondaryTextLines={2} >
              {message.counter} {message.msg}
            </ListItem>
          </List>
        )
    });
    messages = messages.reverse();

    let geo_messages = this.state.geo_messages;
    geo_messages =  geo_messages.reverse();
    let center_position = [this.state.lat, this.state.lng]; // 緯度,経度
    if( geo_messages && geo_messages[0]) {
        center_position = [ geo_messages[0].geo.coordinates[0], geo_messages[0].geo.coordinates[1] ];
    }

    let Markers = [];
    for (let i = 0; i < geo_messages.length; i++) {
        let tweet_position = [ geo_messages[i].geo.coordinates[0], geo_messages[i].geo.coordinates[1] ];
        let tweet_coordinates = JSON.stringify(geo_messages[i].geo);
        Markers.push(
            <Marker position={tweet_position} key={i}>
              <Popup>
                <div>
                  <div>{unixTime2ymd(geo_messages[i].time)}</div>
                  <div>{geo_messages[i].msg}</div>
                  <div>{geo_messages[i].user_name}</div>
                  <div>{tweet_coordinates}</div>
                </div>
              </Popup>
            </Marker>
        )
    }
//--------------------------
    let presences = [];
    Presence.list(this.state.presences, (name, metas) => {
        presences.push(name);
    });
    let presences_list = presences.map( (user_name, index) =>
      <li key={index} style={style2}>{user_name}</li>
    );

    let form_jsx;
    if(this.state.isJoined===false) {
       form_jsx = (
        <form onSubmit={this.handleJoin.bind(this)} >
          <label>ユーザ名を指定してJoin</label>&nbsp;&nbsp;&nbsp;&nbsp;
          <TextField hintText="ユーザ名" value = {this.state.inputUser} onChange = {this.handleInputUser.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
          <RaisedButton type="submit" primary={true} label="Join" />
        </form>
       );
    } else {
       form_jsx = (
         <div>
           <Paper  style={style1}>
             <label>参加者 : {this.state.inputUser}</label>
             <ul>
               {presences_list}
             </ul>
             <div align="right">
               <form onSubmit={this.handleLeave.bind(this)} >
                 <RaisedButton type="submit" primary={true} label="Leave" />
               </form>
             </div>
           </Paper>
           <Paper  style={style1}>
             <form onSubmit={this.handleKey.bind(this)}>
               <label>キーワード登録</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey} onChange = {this.handleInputKey.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <form onSubmit={this.handleKey0.bind(this)}>
               <label>キーワード削除</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="key" value = {this.state.inputKey0} onChange = {this.handleInputKey0.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <form onSubmit={this.handleSubmit.bind(this)}>
               <label>チャット</label>&nbsp;&nbsp;&nbsp;&nbsp;
               <TextField hintText="Chat Text" value = {this.state.inputMessage} onChange = {this.handleInputMessage.bind(this)} />&nbsp;&nbsp;&nbsp;&nbsp;
               <RaisedButton type="submit" primary={true} label="Submit" />
             </form>
             <Divider />
             <br />
             <div>
               {messages}
             </div>
           </Paper>
         </div>
      );
    }

    return (
    <Tabs>
      <Tab label="チャット タイムライン">
        <br />
        <h1>チャット タイムライン編</h1>
        <div>
          {form_jsx}
        </div>
      </Tab>
      <Tab label="チャット マップ">
        <br />
        <h1>チャット マップ編</h1>
        <Map center={center_position} zoom={this.state.zoom}>
          <TileLayer attribution='&copy; <a href="http://osm.org/copyright">OpenStreetMap</a> contributors' url='http://{s}.tile.osm.org/{z}/{x}/{y}.png' />
          <LayerGroup>
            {Markers}
          </LayerGroup>
        </Map>
      </Tab>
    </Tabs>
    )
  }
}
export default Chat

 以上です。

5
3
0

Register as a new user and use Qiita more conveniently

  1. You get articles that match your needs
  2. You can efficiently read back useful information
  3. You can use dark theme
What you can do with signing up
5
3

Delete article

Deleted articles cannot be recovered.

Draft of this article would be also deleted.

Are you sure you want to delete this article?